Azure
zenml.integrations.azure
special
Initialization of the ZenML Azure integration.
The Azure integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores,
and an io
module to handle file operations on Azure Blob Storage.
The Azure Step Operator integration submodule provides a way to run ZenML steps
in AzureML.
AzureIntegration (Integration)
Definition of Azure integration for ZenML.
Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """Definition of Azure integration for ZenML."""

    NAME = AZURE
    # Pinned/bounded Azure SDK requirements installed by
    # `zenml integration install azure`.
    REQUIREMENTS = [
        "adlfs>=2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity==1.10.0",
        "azureml-core==1.54.0.post1",
        "azure-mgmt-containerservice>=20.0.0",
        "azure-storage-blob==12.17.0",  # temporary fix for https://github.com/Azure/azure-sdk-for-python/issues/32056
        "kubernetes",
    ]

    @staticmethod
    def activate() -> None:
        """Activate the Azure integration.

        Importing the module is done for its side effects: it registers
        the Azure service connectors.
        """
        from zenml.integrations.azure import service_connectors  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declares the flavors for the integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported lazily so the flavor modules are only loaded when
        # the flavors are actually requested.
        from zenml.integrations.azure.flavors import (
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
        )

        return [
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
        ]
activate()
staticmethod
Activate the Azure integration.
Source code in zenml/integrations/azure/__init__.py
@staticmethod
def activate() -> None:
    """Activate the Azure integration.

    Importing the module is done for its side effects: it registers
    the Azure service connectors.
    """
    from zenml.integrations.azure import service_connectors  # noqa
flavors()
classmethod
Declares the flavors for the integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    # Imported lazily so the flavor modules are only loaded when
    # the flavors are actually requested.
    from zenml.integrations.azure.flavors import (
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
    )

    return [
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
    ]
artifact_stores
special
Initialization of the Azure Artifact Store integration.
azure_artifact_store
Implementation of the Azure Artifact Store integration.
AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)
Artifact Store for Microsoft Azure based artifacts.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Microsoft Azure based artifacts."""

    # Lazily created adlfs filesystem, cached after the first access to
    # the `filesystem` property.
    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    @property
    def config(self) -> AzureArtifactStoreConfig:
        """Returns the `AzureArtifactStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureArtifactStoreConfig, self._config)

    def get_credentials(self) -> Optional[AzureSecretSchema]:
        """Returns the credentials for the Azure Artifact Store if configured.

        Credentials are resolved from a linked service connector first and,
        failing that, from the authentication secret configured on the
        component.

        Returns:
            The credentials.

        Raises:
            RuntimeError: If the connector is not configured with Azure service
                principal credentials.
        """
        connector = self.get_connector()
        if connector:
            # Imported lazily so the Azure SDK is only needed when a
            # service connector is actually linked.
            from azure.identity import ClientSecretCredential
            from azure.storage.blob import BlobServiceClient

            client = connector.connect()
            if not isinstance(client, BlobServiceClient):
                raise RuntimeError(
                    f"Expected a {BlobServiceClient.__module__}."
                    f"{BlobServiceClient.__name__} object while "
                    f"trying to use the linked connector, but got "
                    f"{type(client)}."
                )
            # Get the credentials from the client
            credentials = client.credential
            if not isinstance(credentials, ClientSecretCredential):
                raise RuntimeError(
                    "The Azure Artifact Store connector can only be used "
                    "with a service connector that is configured with "
                    "Azure service principal credentials."
                )
            # NOTE(review): this reads private attributes of
            # ClientSecretCredential; it may break when azure-identity is
            # upgraded beyond the pinned version — confirm on upgrade.
            return AzureSecretSchema(
                client_id=credentials._client_id,
                client_secret=credentials._client_credential,
                tenant_id=credentials._tenant_id,
                account_name=client.account_name,
            )
        # No connector: fall back to the authentication secret, if any.
        secret = self.get_typed_authentication_secret(
            expected_schema_type=AzureSecretSchema
        )
        return secret

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The adlfs filesystem to access this artifact store.

        Returns:
            The adlfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_credentials()
            credentials = secret.get_values() if secret else {}
            # anon=False forces authenticated access; listings caching is
            # disabled so directory contents are never stale.
            self._filesystem = adlfs.AzureBlobFileSystem(
                **credentials,
                anon=False,
                use_listings_cache=False,
            )
        return self._filesystem

    def _split_path(self, path: PathType) -> Tuple[str, str]:
        """Splits a path into the filesystem prefix and remainder.

        Example:
        ```python
        prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A tuple of the filesystem prefix and the remainder.
        """
        path = convert_to_str(path)
        prefix = ""
        # Strip whichever supported scheme ("abfs://" or "az://") the
        # path starts with, if any.
        for potential_prefix in self.config.SUPPORTED_SCHEMES:
            if path.startswith(potential_prefix):
                prefix = potential_prefix
                path = path[len(potential_prefix) :]
                break

        return prefix, path

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
          in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        # adlfs returns scheme-less paths; re-attach the original scheme
        # prefix so results remain valid artifact store URIs.
        prefix, _ = self._split_path(pattern)
        return [
            f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of files in the given directory.
        """
        _, path = self._split_path(path)

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a dictionary returned by the Azure filesystem.

            Args:
                file_dict: A dictionary returned by the Azure filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path, exist_ok=True)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            Stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return self.filesystem.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        # Re-attach the scheme prefix stripped by adlfs to each yielded
        # directory path.
        prefix, _ = self._split_path(top)
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{prefix}{directory}", subdirectories, files
config: AzureArtifactStoreConfig
property
readonly
Returns the `AzureArtifactStoreConfig` config.
Returns:
Type | Description |
---|---|
AzureArtifactStoreConfig |
The configuration. |
filesystem: adlfs.AzureBlobFileSystem
property
readonly
The adlfs filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
adlfs.AzureBlobFileSystem |
The adlfs filesystem to access this artifact store. |
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path to copy from. |
required |
dst |
Union[bytes, str] |
The path to copy to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    # Fail fast before touching the destination unless overwriting is
    # explicitly allowed.
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path exists, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    # Delegates directly to the adlfs filesystem.
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials(self)
Returns the credentials for the Azure Artifact Store if configured.
Returns:
Type | Description |
---|---|
Optional[zenml.secret.schemas.azure_secret_schema.AzureSecretSchema] |
The credentials. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the connector is not configured with Azure service principal credentials. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def get_credentials(self) -> Optional[AzureSecretSchema]:
    """Returns the credentials for the Azure Artifact Store if configured.

    Credentials are resolved from a linked service connector first and,
    failing that, from the authentication secret configured on the
    component.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the connector is not configured with Azure service
            principal credentials.
    """
    connector = self.get_connector()
    if connector:
        # Imported lazily so the Azure SDK is only needed when a
        # service connector is actually linked.
        from azure.identity import ClientSecretCredential
        from azure.storage.blob import BlobServiceClient

        client = connector.connect()
        if not isinstance(client, BlobServiceClient):
            raise RuntimeError(
                f"Expected a {BlobServiceClient.__module__}."
                f"{BlobServiceClient.__name__} object while "
                f"trying to use the linked connector, but got "
                f"{type(client)}."
            )
        # Get the credentials from the client
        credentials = client.credential
        if not isinstance(credentials, ClientSecretCredential):
            raise RuntimeError(
                "The Azure Artifact Store connector can only be used "
                "with a service connector that is configured with "
                "Azure service principal credentials."
            )
        # NOTE(review): this reads private attributes of
        # ClientSecretCredential; it may break when azure-identity is
        # upgraded beyond the pinned version — confirm on upgrade.
        return AzureSecretSchema(
            client_id=credentials._client_id,
            client_secret=credentials._client_credential,
            tenant_id=credentials._tenant_id,
            account_name=client.account_name,
        )
    # No connector: fall back to the authentication secret, if any.
    secret = self.get_typed_authentication_secret(
        expected_schema_type=AzureSecretSchema
    )
    return secret
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include: - '*' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern to match, see details above. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that match the given glob pattern. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
      in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    # adlfs returns scheme-less paths; re-attach the original scheme
    # prefix so results remain valid artifact store URIs.
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path is a directory, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    # Delegates directly to the adlfs filesystem.
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of files in the given directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    # Strip the scheme prefix; adlfs entry names are scheme-less and
    # the basename extraction below slices relative to this path.
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    # exist_ok=True makes this idempotent for already-existing paths.
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    # exist_ok=True makes this idempotent for already-existing paths.
    self.filesystem.makedir(path=path, exist_ok=True)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path of the file to open. |
required |
mode |
str |
Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. |
'r' |
Returns:
Type | Description |
---|---|
Any |
A file-like object. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    # Delegates directly to the adlfs filesystem.
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to remove. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    # Removes a single file (not a directory tree; see `rmtree`).
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    # Fail fast before touching the destination unless overwriting is
    # explicitly allowed.
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to remove. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    # recursive=True deletes the directory together with its contents.
    self.filesystem.delete(path=path, recursive=True)
