Skip to content

Azure

zenml.integrations.azure special

Initialization of the ZenML Azure integration.

The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores, and an io module to handle file operations on Azure Blob Storage. The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.

AzureIntegration (Integration)

Definition of Azure integration for ZenML.

Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """Definition of Azure integration for ZenML."""

    NAME = AZURE
    REQUIREMENTS = [
        "adlfs>=2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity==1.10.0",
        "azureml-core==1.54.0.post1",
        "azure-mgmt-containerservice>=20.0.0",
        "azure-storage-blob==12.17.0",  # temporary fix for https://github.com/Azure/azure-sdk-for-python/issues/32056
        "kubernetes",
    ]

    @staticmethod
    def activate() -> None:
        """Activate the Azure integration."""
        from zenml.integrations.azure import service_connectors  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declares the flavors for the integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.azure.flavors import (
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
        )

        return [
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
        ]

activate() staticmethod

Activate the Azure integration.

Source code in zenml/integrations/azure/__init__.py
@staticmethod
def activate() -> None:
    """Activate the Azure integration."""
    from zenml.integrations.azure import service_connectors  # noqa

flavors() classmethod

Declares the flavors for the integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.azure.flavors import (
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
    )

    return [
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
    ]

artifact_stores special

Initialization of the Azure Artifact Store integration.

azure_artifact_store

Implementation of the Azure Artifact Store integration.

AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)

Artifact Store for Microsoft Azure based artifacts.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Microsoft Azure based artifacts."""

    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    @property
    def config(self) -> AzureArtifactStoreConfig:
        """Returns the `AzureArtifactStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureArtifactStoreConfig, self._config)

    def get_credentials(self) -> Optional[AzureSecretSchema]:
        """Returns the credentials for the Azure Artifact Store if configured.

        Returns:
            The credentials.

        Raises:
            RuntimeError: If the connector is not configured with Azure service
                principal credentials.
        """
        connector = self.get_connector()
        if connector:
            from azure.identity import ClientSecretCredential
            from azure.storage.blob import BlobServiceClient

            client = connector.connect()
            if not isinstance(client, BlobServiceClient):
                raise RuntimeError(
                    f"Expected a {BlobServiceClient.__module__}."
                    f"{BlobServiceClient.__name__} object while "
                    f"trying to use the linked connector, but got "
                    f"{type(client)}."
                )
            # Get the credentials from the client
            credentials = client.credential
            if not isinstance(credentials, ClientSecretCredential):
                raise RuntimeError(
                    "The Azure Artifact Store connector can only be used "
                    "with a service connector that is configured with "
                    "Azure service principal credentials."
                )
            return AzureSecretSchema(
                client_id=credentials._client_id,
                client_secret=credentials._client_credential,
                tenant_id=credentials._tenant_id,
                account_name=client.account_name,
            )

        secret = self.get_typed_authentication_secret(
            expected_schema_type=AzureSecretSchema
        )
        return secret

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The adlfs filesystem to access this artifact store.

        Returns:
            The adlfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_credentials()
            credentials = secret.get_values() if secret else {}

            self._filesystem = adlfs.AzureBlobFileSystem(
                **credentials,
                anon=False,
                use_listings_cache=False,
            )
        return self._filesystem

    def _split_path(self, path: PathType) -> Tuple[str, str]:
        """Splits a path into the filesystem prefix and remainder.

        Example:
        ```python
        prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A tuple of the filesystem prefix and the remainder.
        """
        path = convert_to_str(path)
        prefix = ""
        for potential_prefix in self.config.SUPPORTED_SCHEMES:
            if path.startswith(potential_prefix):
                prefix = potential_prefix
                path = path[len(potential_prefix) :]
                break

        return prefix, path

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        prefix, _ = self._split_path(pattern)
        return [
            f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of files in the given directory.
        """
        _, path = self._split_path(path)

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a dictionary returned by the Azure filesystem.

            Args:
                file_dict: A dictionary returned by the Azure filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path, exist_ok=True)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            Stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return self.filesystem.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        prefix, _ = self._split_path(top)
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{prefix}{directory}", subdirectories, files
config: AzureArtifactStoreConfig property readonly

Returns the AzureArtifactStoreConfig config.

Returns:

Type Description
AzureArtifactStoreConfig

The configuration.

filesystem: adlfs.AzureBlobFileSystem property readonly

The adlfs filesystem to access this artifact store.

Returns:

Type Description
adlfs.AzureBlobFileSystem

The adlfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials(self)

Returns the credentials for the Azure Artifact Store if configured.

Returns:

Type Description
Optional[zenml.secret.schemas.azure_secret_schema.AzureSecretSchema]

The credentials.

Exceptions:

Type Description
RuntimeError