size(self, path)
Get the size of a file in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
int |
The size of the file in bytes. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    # Delegates directly to the adlfs filesystem.
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to get stat info for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Stat info. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    # Delegates directly to the adlfs filesystem.
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Unused argument to conform to interface. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Unused argument to conform to interface. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    # Re-attach the scheme prefix stripped by adlfs to each yielded
    # directory path.
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files
flavors
special
Azure integration flavors.
azure_artifact_store_flavor
Azure artifact store flavor.
AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)
Configuration class for Azure Artifact Store.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration class for Azure Artifact Store."""

    # URI schemes this artifact store accepts for its root path.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)
Azure Artifact Store flavor.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Azure Artifact Store flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZURE_ARTIFACT_STORE_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            connector_type=AZURE_CONNECTOR_TYPE,
            resource_type=BLOB_RESOURCE_TYPE,
            resource_id_attr="path",
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/azure.png"

    @property
    def config_class(self) -> Type[AzureArtifactStoreConfig]:
        """Returns AzureArtifactStoreConfig config class.

        Returns:
            The config class.
        """
        return AzureArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["AzureArtifactStore"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        # Imported lazily to avoid loading the artifact store module (and
        # its Azure dependencies) unless the flavor is actually used.
        from zenml.integrations.azure.artifact_stores import AzureArtifactStore

        return AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]
property
readonly
Returns AzureArtifactStoreConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[AzureArtifactStore]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AzureArtifactStore] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
azureml_step_operator_flavor
AzureML step operator flavor.
AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings)
Config for the AzureML step operator.
Attributes:
Name | Type | Description |
---|---|---|
subscription_id |
str |
The Azure account's subscription ID |
resource_group |
str |
The resource group to which the AzureML workspace is deployed. |
workspace_name |
str |
The name of the AzureML Workspace. |
compute_target_name |
str |
The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already. |
tenant_id |
Optional[str] |
The Azure Tenant ID. |
service_principal_id |
Optional[str] |
The ID for the service principal that is created to allow apps to access secure resources. |
service_principal_password |
Optional[str] |
Password for the service principal. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig(
    BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
    """Config for the AzureML step operator.

    Attributes:
        subscription_id: The Azure account's subscription ID
        resource_group: The resource group to which the AzureML workspace
            is deployed.
        workspace_name: The name of the AzureML Workspace.
        compute_target_name: The name of the configured ComputeTarget.
            An instance of it has to be created on the portal if it doesn't
            exist already.
        tenant_id: The Azure Tenant ID.
        service_principal_id: The ID for the service principal that is created
            to allow apps to access secure resources.
        service_principal_password: Password for the service principal.
    """

    subscription_id: str
    resource_group: str
    workspace_name: str
    compute_target_name: str

    # Service principal authentication
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    # SecretField keeps these values out of plain-text serialization.
    tenant_id: Optional[str] = SecretField(default=None)
    service_principal_id: Optional[str] = SecretField(default=None)
    service_principal_password: Optional[str] = SecretField(default=None)

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)
Flavor for the AzureML step operator.
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor for the AzureML step operator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZUREML_STEP_OPERATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/azureml.png"

    @property
    def config_class(self) -> Type[AzureMLStepOperatorConfig]:
        """Returns AzureMLStepOperatorConfig config class.

        Returns:
            The config class.
        """
        return AzureMLStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AzureMLStepOperator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        # Imported lazily to avoid loading the step operator module (and
        # its AzureML dependencies) unless the flavor is actually used.
        from zenml.integrations.azure.step_operators import AzureMLStepOperator

        return AzureMLStepOperator
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]
property
readonly
Returns AzureMLStepOperatorConfig config class.
Returns:

| Type | Description |
| --- | --- |
| `Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]` | The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[AzureMLStepOperator]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AzureMLStepOperator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
AzureMLStepOperatorSettings (BaseSettings)
Settings for the AzureML step operator.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `environment_name` | `Optional[str]` | The name of the environment if there already exists one. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(BaseSettings):
    """Run-time settings for the AzureML step operator.

    Attributes:
        environment_name: The name of the environment if there
            already exists one.
    """

    # When unset, no pre-existing AzureML environment name is supplied.
    environment_name: Optional[str] = None
service_connectors
special
Azure Service Connector.
azure_service_connector
Azure Service Connector.
AzureAccessToken (AuthenticationConfig)
Azure access token credentials.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessToken(AuthenticationConfig):
    """Azure access token credentials."""

    # The raw access token value, held in a secret-string field.
    token: PlainSerializedSecretStr = Field(
        title="Azure Access Token",
        description="The Azure access token to use for authentication",
    )
AzureAccessTokenConfig (AzureBaseConfig, AzureAccessToken)
Azure token configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessTokenConfig(AzureBaseConfig, AzureAccessToken):
    """Azure token configuration.

    Combines the common Azure scoping fields from `AzureBaseConfig` with
    the access token credential from `AzureAccessToken`.
    """
AzureAuthenticationMethods (StrEnum)
Azure Authentication methods.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAuthenticationMethods(StrEnum):
    """Azure Authentication methods."""

    # Ambient credentials; mapped to `DefaultAzureCredential` in
    # `AzureServiceConnector._authenticate`.
    IMPLICIT = "implicit"
    # Service principal client ID + client secret
    # (`ClientSecretCredential`).
    SERVICE_PRINCIPAL = "service-principal"
    # A pre-fetched access token with a fixed expiration time.
    ACCESS_TOKEN = "access-token"
AzureBaseConfig (AuthenticationConfig)
Azure base configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureBaseConfig(AuthenticationConfig):
    """Azure base configuration.

    Optional scoping fields (subscription, tenant, resource group and
    storage account) shared by the Azure authentication configurations.
    """

    subscription_id: Optional[UUID] = Field(
        default=None,
        title="Azure Subscription ID",
        description="The subscription ID of the Azure account. If not "
        "specified, ZenML will attempt to retrieve the subscription ID from "
        "Azure using the configured credentials.",
    )
    tenant_id: Optional[UUID] = Field(
        default=None,
        title="Azure Tenant ID",
        description="The tenant ID of the Azure account. If not specified, "
        "ZenML will attempt to retrieve the tenant from Azure using the "
        "configured credentials.",
    )
    resource_group: Optional[str] = Field(
        default=None,
        title="Azure Resource Group",
        description="A resource group may be used to restrict the scope of "
        "Azure resources like AKS clusters and ACR repositories. If not "
        "specified, ZenML will retrieve resources from all resource groups "
        "accessible with the configured credentials.",
    )
    storage_account: Optional[str] = Field(
        default=None,
        title="Azure Storage Account",
        description="The name of an Azure storage account may be used to "
        "restrict the scope of Azure Blob storage containers. If not "
        "specified, ZenML will retrieve blob containers from all storage "
        "accounts accessible with the configured credentials.",
    )
AzureClientConfig (AzureBaseConfig)
Azure client configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientConfig(AzureBaseConfig):
    """Azure client configuration.

    Extends the base scoping fields with the required service principal
    client and tenant IDs. Note that `tenant_id` overrides the optional
    base-class field with a required one.
    """

    client_id: UUID = Field(
        title="Azure Client ID",
        description="The service principal's client ID",
    )
    tenant_id: UUID = Field(
        title="Azure Tenant ID",
        description="ID of the service principal's tenant. Also called its "
        "'directory' ID.",
    )
AzureClientSecret (AuthenticationConfig)
Azure client secret credentials.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientSecret(AuthenticationConfig):
    """Azure client secret credentials."""

    # The service principal's client secret, held in a secret-string
    # field.
    client_secret: PlainSerializedSecretStr = Field(
        title="Service principal client secret",
        description="The client secret of the service principal",
    )
AzureServiceConnector (ServiceConnector)
Azure service connector.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServiceConnector(ServiceConnector):
"""Azure service connector."""
config: AzureBaseConfig
# Subscription and tenant details resolved lazily and cached per
# instance (see the `subscription` and `tenant_id` properties).
_subscription_id: Optional[str] = None
_subscription_name: Optional[str] = None
_tenant_id: Optional[str] = None
# Class-level cache of authenticated sessions, keyed by auth method and
# holding the credential plus its expiration time (None when the
# credential does not expire). See `get_azure_credential`.
_session_cache: Dict[
    str,
    Tuple[TokenCredential, Optional[datetime.datetime]],
] = {}
@classmethod
def _get_connector_type(cls) -> ServiceConnectorTypeModel:
    """Return the specification describing this connector type.

    Returns:
        The service connector type specification.
    """
    return AZURE_SERVICE_CONNECTOR_TYPE_SPEC
@property
def subscription(self) -> Tuple[str, str]:
    """Get the Azure subscription ID and name.

    The result is resolved once and then cached on the instance.

    Returns:
        The Azure subscription ID and name.

    Raises:
        AuthorizationException: If the Azure subscription could not be
            determined or doesn't match the configured subscription ID.
    """
    if self._subscription_id is None or self._subscription_name is None:
        logger.debug("Getting subscription from Azure...")
        try:
            credential, _ = self.get_azure_credential(self.auth_method)
            subscription_client = SubscriptionClient(credential)
            if self.config.subscription_id is not None:
                # An explicit subscription is configured: fetch it to
                # validate access and obtain its display name.
                subscription = subscription_client.subscriptions.get(
                    str(self.config.subscription_id)
                )
                self._subscription_name = subscription.display_name
                self._subscription_id = subscription.subscription_id
            else:
                # No subscription configured: only accept the case where
                # the credentials can see exactly one subscription.
                subscriptions = list(
                    subscription_client.subscriptions.list()
                )
                if len(subscriptions) == 0:
                    raise AuthorizationException(
                        "no Azure subscriptions found for the configured "
                        "credentials."
                    )
                if len(subscriptions) > 1:
                    raise AuthorizationException(
                        "multiple Azure subscriptions found for the "
                        "configured credentials. Please configure the "
                        "subscription ID explicitly in the connector."
                    )
                subscription = subscriptions[0]
                self._subscription_id = subscription.subscription_id
                self._subscription_name = subscription.display_name
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure subscription: {e}"
            ) from e
    assert self._subscription_id is not None
    assert self._subscription_name is not None
    return self._subscription_id, self._subscription_name
@property
def tenant_id(self) -> str:
    """Get the Azure tenant ID.

    The result is resolved once and then cached on the instance.

    Returns:
        The Azure tenant ID.

    Raises:
        AuthorizationException: If the Azure tenant ID could not be
            determined or doesn't match the configured tenant ID.
    """
    if self._tenant_id is None:
        logger.debug("Getting tenant ID from Azure...")
        try:
            credential, _ = self.get_azure_credential(self.auth_method)
            subscription_client = SubscriptionClient(credential)
            tenants = subscription_client.tenants.list()
            if self.config.tenant_id is not None:
                # Look for the configured tenant among the tenants
                # visible to the credentials.
                for tenant in tenants:
                    if str(tenant.tenant_id) == str(self.config.tenant_id):
                        self._tenant_id = str(tenant.tenant_id)
                        break
                else:
                    # for-else: no visible tenant matched the
                    # configured ID.
                    raise AuthorizationException(
                        "the configured tenant ID is not associated with "
                        "the configured credentials."
                    )
            else:
                # No tenant configured: only accept the case where the
                # credentials can see exactly one tenant.
                tenants = list(tenants)
                if len(tenants) == 0:
                    raise AuthorizationException(
                        "no Azure tenants found for the configured "
                        "credentials."
                    )
                if len(tenants) > 1:
                    raise AuthorizationException(
                        "multiple Azure tenants found for the "
                        "configured credentials. Please configure the "
                        "tenant ID explicitly in the connector."
                    )
                tenant = tenants[0]
                # NOTE(review): unlike the branch above, the raw
                # `tenant.tenant_id` is assigned without `str(...)` —
                # confirm it is already a string.
                self._tenant_id = tenant.tenant_id
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure tenant: {e}"
            ) from e
    assert self._tenant_id is not None
    return self._tenant_id
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    # NOTE(review): the cache key is only the auth method;
    # `resource_type`/`resource_id` are forwarded to `_authenticate` but
    # do not participate in the key — confirm the credentials are not
    # resource-scoped.
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            # Non-expiring credentials never need refreshing.
            return session, None
        # Refresh expired sessions
        now = datetime.datetime.now(datetime.timezone.utc)
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
        # check if the token expires in the near future
        if expires_at > now + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at
    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
def _authenticate(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Authenticate to Azure and return a token credential.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to authenticate for.
        resource_id: The resource ID to authenticate for.

    Returns:
        An Azure token credential and the expiration time of the
        temporary credentials if applicable.

    Raises:
        NotImplementedError: If the authentication method is not supported.
        AuthorizationException: If the authentication failed.
    """
    cfg = self.config
    credential: TokenCredential
    if auth_method == AzureAuthenticationMethods.IMPLICIT:
        self._check_implicit_auth_method_allowed()
        try:
            # Pick up ambient credentials (environment variables, Azure
            # CLI login, managed identity, ...).
            credential = DefaultAzureCredential()
        except AzureError as e:
            # Chain the original Azure error (`from e`) for consistency
            # with the other error paths in this class.
            raise AuthorizationException(
                f"Failed to authenticate to Azure using implicit "
                f"authentication. Please check your local Azure CLI "
                f"configuration or managed identity setup: {e}"
            ) from e
        # Implicit credentials have no known expiration time.
        return credential, None
    if auth_method == AzureAuthenticationMethods.SERVICE_PRINCIPAL:
        assert isinstance(cfg, AzureServicePrincipalConfig)
        try:
            credential = ClientSecretCredential(
                tenant_id=str(cfg.tenant_id),
                client_id=str(cfg.client_id),
                client_secret=cfg.client_secret.get_secret_value(),
            )
        except AzureError as e:
            raise AuthorizationException(
                f"Failed to authenticate to Azure using the provided "
                f"service principal credentials. Please check your Azure "
                f"configuration: {e}"
            ) from e
        return credential, None
    if auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
        assert isinstance(cfg, AzureAccessTokenConfig)
        # The token's expiration is tracked on the connector itself.
        expires_at = self.expires_at
        assert expires_at is not None
        credential = ZenMLAzureTokenCredential(
            token=cfg.token.get_secret_value(),
            expires_at=expires_at,
        )
        return credential, expires_at
    raise NotImplementedError(
        f"Authentication method '{auth_method}' is not supported by "
        "the Azure connector."
    )
def _parse_blob_container_resource_id(self, resource_id: str) -> str:
"""Validate and convert an Azure blob resource ID to a container name.
The resource ID could mean different things:
- Azure blob container URI: `{az|abfs}://{container-name}`
- Azure blob container name: `{container-name}`
This method extracts the container name from the provided resource ID.
Args:
resource_id: The resource ID to convert.
Returns:
The container name.
Raises:
ValueError: If the provided resource ID is not a valid Azure blob
resource ID.
"""
container_name: Optional[str] = None
if re.match(
r"^(az|abfs)://[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9](/.*)*$",
resource_id,
):
# The resource ID is an Azure blob container URI
container_name = resource_id.split("/")[2]
elif re.match(
r"^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$",
resource_id,
):
# The resource ID is the Azure blob container name
container_name = resource_id
else:
raise ValueError(
f"Invalid resource ID for an Azure blob storage container: "
f"{resource_id}. "
"Supported formats are:\n"
"Azure blob container URI: {az|abfs}://<container-name>\n"
"Azure blob container name: <container-name>"
)
return container_name
def _parse_acr_resource_id(
self,
resource_id: str,
) -> str:
"""Validate and convert an ACR resource ID to an ACR registry name.
The resource ID could mean different things:
- ACR registry URI: `[https://]{registry-name}.azurecr.io`
- ACR registry name: `{registry-name}`
This method extracts the registry name from the provided resource ID.
Args:
resource_id: The resource ID to convert.
Returns:
The ACR registry name.
Raises:
ValueError: If the provided resource ID is not a valid ACR
resource ID.
"""
registry_name: Optional[str] = None
if re.match(
r"^(https?://)?[a-zA-Z0-9]+\.azurecr\.io(/.+)*$",
resource_id,
):
# The resource ID is an ACR registry URI
registry_name = resource_id.split(".")[0].split("/")[-1]
elif re.match(
r"^[a-zA-Z0-9]+$",
resource_id,
):
# The resource ID is an ACR registry name
registry_name = resource_id
else:
raise ValueError(
f"Invalid resource ID for a ACR registry: {resource_id}. "
f"Supported formats are:\n"
"ACR registry URI: [https://]{registry-name}.azurecr.io\n"
"ACR registry name: {registry-name}"
)
return registry_name
def _parse_aks_resource_id(
    self, resource_id: str
) -> Tuple[Optional[str], str]:
    """Validate and convert an AKS resource ID to an AKS cluster name.

    The resource ID could mean different things:

    - resource group scoped AKS cluster name (canonical): `{resource-group}/{cluster-name}`
    - AKS cluster name: `{cluster-name}`

    This method extracts the resource group name and cluster name from the
    provided resource ID.

    Args:
        resource_id: The resource ID to convert.

    Returns:
        The Azure resource group and AKS cluster name.

    Raises:
        ValueError: If the provided resource ID is not a valid AKS cluster
            name.
    """
    # Default to the resource group configured on the connector; it is
    # overridden when the resource ID itself names a resource group.
    resource_group: Optional[str] = self.config.resource_group
    if re.match(
        r"^[a-zA-Z0-9_.()-]+/[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
        resource_id,
    ):
        # The resource ID is an AKS cluster name including the resource
        # group
        resource_group, cluster_name = resource_id.split("/")
    elif re.match(
        r"^[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
        resource_id,
    ):
        # The resource ID is an AKS cluster name without the resource group
        cluster_name = resource_id
    else:
        raise ValueError(
            f"Invalid resource ID for a AKS cluster: {resource_id}. "
            f"Supported formats are:\n"
            "resource group scoped AKS cluster name (canonical): {resource-group}/{cluster-name}\n"
            "AKS cluster name: {cluster-name}"
        )
    if (
        self.config.resource_group
        and self.config.resource_group != resource_group
    ):
        # A resource group embedded in the ID must agree with the one the
        # connector is scoped to.
        raise ValueError(
            f"Invalid resource ID for an AKS cluster: {resource_id}. "
            f"The resource group '{resource_group}' does not match the "
            f"resource group configured in the connector: "
            f"'{self.config.resource_group}'."
        )
    return resource_group, cluster_name