If the connector is not configured with Azure service principal credentials.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def get_credentials(self) -> Optional[AzureSecretSchema]:
    """Returns the credentials for the Azure Artifact Store if configured.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the connector is not configured with Azure service
            principal credentials.
    """
    connector = self.get_connector()
    if connector:
        from azure.identity import ClientSecretCredential
        from azure.storage.blob import BlobServiceClient

        client = connector.connect()
        if not isinstance(client, BlobServiceClient):
            raise RuntimeError(
                f"Expected a {BlobServiceClient.__module__}."
                f"{BlobServiceClient.__name__} object while "
                f"trying to use the linked connector, but got "
                f"{type(client)}."
            )
        # Get the credentials from the client
        credentials = client.credential
        if not isinstance(credentials, ClientSecretCredential):
            raise RuntimeError(
                "The Azure Artifact Store connector can only be used "
                "with a service connector that is configured with "
                "Azure service principal credentials."
            )
        return AzureSecretSchema(
            client_id=credentials._client_id,
            client_secret=credentials._client_credential,
            tenant_id=credentials._tenant_id,
            account_name=client.account_name,
        )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AzureSecretSchema
    )
    return secret
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include: - '' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/*/some_file)

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of files in the given directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path, exist_ok=True)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(self, path)

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

Stat info.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files

flavors special

Azure integration flavors.

azure_artifact_store_flavor

Azure artifact store flavor.

AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)

Configuration class for Azure Artifact Store.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration class for Azure Artifact Store."""

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)

Azure Artifact Store flavor.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Azure Artifact Store flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZURE_ARTIFACT_STORE_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            connector_type=AZURE_CONNECTOR_TYPE,
            resource_type=BLOB_RESOURCE_TYPE,
            resource_id_attr="path",
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/azure.png"

    @property
    def config_class(self) -> Type[AzureArtifactStoreConfig]:
        """Returns AzureArtifactStoreConfig config class.

        Returns:
            The config class.
        """
        return AzureArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["AzureArtifactStore"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.artifact_stores import AzureArtifactStore

        return AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig] property readonly

Returns AzureArtifactStoreConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureArtifactStore] property readonly

Implementation class.

Returns:

Type Description
Type[AzureArtifactStore]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

azureml_step_operator_flavor

AzureML step operator flavor.

AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings)

Config for the AzureML step operator.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

compute_target_name str

The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig(
    BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
    """Config for the AzureML step operator.

    Attributes:
        subscription_id: The Azure account's subscription ID
        resource_group: The resource group to which the AzureML workspace
            is deployed.
        workspace_name: The name of the AzureML Workspace.
        compute_target_name: The name of the configured ComputeTarget.
            An instance of it has to be created on the portal if it doesn't
            exist already.
        tenant_id: The Azure Tenant ID.
        service_principal_id: The ID for the service principal that is created
            to allow apps to access secure resources.
        service_principal_password: Password for the service principal.
    """

    subscription_id: str
    resource_group: str
    workspace_name: str
    compute_target_name: str

    # Service principal authentication
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    tenant_id: Optional[str] = SecretField(default=None)
    service_principal_id: Optional[str] = SecretField(default=None)
    service_principal_password: Optional[str] = SecretField(default=None)

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)

Flavor for the AzureML step operator.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor for the AzureML step operator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZUREML_STEP_OPERATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/azureml.png"

    @property
    def config_class(self) -> Type[AzureMLStepOperatorConfig]:
        """Returns AzureMLStepOperatorConfig config class.

        Returns:
                The config class.
        """
        return AzureMLStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AzureMLStepOperator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.step_operators import AzureMLStepOperator

        return AzureMLStepOperator
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig] property readonly

Returns AzureMLStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLStepOperator] property readonly

Implementation class.

Returns:

Type Description
Type[AzureMLStepOperator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

AzureMLStepOperatorSettings (BaseSettings)

Settings for the AzureML step operator.

Attributes:

Name Type Description
environment_name Optional[str]

The name of the environment if there already exists one.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(BaseSettings):
    """Settings for the AzureML step operator.

    Attributes:
        environment_name: The name of the environment if there
            already exists one.
    """

    environment_name: Optional[str] = None

service_connectors special

Azure Service Connector.

azure_service_connector

Azure Service Connector.

AzureAccessToken (AuthenticationConfig)

Azure access token credentials.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessToken(AuthenticationConfig):
    """Azure access token credentials."""

    token: PlainSerializedSecretStr = Field(
        title="Azure Access Token",
        description="The Azure access token to use for authentication",
    )
AzureAccessTokenConfig (AzureBaseConfig, AzureAccessToken)

Azure token configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessTokenConfig(AzureBaseConfig, AzureAccessToken):
    """Azure token configuration."""
AzureAuthenticationMethods (StrEnum)

Azure Authentication methods.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAuthenticationMethods(StrEnum):
    """Azure Authentication methods."""

    IMPLICIT = "implicit"
    SERVICE_PRINCIPAL = "service-principal"
    ACCESS_TOKEN = "access-token"
AzureBaseConfig (AuthenticationConfig)

Azure base configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureBaseConfig(AuthenticationConfig):
    """Azure base configuration."""

    subscription_id: Optional[UUID] = Field(
        default=None,
        title="Azure Subscription ID",
        description="The subscription ID of the Azure account. If not "
        "specified, ZenML will attempt to retrieve the subscription ID from "
        "Azure using the configured credentials.",
    )
    tenant_id: Optional[UUID] = Field(
        default=None,
        title="Azure Tenant ID",
        description="The tenant ID of the Azure account. If not specified, "
        "ZenML will attempt to retrieve the tenant from Azure using the "
        "configured credentials.",
    )
    resource_group: Optional[str] = Field(
        default=None,
        title="Azure Resource Group",
        description="A resource group may be used to restrict the scope of "
        "Azure resources like AKS clusters and ACR repositories. If not "
        "specified, ZenML will retrieve resources from all resource groups "
        "accessible with the configured credentials.",
    )
    storage_account: Optional[str] = Field(
        default=None,
        title="Azure Storage Account",
        description="The name of an Azure storage account may be used to "
        "restrict the scope of Azure Blob storage containers. If not "
        "specified, ZenML will retrieve blob containers from all storage "
        "accounts accessible with the configured credentials.",
    )
AzureClientConfig (AzureBaseConfig)

Azure client configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientConfig(AzureBaseConfig):
    """Azure client configuration."""

    client_id: UUID = Field(
        title="Azure Client ID",
        description="The service principal's client ID",
    )
    tenant_id: UUID = Field(
        title="Azure Tenant ID",
        description="ID of the service principal's tenant. Also called its "
        "'directory' ID.",
    )
AzureClientSecret (AuthenticationConfig)

Azure client secret credentials.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientSecret(AuthenticationConfig):
    """Azure client secret credentials."""

    client_secret: PlainSerializedSecretStr = Field(
        title="Service principal client secret",
        description="The client secret of the service principal",
    )
AzureServiceConnector (ServiceConnector)

Azure service connector.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServiceConnector(ServiceConnector):
    """Azure service connector."""

    config: AzureBaseConfig

    _subscription_id: Optional[str] = None
    _subscription_name: Optional[str] = None
    _tenant_id: Optional[str] = None
    _session_cache: Dict[
        str,
        Tuple[TokenCredential, Optional[datetime.datetime]],
    ] = {}

    @classmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the service connector type specification.

        Returns:
            The service connector type specification.
        """
        return AZURE_SERVICE_CONNECTOR_TYPE_SPEC

    @property
    def subscription(self) -> Tuple[str, str]:
        """Get the Azure subscription ID and name.

        Returns:
            The Azure subscription ID and name.

        Raises:
            AuthorizationException: If the Azure subscription could not be
                determined or doesn't match the configured subscription ID.
        """
        if self._subscription_id is None or self._subscription_name is None:
            logger.debug("Getting subscription from Azure...")
            try:
                credential, _ = self.get_azure_credential(self.auth_method)
                subscription_client = SubscriptionClient(credential)
                if self.config.subscription_id is not None:
                    subscription = subscription_client.subscriptions.get(
                        str(self.config.subscription_id)
                    )
                    self._subscription_name = subscription.display_name
                    self._subscription_id = subscription.subscription_id
                else:
                    subscriptions = list(
                        subscription_client.subscriptions.list()
                    )
                    if len(subscriptions) == 0:
                        raise AuthorizationException(
                            "no Azure subscriptions found for the configured "
                            "credentials."
                        )
                    if len(subscriptions) > 1:
                        raise AuthorizationException(
                            "multiple Azure subscriptions found for the "
                            "configured credentials. Please configure the "
                            "subscription ID explicitly in the connector."
                        )

                    subscription = subscriptions[0]
                    self._subscription_id = subscription.subscription_id
                    self._subscription_name = subscription.display_name
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure subscription: {e}"
                ) from e

        assert self._subscription_id is not None
        assert self._subscription_name is not None
        return self._subscription_id, self._subscription_name

    @property
    def tenant_id(self) -> str:
        """Get the Azure tenant ID.

        Returns:
            The Azure tenant ID.

        Raises:
            AuthorizationException: If the Azure tenant ID could not be
                determined or doesn't match the configured tenant ID.
        """
        if self._tenant_id is None:
            logger.debug("Getting tenant ID from Azure...")
            try:
                credential, _ = self.get_azure_credential(self.auth_method)
                subscription_client = SubscriptionClient(credential)
                tenants = subscription_client.tenants.list()

                if self.config.tenant_id is not None:
                    for tenant in tenants:
                        if str(tenant.tenant_id) == str(self.config.tenant_id):
                            self._tenant_id = str(tenant.tenant_id)
                            break
                    else:
                        raise AuthorizationException(
                            "the configured tenant ID is not associated with "
                            "the configured credentials."
                        )
                else:
                    tenants = list(tenants)
                    if len(tenants) == 0:
                        raise AuthorizationException(
                            "no Azure tenants found for the configured "
                            "credentials."
                        )
                    if len(tenants) > 1:
                        raise AuthorizationException(
                            "multiple Azure tenants found for the "
                            "configured credentials. Please configure the "
                            "tenant ID explicitly in the connector."
                        )

                    tenant = tenants[0]
                    self._tenant_id = tenant.tenant_id
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure tenant: {e}"
                ) from e
        assert self._tenant_id is not None
        return self._tenant_id

    def get_azure_credential(
        self,
        auth_method: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
        """Get an Azure credential for the specified resource.

        Args:
            auth_method: The authentication method to use.
            resource_type: The resource type to get a credential for.
            resource_id: The resource ID to get a credential for.

        Returns:
            An Azure credential for the specified resource and its expiration
            timestamp, if applicable.
        """
        # We maintain a cache of all sessions to avoid re-authenticating
        # multiple times for the same resource
        key = auth_method
        if key in self._session_cache:
            session, expires_at = self._session_cache[key]
            if expires_at is None:
                return session, None

            # Refresh expired sessions
            now = datetime.datetime.now(datetime.timezone.utc)
            expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)

            # check if the token expires in the near future
            if expires_at > now + datetime.timedelta(
                minutes=AZURE_SESSION_EXPIRATION_BUFFER
            ):
                return session, expires_at

        logger.debug(
            f"Creating Azure credential for auth method '{auth_method}', "
            f"resource type '{resource_type}' and resource ID "
            f"'{resource_id}'..."
        )
        session, expires_at = self._authenticate(
            auth_method, resource_type, resource_id
        )
        self._session_cache[key] = (session, expires_at)
        return session, expires_at

    def _authenticate(
        self,
        auth_method: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
        """Authenticate to Azure and return a token credential.

        Args:
            auth_method: The authentication method to use.
            resource_type: The resource type to authenticate for.
            resource_id: The resource ID to authenticate for.

        Returns:
            An Azure token credential and the expiration time of the
            temporary credentials if applicable.

        Raises:
            NotImplementedError: If the authentication method is not supported.
            AuthorizationException: If the authentication failed.
        """
        cfg = self.config
        credential: TokenCredential
        if auth_method == AzureAuthenticationMethods.IMPLICIT:
            self._check_implicit_auth_method_allowed()

            try:
                credential = DefaultAzureCredential()
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using implicit "
                    f"authentication. Please check your local Azure CLI "
                    f"configuration or managed identity setup: {e}"
                )

            return credential, None

        if auth_method == AzureAuthenticationMethods.SERVICE_PRINCIPAL:
            assert isinstance(cfg, AzureServicePrincipalConfig)
            try:
                credential = ClientSecretCredential(
                    tenant_id=str(cfg.tenant_id),
                    client_id=str(cfg.client_id),
                    client_secret=cfg.client_secret.get_secret_value(),
                )
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using the provided "
                    f"service principal credentials. Please check your Azure "
                    f"configuration: {e}"
                )

            return credential, None

        if auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
            assert isinstance(cfg, AzureAccessTokenConfig)

            expires_at = self.expires_at
            assert expires_at is not None

            credential = ZenMLAzureTokenCredential(
                token=cfg.token.get_secret_value(),
                expires_at=expires_at,
            )

            return credential, expires_at

        raise NotImplementedError(
            f"Authentication method '{auth_method}' is not supported by "
            "the Azure connector."
        )

    def _parse_blob_container_resource_id(self, resource_id: str) -> str:
        """Validate and convert an Azure blob resource ID to a container name.

        The resource ID could mean different things:

        - Azure blob container URI: `{az|abfs}://{container-name}`
        - Azure blob container name: `{container-name}`

        This method extracts the container name from the provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The container name.

        Raises:
            ValueError: If the provided resource ID is not a valid Azure blob
                resource ID.
        """
        container_name: Optional[str] = None
        if re.match(
            r"^(az|abfs)://[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9](/.*)*$",
            resource_id,
        ):
            # The resource ID is an Azure blob container URI
            container_name = resource_id.split("/")[2]
        elif re.match(
            r"^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$",
            resource_id,
        ):
            # The resource ID is the Azure blob container name
            container_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for an Azure blob storage container: "
                f"{resource_id}. "
                "Supported formats are:\n"
                "Azure blob container URI: {az|abfs}://<container-name>\n"
                "Azure blob container name: <container-name>"
            )

        return container_name

    def _parse_acr_resource_id(
        self,
        resource_id: str,
    ) -> str:
        """Validate and convert an ACR resource ID to an ACR registry name.

        The resource ID could mean different things:

        - ACR registry URI: `[https://]{registry-name}.azurecr.io`
        - ACR registry name: `{registry-name}`

        This method extracts the registry name from the provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The ACR registry name.

        Raises:
            ValueError: If the provided resource ID is not a valid ACR
                resource ID.
        """
        registry_name: Optional[str] = None
        if re.match(
            r"^(https?://)?[a-zA-Z0-9]+\.azurecr\.io(/.+)*$",
            resource_id,
        ):
            # The resource ID is an ACR registry URI
            registry_name = resource_id.split(".")[0].split("/")[-1]
        elif re.match(
            r"^[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an ACR registry name
            registry_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for a ACR registry: {resource_id}. "
                f"Supported formats are:\n"
                "ACR registry URI: [https://]{registry-name}.azurecr.io\n"
                "ACR registry name: {registry-name}"
            )

        return registry_name

    def _parse_aks_resource_id(
        self, resource_id: str
    ) -> Tuple[Optional[str], str]:
        """Validate and convert an AKS resource ID to an AKS cluster name.

        The resource ID could mean different things:

        - resource group scoped AKS cluster name (canonical): `{resource-group}/{cluster-name}`
        - AKS cluster name: `{cluster-name}`

        This method extracts the resource group name and cluster name from the
        provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The Azure resource group and AKS cluster name.

        Raises:
            ValueError: If the provided resource ID is not a valid AKS cluster
                name.
        """
        resource_group: Optional[str] = self.config.resource_group
        if re.match(
            r"^[a-zA-Z0-9_.()-]+/[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an AKS cluster name including the resource
            # group
            resource_group, cluster_name = resource_id.split("/")
        elif re.match(
            r"^[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an AKS cluster name without the resource group
            cluster_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for a AKS cluster: {resource_id}. "
                f"Supported formats are:\n"
                "resource group scoped AKS cluster name (canonical): {resource-group}/{cluster-name}\n"
                "AKS cluster name: {cluster-name}"
            )

        if (
            self.config.resource_group
            and self.config.resource_group != resource_group
        ):
            raise ValueError(
                f"Invalid resource ID for an AKS cluster: {resource_id}. "
                f"The resource group '{resource_group}' does not match the "
                f"resource group configured in the connector: "
                f"'{self.config.resource_group}'."
            )

        return resource_group, cluster_name

    def _canonical_resource_id(
        self, resource_type: str, resource_id: str
    ) -> str:
        """Convert a resource ID to its canonical form.

        Args:
            resource_type: The resource type to canonicalize.
            resource_id: The resource ID to canonicalize.

        Returns:
            The canonical resource ID.
        """
        if resource_type == BLOB_RESOURCE_TYPE:
            container_name = self._parse_blob_container_resource_id(
                resource_id
            )
            return f"az://{container_name}"
        elif resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            resource_group, cluster_name = self._parse_aks_resource_id(
                resource_id
            )
            if resource_group:
                return f"{resource_group}/{cluster_name}"
            return cluster_name
        elif resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name = self._parse_acr_resource_id(
                resource_id,
            )
            return f"{registry_name}.azurecr.io"
        else:
            return resource_id

    def _get_default_resource_id(self, resource_type: str) -> str:
        """Get the default resource ID for a resource type.

        Args:
            resource_type: The type of the resource to get a default resource ID
                for. Only called with resource types that do not support
                multiple instances.

        Returns:
            The default resource ID for the resource type.

        Raises:
            RuntimeError: If the ECR registry ID (Azure account ID)
                cannot be retrieved from Azure because the connector is not
                authorized.
        """
        if resource_type == AZURE_RESOURCE_TYPE:
            _, subscription_name = self.subscription
            return subscription_name

        raise RuntimeError(
            f"Default resource ID for '{resource_type}' not available."
        )

    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to an Azure resource.

        Initialize and return a session or client object depending on the
        connector configuration:

        - initialize and return an Azure TokenCredential if the resource type
        is a generic Azure resource
        - initialize and return an Azure BlobServiceClient for an Azure blob
        storage container resource type

        For the Docker and Kubernetes resource types, the connector does not
        support connecting to the resource directly. Instead, the connector
        supports generating a connector client object for the resource type
        in question.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Returns:
            A TokenCredential for Azure generic resources and a
            BlobServiceClient client for an Azure blob storage container.

        Raises:
            NotImplementedError: If the connector instance does not support
                directly connecting to the indicated resource type.
        """
        resource_type = self.resource_type
        resource_id = self.resource_id

        assert resource_type is not None
        assert resource_id is not None

        # Regardless of the resource type, we must authenticate to Azure first
        # before we can connect to any Azure resource
        credential, _ = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
        )

        if resource_type == BLOB_RESOURCE_TYPE:
            container_name = self._parse_blob_container_resource_id(
                resource_id
            )

            containers = self._list_blob_containers(
                credential,
                container_name=container_name,
            )

            container_name, storage_account = containers.popitem()
            account_url = f"https://{storage_account}.blob.core.windows.net/"

            blob_client = BlobServiceClient(
                account_url=account_url,
                credential=credential,
            )

            return blob_client

        if resource_type == AZURE_RESOURCE_TYPE:
            return credential

        raise NotImplementedError(
            f"Connecting to {resource_type} resources is not directly "
            "supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Configure a local client to authenticate and connect to a resource.

        This method uses the connector's configuration to configure a local
        client or SDK installed on the localhost for the indicated resource.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            NotImplementedError: If the connector instance does not support
                local configuration for the configured resource type or
                authentication method.registry
            AuthorizationException: If the connector instance does not support
                local configuration for the configured authentication method.
        """
        resource_type = self.resource_type

        if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
            if (
                self.auth_method
                == AzureAuthenticationMethods.SERVICE_PRINCIPAL
            ):
                # Use the service principal credentials to configure the local
                # Azure CLI
                assert isinstance(self.config, AzureServicePrincipalConfig)

                command = [
                    "az",
                    "login",
                    "--service-principal",
                    "-u",
                    str(self.config.client_id),
                    "-p",
                    self.config.client_secret.get_secret_value(),
                    "--tenant",
                    str(self.config.tenant_id),
                ]

                try:
                    subprocess.run(command, check=True)
                except subprocess.CalledProcessError as e:
                    raise AuthorizationException(
                        f"Failed to update the local Azure CLI with the "
                        f"connector service principal credentials: {e}"
                    ) from e

                logger.info(
                    "Updated the local Azure CLI configuration with the "
                    "connector's service principal credentials."
                )

                return

            raise NotImplementedError(
                f"Local Azure client configuration for resource type "
                f"{resource_type} is only supported if the "
                f"'{AzureAuthenticationMethods.SERVICE_PRINCIPAL}' "
                f"authentication method is used."
            )

        raise NotImplementedError(
            f"Configuring the local client for {resource_type} resources is "
            "not directly supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    @classmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        resource_group: Optional[str] = None,
        storage_account: Optional[str] = None,
        **kwargs: Any,
    ) -> "AzureServiceConnector":
        """Auto-configure the connector.

        Instantiate an Azure connector with a configuration extracted from the
        authentication configuration available in the environment (e.g.
        environment variables or local Azure client/SDK configuration files).

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure.
            resource_id: The ID of the resource to configure. The implementation
                may choose to either require or ignore this parameter if it does
                not support or detect an resource type that supports multiple
                instances.
            resource_group: A resource group may be used to restrict the scope
                of Azure resources like AKS clusters and ACR repositories. If
                not specified, ZenML will retrieve resources from all resource
                groups accessible with the discovered credentials.
            storage_account: The name of an Azure storage account may be used to
                restrict the scope of Azure Blob storage containers. If not
                specified, ZenML will retrieve blob containers from all storage
                accounts accessible with the discovered credentials.
            kwargs: Additional implementation specific keyword arguments to use.

        Returns:
            An Azure connector instance configured with authentication
            credentials automatically extracted from the environment.

        Raises:
            NotImplementedError: If the connector implementation does not
                support auto-configuration for the specified authentication
                method.
            AuthorizationException: If no Azure credentials can be loaded from
                the environment.
        """
        auth_config: AzureBaseConfig
        expiration_seconds: Optional[int] = None
        expires_at: Optional[datetime.datetime] = None
        if auth_method == AzureAuthenticationMethods.IMPLICIT:
            auth_config = AzureBaseConfig(
                resource_group=resource_group,
                storage_account=storage_account,
            )
        else:
            # Initialize an Azure credential with the default configuration
            # loaded from the environment.
            try:
                credential = DefaultAzureCredential()
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using implicit "
                    f"authentication. Please check your local Azure CLI "
                    f"configuration: {e}"
                )

            if (
                auth_method
                and auth_method != AzureAuthenticationMethods.ACCESS_TOKEN
            ):
                raise NotImplementedError(
                    f"The specified authentication method '{auth_method}' "
                    "could not be used to auto-configure the connector. "
                )
            auth_method = AzureAuthenticationMethods.ACCESS_TOKEN

            if resource_type == BLOB_RESOURCE_TYPE:
                raise AuthorizationException(
                    f"Auto-configuration for {resource_type} resources is not "
                    "supported by the Azure connector."
                )
            else:
                token = credential.get_token(
                    AZURE_MANAGEMENT_TOKEN_SCOPE,
                )

            # Convert the expiration timestamp from Unix time to datetime
            # format.
            expires_at = datetime.datetime.fromtimestamp(token.expires_on)
            # Convert the expiration timestamp from local time to UTC time.
            expires_at = expires_at.astimezone(datetime.timezone.utc)

            auth_config = AzureAccessTokenConfig(
                token=token.token,
                resource_group=resource_group,
                storage_account=storage_account,
            )

        return cls(
            auth_method=auth_method,
            resource_type=resource_type,
            resource_id=resource_id
            if resource_type not in [AZURE_RESOURCE_TYPE, None]
            else None,
            expiration_seconds=expiration_seconds,
            expires_at=expires_at,
            config=auth_config,
        )

    @classmethod
    def _get_resource_group(cls, resource_id: str) -> str:
        """Get the resource group of an Azure resource.

        Args:
            resource_id: The ID of the Azure resource.

        Returns:
            The resource group of the Azure resource.
        """
        # The resource group is the fourth component of the resource ID.
        return resource_id.split("/")[4]

    def _list_blob_containers(
        self, credential: TokenCredential, container_name: Optional[str] = None
    ) -> Dict[str, str]:
        """Get the list of blob storage containers that the connector can access.

        Args:
            credential: The Azure credential to use to access the blob storage
                containers.
            container_name: The name of the blob container to get. If omitted,
                all accessible blob containers are returned.

        Returns:
            The list of blob storage containers that the connector can access
            as a dictionary mapping container names to their corresponding
            storage account names. If the `container_name` argument was
            specified, the dictionary will contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target blob storage containers.
        """
        subscription_id, _ = self.subscription
        # Azure blob storage containers are scoped to a storage account. We
        # need to figure out which storage accounts the connector can access
        # and then list the containers in each of them. If a container name
        # is provided, we only need to find the storage account that contains
        # it.

        storage_accounts: List[str] = []
        if self.config.storage_account:
            storage_accounts = [self.config.storage_account]
        else:
            try:
                storage_client = StorageManagementClient(
                    credential, subscription_id
                )
                accounts = list(storage_client.storage_accounts.list())
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list available Azure storage accounts. Please "
                    f"check that the credentials have permissions to list "
                    f"storage accounts or consider configuring a storage "
                    f"account in the connector: {e}"
                ) from e

            if not accounts:
                raise AuthorizationException(
                    "no storage accounts were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "storage accounts."
                )

            if self.config.resource_group:
                # Keep only the storage accounts in the specified resource
                # group.
                accounts = [
                    account
                    for account in accounts
                    if self._get_resource_group(account.id)
                    == self.config.resource_group
                ]
                if not accounts:
                    raise AuthorizationException(
                        f"no storage accounts were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"storage accounts and that the connector credentials "
                        f"have permissions to access them."
                    )

            storage_accounts = [
                account.name for account in accounts if account.name
            ]

        containers: Dict[str, str] = {}
        for storage_account in storage_accounts:
            account_url = f"https://{storage_account}.blob.core.windows.net/"

            try:
                blob_client = BlobServiceClient(
                    account_url, credential=credential
                )
                response = blob_client.list_containers()
                account_containers = [
                    container.name for container in response if container.name
                ]
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch Azure blob storage containers in the "
                    f"'{storage_account}' storage account. Please check that "
                    f"the credentials have permissions to list containers in "
                    f"that storage account: {e}"
                ) from e

            if container_name:
                if container_name not in account_containers:
                    continue
                containers = {container_name: storage_account}
                break

            containers.update(
                {
                    container: storage_account
                    for container in account_containers
                }
            )

        if not containers:
            if container_name:
                if self.config.storage_account:
                    raise AuthorizationException(
                        f"the '{container_name}' container was not found in "
                        f"the '{self.config.storage_account}' storage "
                        f"account. Please check that container exists and the "
                        f"credentials have permissions to access that "
                        f"container."
                    )

                raise AuthorizationException(
                    f"the '{container_name}' container was not found in any "
                    f"of the storage accounts accessible to the connector. "
                    f"Please check that container exists and the credentials "
                    f"have permissions to access the storage account it "
                    f"belongs to."
                )

            raise AuthorizationException(
                "no Azure blob storage containers were found in any of the "
                "storage accounts accessible to the connector. Please check "
                "that the credentials have permissions to access one or more "
                "storage accounts."
            )

        return containers

    def _list_acr_registries(
        self, credential: TokenCredential, registry_name: Optional[str] = None
    ) -> Dict[str, str]:
        """Get the list of ACR registries that the connector can access.

        Args:
            credential: The Azure credential to use.
            registry_name: The name of the registry to get. If omitted,
                all accessible registries are returned.

        Returns:
            The list of ACR registries that the connector can access as a
            dictionary mapping registry names to their corresponding
            resource group names. If the `registry_name` argument was
            specified, the dictionary will contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target ACR registries.
        """
        subscription_id, _ = self.subscription

        container_registries: Dict[str, str] = {}
        if registry_name and self.config.resource_group:
            try:
                container_client = ContainerRegistryManagementClient(
                    credential, subscription_id
                )
                registry = container_client.registries.get(
                    resource_group_name=self.config.resource_group,
                    registry_name=registry_name,
                )
                if registry.name:
                    container_registries = {
                        registry.name: self.config.resource_group
                    }
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure container registry "
                    f"'{registry_name}' in the '{self.config.resource_group}' "
                    f"resource group specified in the connector configuration. "
                    f"Please check that the registry exists in that resource "
                    f"group and that the connector credentials have "
                    f"permissions to access that registry: {e}"
                ) from e
        else:
            try:
                container_client = ContainerRegistryManagementClient(
                    credential, subscription_id
                )
                registries = list(container_client.registries.list())
            except AzureError as e:
                raise AuthorizationException(
                    "failed to list available Azure container registries. "
                    "Please check that the credentials have permissions to "
                    f"list container registries: {e}"
                ) from e

            if not registries:
                raise AuthorizationException(
                    "no container registries were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "container registries."
                )

            if self.config.resource_group:
                # Keep only the registries in the specified resource
                # group.
                registries = [
                    registry
                    for registry in registries
                    if self._get_resource_group(registry.id)
                    == self.config.resource_group
                ]

                if not registries:
                    raise AuthorizationException(
                        f"no container registries were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"container registries and that the connector "
                        f"credentials have permissions to access them."
                    )

            container_registries = {
                registry.name: self._get_resource_group(registry.id)
                for registry in registries
                if registry.name
            }

            if registry_name:
                if registry_name not in container_registries:
                    if self.config.resource_group:
                        raise AuthorizationException(
                            f"the '{registry_name}' registry was not found in "
                            f"the '{self.config.resource_group}' resource "
                            f"group specified in the connector configuration. "
                            f"Please check that registry exists in that "
                            f"resource group and that the connector "
                            f"credentials have permissions to access that "
                            f"registry."
                        )

                    raise AuthorizationException(
                        f"the '{registry_name}' registry was not found or is "
                        "not accessible. Please check that registry exists and "
                        "the credentials have permissions to access it."
                    )

                container_registries = {
                    registry_name: container_registries[registry_name]
                }

        return container_registries

    def _list_aks_clusters(
        self,
        credential: TokenCredential,
        cluster_name: Optional[str] = None,
        resource_group: Optional[str] = None,
    ) -> List[Tuple[str, str]]:
        """Get the list of AKS clusters that the connector can access.

        Args:
            credential: The Azure credential to use.
            cluster_name: The name of the cluster to get. If omitted,
                all accessible clusters are returned.
            resource_group: The name of the resource group to which the
                search should be limited. If omitted, all accessible
                clusters are returned.

        Returns:
            The list of AKS clusters that the connector can access as a
            list of cluster names and their corresponding resource group names.
            If the `cluster_name` argument was specified, the dictionary will
            contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target AKS clusters.
        """
        subscription_id, _ = self.subscription

        clusters: List[Tuple[str, str]] = []
        if cluster_name and resource_group:
            try:
                container_client = ContainerServiceClient(
                    credential, subscription_id
                )
                cluster = container_client.managed_clusters.get(
                    resource_group_name=resource_group,
                    resource_name=cluster_name,
                )
                if cluster.name:
                    clusters = [(cluster.name, resource_group)]
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure AKS cluster '{cluster_name}' "
                    f"in the '{resource_group}' resource group. Please check "
                    f"that the cluster exists in that resource group and that "
                    f"the connector credentials have permissions to access "
                    f"that cluster: {e}"
                ) from e
        else:
            try:
                container_client = ContainerServiceClient(
                    credential, subscription_id
                )
                aks_clusters = list(container_client.managed_clusters.list())
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list available Azure AKS clusters. Please "
                    f"check that the credentials have permissions to list "
                    f"AKS clusters: {e}"
                ) from e

            if not aks_clusters:
                raise AuthorizationException(
                    "no AKS clusters were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "AKS clusters."
                )

            if self.config.resource_group:
                # Keep only the clusters in the specified resource
                # group.
                aks_clusters = [
                    cluster
                    for cluster in aks_clusters
                    if self._get_resource_group(cluster.id)
                    == self.config.resource_group
                ]

                if not aks_clusters:
                    raise AuthorizationException(
                        f"no AKS clusters were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"AKS clusters and that the connector credentials "
                        f"have permissions to access them."
                    )

            clusters = [
                (cluster.name, self._get_resource_group(cluster.id))
                for cluster in aks_clusters
                if cluster.name
                and (not cluster_name or cluster.name == cluster_name)
            ]

            if cluster_name:
                if not clusters:
                    if self.config.resource_group:
                        raise AuthorizationException(
                            f"the '{cluster_name}' AKS cluster was not found "
                            f"in the '{self.config.resource_group}' resource "
                            f"group specified in the connector configuration. "
                            f"Please check that cluster exists in that "
                            f"resource group and that the connector "
                            f"credentials have permissions to access that "
                            f"cluster."
                        )

                    raise AuthorizationException(
                        f"the '{cluster_name}' AKS cluster was not found or "
                        "is not accessible. Please check that cluster exists "
                        "and the credentials have permissions to access it."
                    )

                if len(clusters) > 1:
                    resource_groups = [cluster[1] for cluster in clusters]
                    raise AuthorizationException(
                        f"the '{cluster_name}' AKS cluster was found in "
                        f"multiple resource groups: "
                        f"{', '.join(resource_groups)}. "
                        f"Please specify a resource group in the connector "
                        f"configuration or include the resource group in the "
                        "resource name to disambiguate."
                    )

        return clusters

    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify and list all the resources that the connector can access.

        Args:
            resource_type: The type of the resource to verify. If omitted and
                if the connector supports multiple resource types, the
                implementation must verify that it can authenticate and connect
                to any and all of the supported resource types.
            resource_id: The ID of the resource to connect to. Omitted if a
                resource type is not specified. It has the same value as the
                default resource ID if the supplied resource type doesn't
                support multiple instances. If the supplied resource type does
                allows multiple instances, this parameter may still be omitted
                to fetch a list of resource IDs identifying all the resources
                of the indicated type that the connector can access.

        Returns:
            The list of resources IDs in canonical format identifying the
            resources that the connector can access. This list is empty only
            if the resource type is not specified (i.e. for multi-type
            connectors).

        Raises:
            AuthorizationException: If the connector cannot authenticate or
                access the specified resource.
        """
        # If the resource type is not specified, treat this the
        # same as a generic Azure connector.
        if not resource_type:
            # fetch the Azure subscription and tenant IDs to verify the
            # credentials globally
            subscription_id, subscription_name = self.subscription
            return []

        if resource_type == AZURE_RESOURCE_TYPE:
            assert resource_id is not None
            return [resource_id]

        credential, _ = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type or AZURE_RESOURCE_TYPE,
            resource_id=resource_id,
        )

        if resource_type == BLOB_RESOURCE_TYPE:
            if self.auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
                raise AuthorizationException(
                    f"the '{self.auth_method}' authentication method is not "
                    "supported for blob storage resources"
                )

            container_name: Optional[str] = None
            if resource_id:
                container_name = self._parse_blob_container_resource_id(
                    resource_id
                )

            containers = self._list_blob_containers(
                credential,
                container_name=container_name,
            )

            return [f"az://{container}" for container in containers.keys()]

        if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name: Optional[str] = None
            if resource_id:
                registry_name = self._parse_acr_resource_id(resource_id)

            registries = self._list_acr_registries(
                credential,
                registry_name=registry_name,
            )

            return [f"{registry}.azurecr.io" for registry in registries.keys()]

        if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            cluster_name: Optional[str] = None
            resource_group = self.config.resource_group
            if resource_id:
                resource_group, cluster_name = self._parse_aks_resource_id(
                    resource_id
                )

            clusters = self._list_aks_clusters(
                credential,
                cluster_name=cluster_name,
                resource_group=resource_group,
            )

            return [
                f"{resource_group}/{cluster_name}"
                for cluster_name, resource_group in clusters
            ]

        return []

    def _get_connector_client(
        self,
        resource_type: str,
        resource_id: str,
    ) -> "ServiceConnector":
        """Get a connector instance that can be used to connect to a resource.

        This method generates a client-side connector instance that can be used
        to connect to a resource of the given type. The client-side connector
        is configured with temporary Azure credentials extracted from the
        current connector and, depending on resource type, it may also be
        of a different connector type:

        - a Kubernetes connector for Kubernetes clusters
        - a Docker connector for Docker registries

        Args:
            resource_type: The type of the resources to connect to.
            resource_id: The ID of a particular resource to connect to.

        Returns:
            An Azure, Kubernetes or Docker connector instance that can be used to
            connect to the specified resource.

        Raises:
            AuthorizationException: If authentication failed.
            ValueError: If the resource type is not supported.
            RuntimeError: If the Kubernetes connector is not installed and the
                resource type is Kubernetes.
        """
        connector_name = ""
        if self.name:
            connector_name = self.name
        if resource_id:
            connector_name += f" ({resource_type} | {resource_id} client)"
        else:
            connector_name += f" ({resource_type} client)"

        logger.debug(f"Getting connector client for {connector_name}")

        if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
            auth_method = self.auth_method
            if (
                self.resource_type == resource_type
                and self.resource_id == resource_id
            ):
                # If the requested type and resource ID are the same as
                # those configured, we can return the current connector
                # instance because it's fully formed and ready to use
                # to connect to the specified resource
                return self

            config = self.config
            expires_at = self.expires_at

            # Create a client-side Azure connector instance that is fully formed
            # and ready to use to connect to the specified resource (i.e. has
            # all the necessary configuration and credentials, a resource type
            # and a resource ID where applicable)
            return AzureServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=auth_method,
                resource_type=resource_type,
                resource_id=resource_id,
                config=config,
                expires_at=expires_at,
            )

        subscription_id, _ = self.subscription
        credential, expires_at = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
        )
        resource_group: Optional[str] = None

        if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name = self._parse_acr_resource_id(resource_id)

            # If a service principal is used for authentication, the client ID
            # and client secret can be used to authenticate to the registry, if
            # configured.
            # https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-service-principal#authenticate-with-the-service-principal
            if (
                self.auth_method
                == AzureAuthenticationMethods.SERVICE_PRINCIPAL
            ):
                assert isinstance(self.config, AzureServicePrincipalConfig)
                username = str(self.config.client_id)
                password = self.config.client_secret

            # Without a service principal, this only works if the admin account
            # is enabled for the registry, but this is not recommended and
            # disabled by default.
            # https://docs.microsoft.com/en-us/azure/container-registry/container-registry-authentication#admin-account

            else:
                registries = self._list_acr_registries(
                    credential,
                    registry_name=registry_name,
                )
                registry_name, resource_group = registries.popitem()

                try:
                    client = ContainerRegistryManagementClient(
                        credential, subscription_id
                    )

                    registry_credentials = client.registries.list_credentials(
                        resource_group, registry_name
                    )
                    username = registry_credentials.username
                    password = registry_credentials.passwords[0].value
                except AzureError as e:
                    raise AuthorizationException(
                        f"failed to list admin credentials for Azure Container "
                        f"Registry '{registry_name}' in resource group "
                        f"'{resource_group}'. Make sure the registry is "
                        f"configured with an admin account: {e}"
                    ) from e

            # Create a client-side Docker connector instance with the temporary
            # Docker credentials
            return DockerServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=DockerAuthenticationMethods.PASSWORD,
                resource_type=resource_type,
                config=DockerConfiguration(
                    username=username,
                    password=password,
                    registry=f"{registry_name}.azurecr.io",
                ),
                expires_at=expires_at,
            )

        if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            resource_group, cluster_name = self._parse_aks_resource_id(
                resource_id
            )
            clusters = self._list_aks_clusters(
                credential,
                cluster_name=cluster_name,
                resource_group=resource_group,
            )
            cluster_name, resource_group = clusters[0]

            try:
                client = ContainerServiceClient(credential, subscription_id)

                creds = client.managed_clusters.list_cluster_admin_credentials(
                    resource_group_name=resource_group,
                    resource_name=cluster_name,
                )

                kubeconfig_yaml = creds.kubeconfigs[0].value.decode(
                    encoding="UTF-8"
                )
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list credentials for Azure Kubernetes "
                    f"Service cluster '{cluster_name}' in resource group "
                    f"'{resource_group}': {e}"
                ) from e

            kubeconfig = yaml.safe_load(kubeconfig_yaml)

            # Create a client-side Kubernetes connector instance with the
            # Kubernetes credentials
            try:
                # Import libraries only when needed
                from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (
                    KubernetesAuthenticationMethods,
                    KubernetesServiceConnector,
                    KubernetesTokenConfig,
                )
            except ImportError as e:
                raise RuntimeError(
                    f"The Kubernetes Service Connector functionality could not "
                    f"be used due to missing dependencies: {e}"
                )
            cluster_name = kubeconfig["clusters"][0]["name"]
            cluster = kubeconfig["clusters"][0]["cluster"]
            user = kubeconfig["users"][0]["user"]
            return KubernetesServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=KubernetesAuthenticationMethods.TOKEN,
                resource_type=resource_type,
                config=KubernetesTokenConfig(
                    cluster_name=cluster_name,
                    certificate_authority=cluster.get(
                        "certificate-authority-data"
                    ),
                    server=cluster["server"],
                    token=user["token"],
                    client_certificate=user.get("client-certificate-data"),
                    client_key=user.get("client-key-data"),
                ),
                expires_at=expires_at,
            )

        raise ValueError(f"Unsupported resource type: {resource_type}")
subscription: Tuple[str, str] property readonly

Get the Azure subscription ID and name.

Returns:

Type Description
Tuple[str, str]

The Azure subscription ID and name.

Exceptions:

Type Description
AuthorizationException

If the Azure subscription could not be determined or doesn't match the configured subscription ID.

tenant_id: str property readonly

Get the Azure tenant ID.

Returns:

Type Description
str

The Azure tenant ID.

Exceptions:

Type Description
AuthorizationException

If the Azure tenant ID could not be determined or doesn't match the configured tenant ID.

get_azure_credential(self, auth_method, resource_type=None, resource_id=None)

Get an Azure credential for the specified resource.

Parameters:

Name Type Description Default
auth_method str

The authentication method to use.

required
resource_type Optional[str]

The resource type to get a credential for.

None
resource_id Optional[str]

The resource ID to get a credential for.

None

Returns:

Type Description
Tuple[azure.core.credentials.TokenCredential, Optional[datetime.datetime]]

An Azure credential for the specified resource and its expiration timestamp, if applicable.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Refresh expired sessions
        now = datetime.datetime.now(datetime.timezone.utc)
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)

        # check if the token expires in the near future
        if expires_at > now + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
model_post_init(self, __context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
__context Any

The context.

required
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
AzureServicePrincipalConfig (AzureClientConfig, AzureClientSecret)

Azure service principal configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServicePrincipalConfig(AzureClientConfig, AzureClientSecret):
    """Azure service principal configuration."""
ZenMLAzureTokenCredential (TokenCredential)

ZenML Azure token credential.

This class is used to provide a pre-configured token credential to Azure clients. Tokens are generated from other Azure credentials and are served to Azure clients to authenticate requests.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class ZenMLAzureTokenCredential(TokenCredential):
    """ZenML Azure token credential.

    This class is used to provide a pre-configured token credential to Azure
    clients. Tokens are generated from other Azure credentials and are served
    to Azure clients to authenticate requests.
    """

    def __init__(self, token: str, expires_at: datetime.datetime):
        """Initialize ZenML Azure token credential.

        Args:
            token: The token to use for authentication
            expires_at: The expiration time of the token
        """
        self.token = token

        # Convert the expiration time from UTC to local time
        expires_at.replace(tzinfo=datetime.timezone.utc)
        expires_at = expires_at.astimezone(
            datetime.datetime.now().astimezone().tzinfo
        )

        self.expires_on = int(expires_at.timestamp())

    def get_token(self, *scopes: str, **kwargs: Any) -> Any:
        """Get token.

        Args:
            *scopes: Scopes
            **kwargs: Keyword arguments

        Returns:
            Token
        """
        return AccessToken(token=self.token, expires_on=self.expires_on)
__init__(self, token, expires_at) special

Initialize ZenML Azure token credential.

Parameters:

Name Type Description Default
token str

The token to use for authentication

required
expires_at datetime

The expiration time of the token

required
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def __init__(self, token: str, expires_at: datetime.datetime):
    """Initialize ZenML Azure token credential.

    Args:
        token: The token to use for authentication
        expires_at: The expiration time of the token
    """
    self.token = token

    # Convert the expiration time from UTC to local time
    expires_at.replace(tzinfo=datetime.timezone.utc)
    expires_at = expires_at.astimezone(
        datetime.datetime.now().astimezone().tzinfo
    )

    self.expires_on = int(expires_at.timestamp())
get_token(self, *scopes, **kwargs)

Get token.

Parameters:

Name Type Description Default
*scopes str

Scopes

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Token

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_token(self, *scopes: str, **kwargs: Any) -> Any:
    """Get token.

    Args:
        *scopes: Scopes
        **kwargs: Keyword arguments

    Returns:
        Token
    """
    return AccessToken(token=self.token, expires_on=self.expires_on)

step_operators special

Initialization of AzureML Step Operator integration.

azureml_step_operator

Implementation of the ZenML AzureML Step Operator.

AzureMLStepOperator (BaseStepOperator)

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator to run a step on AzureML.

    This class defines code that can set up an AzureML environment and run the
    ZenML entrypoint command in it.
    """

    @property
    def config(self) -> AzureMLStepOperatorConfig:
        """Returns the `AzureMLStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureMLStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the AzureML step operator.

        Returns:
            The settings class.
        """
        return AzureMLStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote artifact
            store.
        """

        def _validate_remote_artifact_store(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            if stack.artifact_store.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the AzureML "
                    "step operator."
                )

            return True, ""

        return StackValidator(
            custom_validation_function=_validate_remote_artifact_store,
        )

    def _get_authentication(self) -> Optional[AbstractAuthentication]:
        """Returns the authentication object for the AzureML environment.

        Returns:
            The authentication object for the AzureML environment.
        """
        if (
            self.config.tenant_id
            and self.config.service_principal_id
            and self.config.service_principal_password
        ):
            return ServicePrincipalAuthentication(
                tenant_id=self.config.tenant_id,
                service_principal_id=self.config.service_principal_id,
                service_principal_password=self.config.service_principal_password,
            )
        return None

    def _prepare_environment(
        self,
        workspace: Workspace,
        docker_settings: "DockerSettings",
        run_name: str,
        environment_variables: Dict[str, str],
        environment_name: Optional[str] = None,
    ) -> Environment:
        """Prepares the environment in which Azure will run all jobs.

        Args:
            workspace: The AzureML Workspace that has configuration
                for a storage account, container registry among other
                things.
            docker_settings: The Docker settings for this step.
            run_name: The name of the pipeline run that can be used
                for naming environments and runs.
            environment_variables: Environment variables to set in the
                environment.
            environment_name: Optional name of an existing environment to use.

        Returns:
            The AzureML Environment object.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        requirements_files = docker_image_builder.gather_requirements_files(
            docker_settings=docker_settings,
            stack=Client().active_stack,
            log=False,
        )
        requirements = list(
            itertools.chain.from_iterable(
                r[1].split("\n") for r in requirements_files
            )
        )
        requirements.append(f"zenml=={zenml.__version__}")
        logger.info(
            "Using requirements for AzureML step operator environment: %s",
            requirements,
        )
        if environment_name:
            environment = Environment.get(
                workspace=workspace, name=environment_name
            )
            if not environment.python.conda_dependencies:
                environment.python.conda_dependencies = (
                    CondaDependencies.create(
                        python_version=ZenMLEnvironment.python_version()
                    )
                )

            for requirement in requirements:
                environment.python.conda_dependencies.add_pip_package(
                    requirement
                )
        else:
            environment = Environment(name=f"zenml-{run_name}")
            environment.python.conda_dependencies = CondaDependencies.create(
                pip_packages=requirements,
                python_version=ZenMLEnvironment.python_version(),
            )

            if docker_settings.parent_image:
                # replace the default azure base image
                environment.docker.base_image = docker_settings.parent_image

        # set credentials to access azure storage
        for key in [
            "AZURE_STORAGE_ACCOUNT_KEY",
            "AZURE_STORAGE_ACCOUNT_NAME",
            "AZURE_STORAGE_CONNECTION_STRING",
            "AZURE_STORAGE_SAS_TOKEN",
        ]:
            value = os.getenv(key)
            if value:
                environment_variables[key] = value

        environment_variables[ENV_ZENML_CONFIG_PATH] = (
            f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
        )
        environment_variables.update(docker_settings.environment)
        environment.environment_variables = environment_variables
        return environment

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launches a step on AzureML.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        if not info.config.resource_settings.empty:
            logger.warning(
                "Specifying custom step resources is not supported for "
                "the AzureML step operator. If you want to run this step "
                "operator on specific resources, you can do so by creating an "
                "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
                "with a specific machine type and then updating this step "
                "operator: `zenml step-operator update %s "
                "--compute_target_name=<COMPUTE_TARGET_NAME>`",
                self.name,
            )

        unused_docker_fields = [
            "dockerfile",
            "build_context_root",
            "build_options",
            "skip_build",
            "target_repository",
            "dockerignore",
            "copy_files",
            "copy_global_config",
            "apt_packages",
            "user",
            "source_files",
        ]
        docker_settings = info.config.docker_settings
        ignored_docker_fields = docker_settings.model_fields_set.intersection(
            unused_docker_fields
        )

        if ignored_docker_fields:
            logger.warning(
                "The AzureML step operator currently does not support all "
                "options defined in your Docker settings. Ignoring all "
                "values set for the attributes: %s",
                ignored_docker_fields,
            )

        settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))

        workspace = Workspace.get(
            subscription_id=self.config.subscription_id,
            resource_group=self.config.resource_group,
            name=self.config.workspace_name,
            auth=self._get_authentication(),
        )

        source_directory = source_utils.get_source_root()

        environment = self._prepare_environment(
            workspace=workspace,
            docker_settings=docker_settings,
            run_name=info.run_name,
            environment_variables=environment,
            environment_name=settings.environment_name,
        )
        compute_target = ComputeTarget(
            workspace=workspace, name=self.config.compute_target_name
        )

        run_config = ScriptRunConfig(
            source_directory=source_directory,
            environment=environment,
            compute_target=compute_target,
            command=entrypoint_command,
        )

        experiment = Experiment(workspace=workspace, name=info.pipeline.name)
        run = experiment.submit(config=run_config)

        run.display_name = info.run_name
        info.force_write_logs()
        run.wait_for_completion(show_output=True)
config: AzureMLStepOperatorConfig property readonly

Returns the AzureMLStepOperatorConfig config.

Returns:

Type Description
AzureMLStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the AzureML step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote artifact store.

launch(self, info, entrypoint_command, environment)

Launches a step on AzureML.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    if not info.config.resource_settings.empty:
        logger.warning(
            "Specifying custom step resources is not supported for "
            "the AzureML step operator. If you want to run this step "
            "operator on specific resources, you can do so by creating an "
            "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
            "with a specific machine type and then updating this step "
            "operator: `zenml step-operator update %s "
            "--compute_target_name=<COMPUTE_TARGET_NAME>`",
            self.name,
        )

    unused_docker_fields = [
        "dockerfile",
        "build_context_root",
        "build_options",
        "skip_build",
        "target_repository",
        "dockerignore",
        "copy_files",
        "copy_global_config",
        "apt_packages",
        "user",
        "source_files",
    ]
    docker_settings = info.config.docker_settings
    ignored_docker_fields = docker_settings.model_fields_set.intersection(
        unused_docker_fields
    )

    if ignored_docker_fields:
        logger.warning(
            "The AzureML step operator currently does not support all "
            "options defined in your Docker settings. Ignoring all "
            "values set for the attributes: %s",
            ignored_docker_fields,
        )

    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))

    workspace = Workspace.get(
        subscription_id=self.config.subscription_id,
        resource_group=self.config.resource_group,
        name=self.config.workspace_name,
        auth=self._get_authentication(),
    )

    source_directory = source_utils.get_source_root()

    environment = self._prepare_environment(
        workspace=workspace,
        docker_settings=docker_settings,
        run_name=info.run_name,
        environment_variables=environment,
        environment_name=settings.environment_name,
    )
    compute_target = ComputeTarget(
        workspace=workspace, name=self.config.compute_target_name
    )

    run_config = ScriptRunConfig(
        source_directory=source_directory,
        environment=environment,
        compute_target=compute_target,
        command=entrypoint_command,
    )

    experiment = Experiment(workspace=workspace, name=info.pipeline.name)
    run = experiment.submit(config=run_config)

    run.display_name = info.run_name
    info.force_write_logs()
    run.wait_for_completion(show_output=True)