def _canonical_resource_id(
    self, resource_type: str, resource_id: str
) -> str:
    """Convert a resource ID to its canonical form.

    Args:
        resource_type: The resource type to canonicalize.
        resource_id: The resource ID to canonicalize.

    Returns:
        The canonical resource ID.
    """
    if resource_type == BLOB_RESOURCE_TYPE:
        # Canonical blob form is the `az://` container URI.
        container = self._parse_blob_container_resource_id(resource_id)
        return f"az://{container}"
    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        # Canonical AKS form is `<resource-group>/<cluster-name>` when a
        # resource group is known, otherwise the bare cluster name.
        group, cluster = self._parse_aks_resource_id(resource_id)
        if group:
            return f"{group}/{cluster}"
        return cluster
    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        # Canonical ACR form is the fully qualified registry domain.
        registry = self._parse_acr_resource_id(resource_id)
        return f"{registry}.azurecr.io"
    # Any other resource type is returned unchanged.
    return resource_id
def _get_default_resource_id(self, resource_type: str) -> str:
    """Get the default resource ID for a resource type.

    Args:
        resource_type: The type of the resource to get a default resource ID
            for. Only called with resource types that do not support
            multiple instances.

    Returns:
        The default resource ID for the resource type.

    Raises:
        RuntimeError: If the resource type has no default resource ID
            (only the generic Azure resource type does: its default is
            the subscription name).
    """
    if resource_type == AZURE_RESOURCE_TYPE:
        # The generic Azure resource is identified by the subscription
        # display name.
        _, subscription_name = self.subscription
        return subscription_name
    raise RuntimeError(
        f"Default resource ID for '{resource_type}' not available."
    )
def _connect_to_resource(
    self,
    **kwargs: Any,
) -> Any:
    """Authenticate and connect to an Azure resource.

    Initialize and return a session or client object depending on the
    connector configuration:

    - initialize and return an Azure TokenCredential if the resource type
      is a generic Azure resource
    - initialize and return an Azure BlobServiceClient for an Azure blob
      storage container resource type

    For the Docker and Kubernetes resource types, the connector does not
    support connecting to the resource directly. Instead, the connector
    supports generating a connector client object for the resource type
    in question.

    Args:
        kwargs: Additional implementation specific keyword arguments to pass
            to the session or client constructor.

    Returns:
        A TokenCredential for Azure generic resources and a
        BlobServiceClient client for an Azure blob storage container.

    Raises:
        NotImplementedError: If the connector instance does not support
            directly connecting to the indicated resource type.
    """
    resource_type = self.resource_type
    resource_id = self.resource_id
    assert resource_type is not None
    assert resource_id is not None
    # Regardless of the resource type, we must authenticate to Azure first
    # before we can connect to any Azure resource
    credential, _ = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
    )
    if resource_type == BLOB_RESOURCE_TYPE:
        container_name = self._parse_blob_container_resource_id(
            resource_id
        )
        # Resolve which storage account holds the container; this raises
        # an AuthorizationException if it cannot be found.
        containers = self._list_blob_containers(
            credential,
            container_name=container_name,
        )
        container_name, storage_account = containers.popitem()
        account_url = f"https://{storage_account}.blob.core.windows.net/"
        blob_client = BlobServiceClient(
            account_url=account_url,
            credential=credential,
        )
        return blob_client
    if resource_type == AZURE_RESOURCE_TYPE:
        # Generic Azure access: hand back the raw credential.
        return credential
    raise NotImplementedError(
        f"Connecting to {resource_type} resources is not directly "
        "supported by the Azure connector. Please call the "
        f"`get_connector_client` method to get a {resource_type} connector "
        "instance for the resource."
    )
def _configure_local_client(
    self,
    **kwargs: Any,
) -> None:
    """Configure a local client to authenticate and connect to a resource.

    This method uses the connector's configuration to configure a local
    client or SDK installed on the localhost for the indicated resource.

    Args:
        kwargs: Additional implementation specific keyword arguments to use
            to configure the client.

    Raises:
        NotImplementedError: If the connector instance does not support
            local configuration for the configured resource type or
            authentication method.
        AuthorizationException: If the connector instance does not support
            local configuration for the configured authentication method.
    """
    resource_type = self.resource_type
    if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
        if (
            self.auth_method
            == AzureAuthenticationMethods.SERVICE_PRINCIPAL
        ):
            # Use the service principal credentials to configure the local
            # Azure CLI
            assert isinstance(self.config, AzureServicePrincipalConfig)
            # NOTE(review): the client secret is passed as a process
            # argument to `az login`, which may be visible to other
            # processes on the machine — confirm this is acceptable.
            command = [
                "az",
                "login",
                "--service-principal",
                "-u",
                str(self.config.client_id),
                "-p",
                self.config.client_secret.get_secret_value(),
                "--tenant",
                str(self.config.tenant_id),
            ]
            try:
                subprocess.run(command, check=True)
            except subprocess.CalledProcessError as e:
                raise AuthorizationException(
                    f"Failed to update the local Azure CLI with the "
                    f"connector service principal credentials: {e}"
                ) from e
            logger.info(
                "Updated the local Azure CLI configuration with the "
                "connector's service principal credentials."
            )
            return
        raise NotImplementedError(
            f"Local Azure client configuration for resource type "
            f"{resource_type} is only supported if the "
            f"'{AzureAuthenticationMethods.SERVICE_PRINCIPAL}' "
            f"authentication method is used."
        )
    raise NotImplementedError(
        f"Configuring the local client for {resource_type} resources is "
        "not directly supported by the Azure connector. Please call the "
        f"`get_connector_client` method to get a {resource_type} connector "
        "instance for the resource."
    )
@classmethod
def _auto_configure(
    cls,
    auth_method: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    resource_group: Optional[str] = None,
    storage_account: Optional[str] = None,
    **kwargs: Any,
) -> "AzureServiceConnector":
    """Auto-configure the connector.

    Instantiate an Azure connector with a configuration extracted from the
    authentication configuration available in the environment (e.g.
    environment variables or local Azure client/SDK configuration files).

    Args:
        auth_method: The particular authentication method to use. If not
            specified, the connector implementation must decide which
            authentication method to use or raise an exception.
        resource_type: The type of resource to configure.
        resource_id: The ID of the resource to configure. The implementation
            may choose to either require or ignore this parameter if it does
            not support or detect an resource type that supports multiple
            instances.
        resource_group: A resource group may be used to restrict the scope
            of Azure resources like AKS clusters and ACR repositories. If
            not specified, ZenML will retrieve resources from all resource
            groups accessible with the discovered credentials.
        storage_account: The name of an Azure storage account may be used to
            restrict the scope of Azure Blob storage containers. If not
            specified, ZenML will retrieve blob containers from all storage
            accounts accessible with the discovered credentials.
        kwargs: Additional implementation specific keyword arguments to use.

    Returns:
        An Azure connector instance configured with authentication
        credentials automatically extracted from the environment.

    Raises:
        NotImplementedError: If the connector implementation does not
            support auto-configuration for the specified authentication
            method.
        AuthorizationException: If no Azure credentials can be loaded from
            the environment.
    """
    auth_config: AzureBaseConfig
    expiration_seconds: Optional[int] = None
    expires_at: Optional[datetime.datetime] = None
    if auth_method == AzureAuthenticationMethods.IMPLICIT:
        # Implicit auth carries no credentials in the config; they are
        # loaded from the environment at use time.
        auth_config = AzureBaseConfig(
            resource_group=resource_group,
            storage_account=storage_account,
        )
    else:
        # Initialize an Azure credential with the default configuration
        # loaded from the environment.
        try:
            credential = DefaultAzureCredential()
        except AzureError as e:
            # NOTE(review): consider chaining with `from e` for
            # consistency with the other error paths in this class.
            raise AuthorizationException(
                f"Failed to authenticate to Azure using implicit "
                f"authentication. Please check your local Azure CLI "
                f"configuration: {e}"
            )
        if (
            auth_method
            and auth_method != AzureAuthenticationMethods.ACCESS_TOKEN
        ):
            # Only the implicit and access-token methods can be
            # auto-configured from the environment.
            raise NotImplementedError(
                f"The specified authentication method '{auth_method}' "
                "could not be used to auto-configure the connector. "
            )
        auth_method = AzureAuthenticationMethods.ACCESS_TOKEN
        if resource_type == BLOB_RESOURCE_TYPE:
            # Blob containers are explicitly excluded — presumably
            # because the management-scope token fetched below does not
            # grant blob data access; TODO confirm.
            raise AuthorizationException(
                f"Auto-configuration for {resource_type} resources is not "
                "supported by the Azure connector."
            )
        else:
            token = credential.get_token(
                AZURE_MANAGEMENT_TOKEN_SCOPE,
            )
            # Convert the expiration timestamp from Unix time to datetime
            # format.
            expires_at = datetime.datetime.fromtimestamp(token.expires_on)
            # Convert the expiration timestamp from local time to UTC time.
            expires_at = expires_at.astimezone(datetime.timezone.utc)
            auth_config = AzureAccessTokenConfig(
                token=token.token,
                resource_group=resource_group,
                storage_account=storage_account,
            )
    return cls(
        auth_method=auth_method,
        resource_type=resource_type,
        # A resource ID only makes sense for resource types that support
        # multiple instances.
        resource_id=resource_id
        if resource_type not in [AZURE_RESOURCE_TYPE, None]
        else None,
        expiration_seconds=expiration_seconds,
        expires_at=expires_at,
        config=auth_config,
    )
@classmethod
def _get_resource_group(cls, resource_id: str) -> str:
"""Get the resource group of an Azure resource.
Args:
resource_id: The ID of the Azure resource.
Returns:
The resource group of the Azure resource.
"""
# The resource group is the fourth component of the resource ID.
return resource_id.split("/")[4]
def _list_blob_containers(
    self, credential: TokenCredential, container_name: Optional[str] = None
) -> Dict[str, str]:
    """Get the list of blob storage containers that the connector can access.

    Args:
        credential: The Azure credential to use to access the blob storage
            containers.
        container_name: The name of the blob container to get. If omitted,
            all accessible blob containers are returned.

    Returns:
        The list of blob storage containers that the connector can access
        as a dictionary mapping container names to their corresponding
        storage account names. If the `container_name` argument was
        specified, the dictionary will contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            access the target blob storage containers.
    """
    subscription_id, _ = self.subscription
    # Azure blob storage containers are scoped to a storage account. We
    # need to figure out which storage accounts the connector can access
    # and then list the containers in each of them. If a container name
    # is provided, we only need to find the storage account that contains
    # it.
    storage_accounts: List[str] = []
    if self.config.storage_account:
        # The connector is scoped to a single storage account.
        storage_accounts = [self.config.storage_account]
    else:
        try:
            storage_client = StorageManagementClient(
                credential, subscription_id
            )
            accounts = list(storage_client.storage_accounts.list())
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list available Azure storage accounts. Please "
                f"check that the credentials have permissions to list "
                f"storage accounts or consider configuring a storage "
                f"account in the connector: {e}"
            ) from e
        if not accounts:
            raise AuthorizationException(
                "no storage accounts were found. Please check that the "
                "credentials have permissions to access one or more "
                "storage accounts."
            )
        if self.config.resource_group:
            # Keep only the storage accounts in the specified resource
            # group.
            accounts = [
                account
                for account in accounts
                if self._get_resource_group(account.id)
                == self.config.resource_group
            ]
            if not accounts:
                raise AuthorizationException(
                    f"no storage accounts were found in the "
                    f"'{self.config.resource_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"storage accounts and that the connector credentials "
                    f"have permissions to access them."
                )
        storage_accounts = [
            account.name for account in accounts if account.name
        ]
    containers: Dict[str, str] = {}
    for storage_account in storage_accounts:
        account_url = f"https://{storage_account}.blob.core.windows.net/"
        try:
            blob_client = BlobServiceClient(
                account_url, credential=credential
            )
            response = blob_client.list_containers()
            account_containers = [
                container.name for container in response if container.name
            ]
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch Azure blob storage containers in the "
                f"'{storage_account}' storage account. Please check that "
                f"the credentials have permissions to list containers in "
                f"that storage account: {e}"
            ) from e
        if container_name:
            # Looking for a specific container: stop at the first
            # storage account that holds it.
            if container_name not in account_containers:
                continue
            containers = {container_name: storage_account}
            break
        containers.update(
            {
                container: storage_account
                for container in account_containers
            }
        )
    if not containers:
        if container_name:
            if self.config.storage_account:
                raise AuthorizationException(
                    f"the '{container_name}' container was not found in "
                    f"the '{self.config.storage_account}' storage "
                    f"account. Please check that container exists and the "
                    f"credentials have permissions to access that "
                    f"container."
                )
            raise AuthorizationException(
                f"the '{container_name}' container was not found in any "
                f"of the storage accounts accessible to the connector. "
                f"Please check that container exists and the credentials "
                f"have permissions to access the storage account it "
                f"belongs to."
            )
        raise AuthorizationException(
            "no Azure blob storage containers were found in any of the "
            "storage accounts accessible to the connector. Please check "
            "that the credentials have permissions to access one or more "
            "storage accounts."
        )
    return containers
def _list_acr_registries(
    self, credential: TokenCredential, registry_name: Optional[str] = None
) -> Dict[str, str]:
    """Get the list of ACR registries that the connector can access.

    Args:
        credential: The Azure credential to use.
        registry_name: The name of the registry to get. If omitted,
            all accessible registries are returned.

    Returns:
        The list of ACR registries that the connector can access as a
        dictionary mapping registry names to their corresponding
        resource group names. If the `registry_name` argument was
        specified, the dictionary will contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            access the target ACR registries.
    """
    subscription_id, _ = self.subscription
    container_registries: Dict[str, str] = {}
    if registry_name and self.config.resource_group:
        # Both the registry name and the resource group are known: fetch
        # that single registry directly.
        try:
            container_client = ContainerRegistryManagementClient(
                credential, subscription_id
            )
            registry = container_client.registries.get(
                resource_group_name=self.config.resource_group,
                registry_name=registry_name,
            )
            if registry.name:
                container_registries = {
                    registry.name: self.config.resource_group
                }
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure container registry "
                f"'{registry_name}' in the '{self.config.resource_group}' "
                f"resource group specified in the connector configuration. "
                f"Please check that the registry exists in that resource "
                f"group and that the connector credentials have "
                f"permissions to access that registry: {e}"
            ) from e
    else:
        # List everything the credentials can see, then filter by
        # resource group (if configured).
        try:
            container_client = ContainerRegistryManagementClient(
                credential, subscription_id
            )
            registries = list(container_client.registries.list())
        except AzureError as e:
            raise AuthorizationException(
                "failed to list available Azure container registries. "
                "Please check that the credentials have permissions to "
                f"list container registries: {e}"
            ) from e
        if not registries:
            raise AuthorizationException(
                "no container registries were found. Please check that the "
                "credentials have permissions to access one or more "
                "container registries."
            )
        if self.config.resource_group:
            # Keep only the registries in the specified resource
            # group.
            registries = [
                registry
                for registry in registries
                if self._get_resource_group(registry.id)
                == self.config.resource_group
            ]
            if not registries:
                raise AuthorizationException(
                    f"no container registries were found in the "
                    f"'{self.config.resource_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"container registries and that the connector "
                    f"credentials have permissions to access them."
                )
        container_registries = {
            registry.name: self._get_resource_group(registry.id)
            for registry in registries
            if registry.name
        }
    if registry_name:
        # Narrow the result down to the requested registry.
        if registry_name not in container_registries:
            if self.config.resource_group:
                raise AuthorizationException(
                    f"the '{registry_name}' registry was not found in "
                    f"the '{self.config.resource_group}' resource "
                    f"group specified in the connector configuration. "
                    f"Please check that registry exists in that "
                    f"resource group and that the connector "
                    f"credentials have permissions to access that "
                    f"registry."
                )
            raise AuthorizationException(
                f"the '{registry_name}' registry was not found or is "
                "not accessible. Please check that registry exists and "
                "the credentials have permissions to access it."
            )
        container_registries = {
            registry_name: container_registries[registry_name]
        }
    return container_registries
def _list_aks_clusters(
    self,
    credential: TokenCredential,
    cluster_name: Optional[str] = None,
    resource_group: Optional[str] = None,
) -> List[Tuple[str, str]]:
    """Get the list of AKS clusters that the connector can access.

    Args:
        credential: The Azure credential to use.
        cluster_name: The name of the cluster to get. If omitted,
            all accessible clusters are returned.
        resource_group: The name of the resource group to which the
            search should be limited. If omitted, all accessible
            clusters are returned.

    Returns:
        The list of AKS clusters that the connector can access as a
        list of cluster names and their corresponding resource group names.
        If the `cluster_name` argument was specified, the list will
        contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            the target AKS clusters.
    """
    subscription_id, _ = self.subscription
    clusters: List[Tuple[str, str]] = []
    if cluster_name and resource_group:
        # Fast path: both cluster name and resource group are known, so a
        # single targeted GET is enough.
        try:
            container_client = ContainerServiceClient(
                credential, subscription_id
            )
            cluster = container_client.managed_clusters.get(
                resource_group_name=resource_group,
                resource_name=cluster_name,
            )
            if cluster.name:
                clusters = [(cluster.name, resource_group)]
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure AKS cluster '{cluster_name}' "
                f"in the '{resource_group}' resource group. Please check "
                f"that the cluster exists in that resource group and that "
                f"the connector credentials have permissions to access "
                f"that cluster: {e}"
            ) from e
    else:
        # Slow path: list all clusters in the subscription and filter
        # client-side below.
        try:
            container_client = ContainerServiceClient(
                credential, subscription_id
            )
            aks_clusters = list(container_client.managed_clusters.list())
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list available Azure AKS clusters. Please "
                f"check that the credentials have permissions to list "
                f"AKS clusters: {e}"
            ) from e
        if not aks_clusters:
            raise AuthorizationException(
                "no AKS clusters were found. Please check that the "
                "credentials have permissions to access one or more "
                "AKS clusters."
            )
        if self.config.resource_group:
            # Keep only the clusters in the specified resource
            # group.
            aks_clusters = [
                cluster
                for cluster in aks_clusters
                if self._get_resource_group(cluster.id)
                == self.config.resource_group
            ]
            if not aks_clusters:
                raise AuthorizationException(
                    f"no AKS clusters were found in the "
                    f"'{self.config.resource_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"AKS clusters and that the connector credentials "
                    f"have permissions to access them."
                )
        # Pair each cluster with its resource group, optionally narrowing
        # to the requested cluster name.
        clusters = [
            (cluster.name, self._get_resource_group(cluster.id))
            for cluster in aks_clusters
            if cluster.name
            and (not cluster_name or cluster.name == cluster_name)
        ]
    if cluster_name:
        # When a specific cluster was requested, the result must be exactly
        # one unambiguous match.
        if not clusters:
            if self.config.resource_group:
                raise AuthorizationException(
                    f"the '{cluster_name}' AKS cluster was not found "
                    f"in the '{self.config.resource_group}' resource "
                    f"group specified in the connector configuration. "
                    f"Please check that cluster exists in that "
                    f"resource group and that the connector "
                    f"credentials have permissions to access that "
                    f"cluster."
                )
            raise AuthorizationException(
                f"the '{cluster_name}' AKS cluster was not found or "
                "is not accessible. Please check that cluster exists "
                "and the credentials have permissions to access it."
            )
        if len(clusters) > 1:
            # The same cluster name may exist in several resource groups.
            resource_groups = [cluster[1] for cluster in clusters]
            raise AuthorizationException(
                f"the '{cluster_name}' AKS cluster was found in "
                f"multiple resource groups: "
                f"{', '.join(resource_groups)}. "
                f"Please specify a resource group in the connector "
                f"configuration or include the resource group in the "
                "resource name to disambiguate."
            )
    return clusters
def _verify(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> List[str]:
    """Verify and list all the resources that the connector can access.

    Args:
        resource_type: The type of the resource to verify. If omitted and
            if the connector supports multiple resource types, the
            implementation must verify that it can authenticate and connect
            to any and all of the supported resource types.
        resource_id: The ID of the resource to connect to. Omitted if a
            resource type is not specified. It has the same value as the
            default resource ID if the supplied resource type doesn't
            support multiple instances. If the supplied resource type
            allows multiple instances, this parameter may still be omitted
            to fetch a list of resource IDs identifying all the resources
            of the indicated type that the connector can access.

    Returns:
        The list of resources IDs in canonical format identifying the
        resources that the connector can access. This list is empty only
        if the resource type is not specified (i.e. for multi-type
        connectors).

    Raises:
        AuthorizationException: If the connector cannot authenticate or
            access the specified resource.
    """
    # If the resource type is not specified, treat this the
    # same as a generic Azure connector.
    if not resource_type:
        # fetch the Azure subscription and tenant IDs to verify the
        # credentials globally; the values themselves are unused here —
        # the `subscription` property raises on failure
        subscription_id, subscription_name = self.subscription
        return []
    if resource_type == AZURE_RESOURCE_TYPE:
        # Generic Azure resources are single-instance; echo the ID back.
        assert resource_id is not None
        return [resource_id]
    credential, _ = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type or AZURE_RESOURCE_TYPE,
        resource_id=resource_id,
    )
    if resource_type == BLOB_RESOURCE_TYPE:
        if self.auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
            raise AuthorizationException(
                f"the '{self.auth_method}' authentication method is not "
                "supported for blob storage resources"
            )
        container_name: Optional[str] = None
        if resource_id:
            container_name = self._parse_blob_container_resource_id(
                resource_id
            )
        containers = self._list_blob_containers(
            credential,
            container_name=container_name,
        )
        # Canonical blob container resource IDs use the az:// scheme.
        return [f"az://{container}" for container in containers.keys()]
    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        registry_name: Optional[str] = None
        if resource_id:
            registry_name = self._parse_acr_resource_id(resource_id)
        registries = self._list_acr_registries(
            credential,
            registry_name=registry_name,
        )
        # Canonical ACR resource IDs are the registry login server names.
        return [f"{registry}.azurecr.io" for registry in registries.keys()]
    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        cluster_name: Optional[str] = None
        resource_group = self.config.resource_group
        if resource_id:
            resource_group, cluster_name = self._parse_aks_resource_id(
                resource_id
            )
        clusters = self._list_aks_clusters(
            credential,
            cluster_name=cluster_name,
            resource_group=resource_group,
        )
        # Canonical AKS resource IDs are '<resource-group>/<cluster-name>'.
        return [
            f"{resource_group}/{cluster_name}"
            for cluster_name, resource_group in clusters
        ]
    return []
def _get_connector_client(
    self,
    resource_type: str,
    resource_id: str,
) -> "ServiceConnector":
    """Get a connector instance that can be used to connect to a resource.

    This method generates a client-side connector instance that can be used
    to connect to a resource of the given type. The client-side connector
    is configured with temporary Azure credentials extracted from the
    current connector and, depending on resource type, it may also be
    of a different connector type:

    - a Kubernetes connector for Kubernetes clusters
    - a Docker connector for Docker registries

    Args:
        resource_type: The type of the resources to connect to.
        resource_id: The ID of a particular resource to connect to.

    Returns:
        An Azure, Kubernetes or Docker connector instance that can be used to
        connect to the specified resource.

    Raises:
        AuthorizationException: If authentication failed.
        ValueError: If the resource type is not supported.
        RuntimeError: If the Kubernetes connector is not installed and the
            resource type is Kubernetes.
    """
    # Derive a human-readable name for the client-side connector.
    connector_name = ""
    if self.name:
        connector_name = self.name
    if resource_id:
        connector_name += f" ({resource_type} | {resource_id} client)"
    else:
        connector_name += f" ({resource_type} client)"
    logger.debug(f"Getting connector client for {connector_name}")
    if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
        auth_method = self.auth_method
        if (
            self.resource_type == resource_type
            and self.resource_id == resource_id
        ):
            # If the requested type and resource ID are the same as
            # those configured, we can return the current connector
            # instance because it's fully formed and ready to use
            # to connect to the specified resource
            return self
        config = self.config
        expires_at = self.expires_at
        # Create a client-side Azure connector instance that is fully formed
        # and ready to use to connect to the specified resource (i.e. has
        # all the necessary configuration and credentials, a resource type
        # and a resource ID where applicable)
        return AzureServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
            config=config,
            expires_at=expires_at,
        )
    subscription_id, _ = self.subscription
    credential, expires_at = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
    )
    resource_group: Optional[str] = None
    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        registry_name = self._parse_acr_resource_id(resource_id)
        # If a service principal is used for authentication, the client ID
        # and client secret can be used to authenticate to the registry, if
        # configured.
        # https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-service-principal#authenticate-with-the-service-principal
        if (
            self.auth_method
            == AzureAuthenticationMethods.SERVICE_PRINCIPAL
        ):
            assert isinstance(self.config, AzureServicePrincipalConfig)
            username = str(self.config.client_id)
            password = self.config.client_secret
        # Without a service principal, this only works if the admin account
        # is enabled for the registry, but this is not recommended and
        # disabled by default.
        # https://docs.microsoft.com/en-us/azure/container-registry/container-registry-authentication#admin-account
        else:
            registries = self._list_acr_registries(
                credential,
                registry_name=registry_name,
            )
            # The listing is filtered to the requested registry name, so it
            # contains a single entry at this point.
            registry_name, resource_group = registries.popitem()
            try:
                client = ContainerRegistryManagementClient(
                    credential, subscription_id
                )
                registry_credentials = client.registries.list_credentials(
                    resource_group, registry_name
                )
                username = registry_credentials.username
                password = registry_credentials.passwords[0].value
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list admin credentials for Azure Container "
                    f"Registry '{registry_name}' in resource group "
                    f"'{resource_group}'. Make sure the registry is "
                    f"configured with an admin account: {e}"
                ) from e
        # Create a client-side Docker connector instance with the temporary
        # Docker credentials
        return DockerServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=DockerAuthenticationMethods.PASSWORD,
            resource_type=resource_type,
            config=DockerConfiguration(
                username=username,
                password=password,
                registry=f"{registry_name}.azurecr.io",
            ),
            expires_at=expires_at,
        )
    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        resource_group, cluster_name = self._parse_aks_resource_id(
            resource_id
        )
        clusters = self._list_aks_clusters(
            credential,
            cluster_name=cluster_name,
            resource_group=resource_group,
        )
        # _list_aks_clusters raises on no match or ambiguity, so exactly
        # one cluster is returned here.
        cluster_name, resource_group = clusters[0]
        try:
            client = ContainerServiceClient(credential, subscription_id)
            creds = client.managed_clusters.list_cluster_admin_credentials(
                resource_group_name=resource_group,
                resource_name=cluster_name,
            )
            kubeconfig_yaml = creds.kubeconfigs[0].value.decode(
                encoding="UTF-8"
            )
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list credentials for Azure Kubernetes "
                f"Service cluster '{cluster_name}' in resource group "
                f"'{resource_group}': {e}"
            ) from e
        kubeconfig = yaml.safe_load(kubeconfig_yaml)
        # Create a client-side Kubernetes connector instance with the
        # Kubernetes credentials
        try:
            # Import libraries only when needed
            from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (
                KubernetesAuthenticationMethods,
                KubernetesServiceConnector,
                KubernetesTokenConfig,
            )
        except ImportError as e:
            raise RuntimeError(
                f"The Kubernetes Service Connector functionality could not "
                f"be used due to missing dependencies: {e}"
            )
        # Pull the cluster endpoint and user credentials out of the admin
        # kubeconfig returned by AKS.
        cluster_name = kubeconfig["clusters"][0]["name"]
        cluster = kubeconfig["clusters"][0]["cluster"]
        user = kubeconfig["users"][0]["user"]
        return KubernetesServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=KubernetesAuthenticationMethods.TOKEN,
            resource_type=resource_type,
            config=KubernetesTokenConfig(
                cluster_name=cluster_name,
                certificate_authority=cluster.get(
                    "certificate-authority-data"
                ),
                server=cluster["server"],
                token=user["token"],
                client_certificate=user.get("client-certificate-data"),
                client_key=user.get("client-key-data"),
            ),
            expires_at=expires_at,
        )
    raise ValueError(f"Unsupported resource type: {resource_type}")
subscription: Tuple[str, str]
property
readonly
Get the Azure subscription ID and name.
Returns:
Type | Description |
---|---|
Tuple[str, str] |
The Azure subscription ID and name. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the Azure subscription could not be determined or doesn't match the configured subscription ID. |
tenant_id: str
property
readonly
Get the Azure tenant ID.
Returns:
Type | Description |
---|---|
str |
The Azure tenant ID. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the Azure tenant ID could not be determined or doesn't match the configured tenant ID. |
get_azure_credential(self, auth_method, resource_type=None, resource_id=None)
Get an Azure credential for the specified resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
auth_method |
str |
The authentication method to use. |
required |
resource_type |
Optional[str] |
The resource type to get a credential for. |
None |
resource_id |
Optional[str] |
The resource ID to get a credential for. |
None |
Returns:
Type | Description |
---|---|
Tuple[azure.core.credentials.TokenCredential, Optional[datetime.datetime]] |
An Azure credential for the specified resource and its expiration timestamp, if applicable. |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    # NOTE(review): the cache key is only the auth method, so cached
    # sessions are shared across resource types and IDs — confirm this is
    # intended (credentials may not be resource-scoped here).
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            # Non-expiring session: reuse unconditionally.
            return session, None
        # Refresh expired sessions
        now = datetime.datetime.now(datetime.timezone.utc)
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
        # check if the token expires in the near future
        if expires_at > now + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at
    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
model_post_init(self, __context)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
__context |
Any |
The context. |
required |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    # Nothing to do when pydantic has already populated the private storage.
    if getattr(self, '__pydantic_private__', None) is not None:
        return
    private_defaults: Dict[str, Any] = {}
    for attr_name, attr in self.__private_attributes__.items():
        attr_default = attr.get_default()
        # Attributes without a default value are left unset.
        if attr_default is not PydanticUndefined:
            private_defaults[attr_name] = attr_default
    object_setattr(self, '__pydantic_private__', private_defaults)
AzureServicePrincipalConfig (AzureClientConfig, AzureClientSecret)
Azure service principal configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServicePrincipalConfig(AzureClientConfig, AzureClientSecret):
    """Azure service principal configuration.

    Aggregates the fields of `AzureClientConfig` and `AzureClientSecret`;
    adds no fields or behavior of its own.
    """
ZenMLAzureTokenCredential (TokenCredential)
ZenML Azure token credential.
This class is used to provide a pre-configured token credential to Azure clients. Tokens are generated from other Azure credentials and are served to Azure clients to authenticate requests.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class ZenMLAzureTokenCredential(TokenCredential):
    """ZenML Azure token credential.

    This class is used to provide a pre-configured token credential to Azure
    clients. Tokens are generated from other Azure credentials and are served
    to Azure clients to authenticate requests.
    """

    def __init__(self, token: str, expires_at: datetime.datetime):
        """Initialize ZenML Azure token credential.

        Args:
            token: The token to use for authentication
            expires_at: The expiration time of the token
        """
        self.token = token
        # Interpret the expiration time as UTC, then convert it to local
        # time. BUGFIX: `datetime.replace` returns a NEW object (datetimes
        # are immutable), so its result must be assigned back — the previous
        # code discarded it, leaving a naive datetime that `astimezone()`
        # then interpreted as *local* time, skewing `expires_on`.
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
        expires_at = expires_at.astimezone(
            datetime.datetime.now().astimezone().tzinfo
        )
        # Azure SDK expects expiration as a POSIX timestamp in seconds.
        self.expires_on = int(expires_at.timestamp())

    def get_token(self, *scopes: str, **kwargs: Any) -> Any:
        """Get token.

        The requested scopes are ignored; the same pre-configured token is
        returned for every request.

        Args:
            *scopes: Scopes (unused)
            **kwargs: Keyword arguments (unused)

        Returns:
            Token
        """
        return AccessToken(token=self.token, expires_on=self.expires_on)
__init__(self, token, expires_at)
special
Initialize ZenML Azure token credential.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The token to use for authentication |
required |
expires_at |
datetime |
The expiration time of the token |
required |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def __init__(self, token: str, expires_at: datetime.datetime):
    """Initialize ZenML Azure token credential.

    Args:
        token: The token to use for authentication
        expires_at: The expiration time of the token
    """
    self.token = token
    # Interpret the expiration time as UTC, then convert it to local time.
    # BUGFIX: `datetime.replace` returns a NEW object (datetimes are
    # immutable), so its result must be assigned back — the previous code
    # discarded it, leaving a naive datetime that `astimezone()` then
    # interpreted as *local* time, skewing `expires_on`.
    expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
    expires_at = expires_at.astimezone(
        datetime.datetime.now().astimezone().tzinfo
    )
    # Azure SDK expects expiration as a POSIX timestamp in seconds.
    self.expires_on = int(expires_at.timestamp())
get_token(self, *scopes, **kwargs)
Get token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*scopes |
str |
Scopes |
() |
**kwargs |
Any |
Keyword arguments |
{} |
Returns:
Type | Description |
---|---|
Any |
Token |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_token(self, *scopes: str, **kwargs: Any) -> Any:
    """Get token.

    The requested scopes are ignored; the same pre-configured token is
    returned for every request.

    Args:
        *scopes: Scopes (unused)
        **kwargs: Keyword arguments (unused)

    Returns:
        Token
    """
    return AccessToken(token=self.token, expires_on=self.expires_on)
step_operators
special
Initialization of AzureML Step Operator integration.
azureml_step_operator
Implementation of the ZenML AzureML Step Operator.
AzureMLStepOperator (BaseStepOperator)
Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator to run a step on AzureML.

    This class defines code that can set up an AzureML environment and run the
    ZenML entrypoint command in it.
    """

    @property
    def config(self) -> AzureMLStepOperatorConfig:
        """Returns the `AzureMLStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureMLStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the AzureML step operator.

        Returns:
            The settings class.
        """
        return AzureMLStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote artifact
            store.
        """

        def _validate_remote_artifact_store(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            # AzureML executes remotely, so a local artifact store would not
            # be reachable from the remote compute.
            if stack.artifact_store.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the AzureML "
                    "step operator."
                )
            return True, ""

        return StackValidator(
            custom_validation_function=_validate_remote_artifact_store,
        )

    def _get_authentication(self) -> Optional[AbstractAuthentication]:
        """Returns the authentication object for the AzureML environment.

        Returns:
            The authentication object for the AzureML environment, or None
            when no service principal is fully configured.
        """
        # A service principal is only used when all three of its settings
        # are present; otherwise fall back to azureml-core's default auth.
        if (
            self.config.tenant_id
            and self.config.service_principal_id
            and self.config.service_principal_password
        ):
            return ServicePrincipalAuthentication(
                tenant_id=self.config.tenant_id,
                service_principal_id=self.config.service_principal_id,
                service_principal_password=self.config.service_principal_password,
            )
        return None

    def _prepare_environment(
        self,
        workspace: Workspace,
        docker_settings: "DockerSettings",
        run_name: str,
        environment_variables: Dict[str, str],
        environment_name: Optional[str] = None,
    ) -> Environment:
        """Prepares the environment in which Azure will run all jobs.

        Args:
            workspace: The AzureML Workspace that has configuration
                for a storage account, container registry among other
                things.
            docker_settings: The Docker settings for this step.
            run_name: The name of the pipeline run that can be used
                for naming environments and runs.
            environment_variables: Environment variables to set in the
                environment.
            environment_name: Optional name of an existing environment to use.

        Returns:
            The AzureML Environment object.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        requirements_files = docker_image_builder.gather_requirements_files(
            docker_settings=docker_settings,
            stack=Client().active_stack,
            log=False,
        )
        # Flatten the newline-separated requirements of each gathered file
        # into a single list of pip requirement strings.
        requirements = list(
            itertools.chain.from_iterable(
                r[1].split("\n") for r in requirements_files
            )
        )
        # Pin zenml itself so the remote environment matches the client.
        requirements.append(f"zenml=={zenml.__version__}")
        logger.info(
            "Using requirements for AzureML step operator environment: %s",
            requirements,
        )
        if environment_name:
            # Reuse a pre-registered AzureML environment, extending its pip
            # packages with the gathered requirements.
            environment = Environment.get(
                workspace=workspace, name=environment_name
            )
            if not environment.python.conda_dependencies:
                environment.python.conda_dependencies = (
                    CondaDependencies.create(
                        python_version=ZenMLEnvironment.python_version()
                    )
                )
            for requirement in requirements:
                environment.python.conda_dependencies.add_pip_package(
                    requirement
                )
        else:
            # Build a fresh environment named after the pipeline run.
            environment = Environment(name=f"zenml-{run_name}")
            environment.python.conda_dependencies = CondaDependencies.create(
                pip_packages=requirements,
                python_version=ZenMLEnvironment.python_version(),
            )
            if docker_settings.parent_image:
                # replace the default azure base image
                environment.docker.base_image = docker_settings.parent_image
        # set credentials to access azure storage
        for key in [
            "AZURE_STORAGE_ACCOUNT_KEY",
            "AZURE_STORAGE_ACCOUNT_NAME",
            "AZURE_STORAGE_CONNECTION_STRING",
            "AZURE_STORAGE_SAS_TOKEN",
        ]:
            value = os.getenv(key)
            if value:
                environment_variables[key] = value
        # Point ZenML at the config baked into the Docker image.
        environment_variables[ENV_ZENML_CONFIG_PATH] = (
            f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
        )
        environment_variables.update(docker_settings.environment)
        environment.environment_variables = environment_variables
        return environment

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launches a step on AzureML.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        # Resource settings cannot be honored here; compute sizing is done
        # through the configured AzureML compute target instead.
        if not info.config.resource_settings.empty:
            logger.warning(
                "Specifying custom step resources is not supported for "
                "the AzureML step operator. If you want to run this step "
                "operator on specific resources, you can do so by creating an "
                "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
                "with a specific machine type and then updating this step "
                "operator: `zenml step-operator update %s "
                "--compute_target_name=<COMPUTE_TARGET_NAME>`",
                self.name,
            )
        # Docker settings that have no effect with AzureML-managed builds;
        # warn if the user set any of them explicitly.
        unused_docker_fields = [
            "dockerfile",
            "build_context_root",
            "build_options",
            "skip_build",
            "target_repository",
            "dockerignore",
            "copy_files",
            "copy_global_config",
            "apt_packages",
            "user",
            "source_files",
        ]
        docker_settings = info.config.docker_settings
        ignored_docker_fields = docker_settings.model_fields_set.intersection(
            unused_docker_fields
        )
        if ignored_docker_fields:
            logger.warning(
                "The AzureML step operator currently does not support all "
                "options defined in your Docker settings. Ignoring all "
                "values set for the attributes: %s",
                ignored_docker_fields,
            )
        settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
        workspace = Workspace.get(
            subscription_id=self.config.subscription_id,
            resource_group=self.config.resource_group,
            name=self.config.workspace_name,
            auth=self._get_authentication(),
        )
        source_directory = source_utils.get_source_root()
        environment = self._prepare_environment(
            workspace=workspace,
            docker_settings=docker_settings,
            run_name=info.run_name,
            environment_variables=environment,
            environment_name=settings.environment_name,
        )
        compute_target = ComputeTarget(
            workspace=workspace, name=self.config.compute_target_name
        )
        run_config = ScriptRunConfig(
            source_directory=source_directory,
            environment=environment,
            compute_target=compute_target,
            command=entrypoint_command,
        )
        # One AzureML experiment per pipeline; one run per step execution.
        experiment = Experiment(workspace=workspace, name=info.pipeline.name)
        run = experiment.submit(config=run_config)
        run.display_name = info.run_name
        info.force_write_logs()
        # Block until the remote step finishes, streaming its output.
        run.wait_for_completion(show_output=True)
config: AzureMLStepOperatorConfig
property
readonly
Returns the `AzureMLStepOperatorConfig` config.
Returns:
Type | Description |
---|---|
AzureMLStepOperatorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the AzureML step operator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A validator that checks that the stack contains a remote artifact store. |
launch(self, info, entrypoint_command, environment)
Launches a step on AzureML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Information about the step run. |
required |
entrypoint_command |
List[str] |
Command that executes the step. |
required |
environment |
Dict[str, str] |
Environment variables to set in the step operator environment. |
required |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    # Resource settings cannot be honored here; compute sizing is done
    # through the configured AzureML compute target instead.
    if not info.config.resource_settings.empty:
        logger.warning(
            "Specifying custom step resources is not supported for "
            "the AzureML step operator. If you want to run this step "
            "operator on specific resources, you can do so by creating an "
            "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
            "with a specific machine type and then updating this step "
            "operator: `zenml step-operator update %s "
            "--compute_target_name=<COMPUTE_TARGET_NAME>`",
            self.name,
        )
    # Docker settings that have no effect with AzureML-managed builds;
    # warn if the user set any of them explicitly.
    unused_docker_fields = [
        "dockerfile",
        "build_context_root",
        "build_options",
        "skip_build",
        "target_repository",
        "dockerignore",
        "copy_files",
        "copy_global_config",
        "apt_packages",
        "user",
        "source_files",
    ]
    docker_settings = info.config.docker_settings
    ignored_docker_fields = docker_settings.model_fields_set.intersection(
        unused_docker_fields
    )
    if ignored_docker_fields:
        logger.warning(
            "The AzureML step operator currently does not support all "
            "options defined in your Docker settings. Ignoring all "
            "values set for the attributes: %s",
            ignored_docker_fields,
        )
    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
    workspace = Workspace.get(
        subscription_id=self.config.subscription_id,
        resource_group=self.config.resource_group,
        name=self.config.workspace_name,
        auth=self._get_authentication(),
    )
    source_directory = source_utils.get_source_root()
    environment = self._prepare_environment(
        workspace=workspace,
        docker_settings=docker_settings,
        run_name=info.run_name,
        environment_variables=environment,
        environment_name=settings.environment_name,
    )
    compute_target = ComputeTarget(
        workspace=workspace, name=self.config.compute_target_name
    )
    run_config = ScriptRunConfig(
        source_directory=source_directory,
        environment=environment,
        compute_target=compute_target,
        command=entrypoint_command,
    )
    # One AzureML experiment per pipeline; one run per step execution.
    experiment = Experiment(workspace=workspace, name=info.pipeline.name)
    run = experiment.submit(config=run_config)
    run.display_name = info.run_name
    info.force_write_logs()
    # Block until the remote step finishes, streaming its output.
    run.wait_for_completion(show_output=True)