Skip to content

Azure

zenml.integrations.azure special

Initialization of the ZenML Azure integration.

The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores, and an io module to handle file operations on Azure Blob Storage. The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.

AzureIntegration (Integration)

Definition of Azure integration for ZenML.

Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """Definition of Azure integration for ZenML."""

    NAME = AZURE
    REQUIREMENTS = [
        "adlfs>=2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity",
        "azureml-core==1.56.0",
        "azure-mgmt-containerservice>=20.0.0",
        "azure-storage-blob==12.17.0",  # temporary fix for https://github.com/Azure/azure-sdk-for-python/issues/32056
        "kubernetes",
        "azure-ai-ml==1.18.0",
        # In azureml/core/_metrics.py:212 of azureml-core 1.56.0, they use 
        # an attribute that was removed in Numpy 2.0. However, AzureML itself
        # does not have a limitation on numpy.
        "numpy<2.0",
    ]
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["kubernetes", "numpy"]

    @classmethod
    def activate(cls) -> None:
        """Activate the Azure integration."""
        from zenml.integrations.azure import service_connectors  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declares the flavors for the integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.azure.flavors import (
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
            AzureMLOrchestratorFlavor,
        )

        return [
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
            AzureMLOrchestratorFlavor,
        ]

activate() classmethod

Activate the Azure integration.

Source code in zenml/integrations/azure/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Azure integration."""
    from zenml.integrations.azure import service_connectors  # noqa

flavors() classmethod

Declares the flavors for the integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.azure.flavors import (
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
        AzureMLOrchestratorFlavor,
    )

    return [
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
        AzureMLOrchestratorFlavor,
    ]

artifact_stores special

Initialization of the Azure Artifact Store integration.

azure_artifact_store

Implementation of the Azure Artifact Store integration.

AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)

Artifact Store for Microsoft Azure based artifacts.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Microsoft Azure based artifacts."""

    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    @property
    def config(self) -> AzureArtifactStoreConfig:
        """Returns the `AzureArtifactStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureArtifactStoreConfig, self._config)

    def get_credentials(self) -> Optional[AzureSecretSchema]:
        """Returns the credentials for the Azure Artifact Store if configured.

        Returns:
            The credentials.

        Raises:
            RuntimeError: If the connector is not configured with Azure service
                principal credentials.
        """
        connector = self.get_connector()
        if connector:
            from azure.identity import ClientSecretCredential
            from azure.storage.blob import BlobServiceClient

            client = connector.connect()
            if not isinstance(client, BlobServiceClient):
                raise RuntimeError(
                    f"Expected a {BlobServiceClient.__module__}."
                    f"{BlobServiceClient.__name__} object while "
                    f"trying to use the linked connector, but got "
                    f"{type(client)}."
                )
            # Get the credentials from the client
            credentials = client.credential
            if not isinstance(credentials, ClientSecretCredential):
                raise RuntimeError(
                    "The Azure Artifact Store connector can only be used "
                    "with a service connector that is configured with "
                    "Azure service principal credentials."
                )
            return AzureSecretSchema(
                client_id=credentials._client_id,
                client_secret=credentials._client_credential,
                tenant_id=credentials._tenant_id,
                account_name=client.account_name,
            )

        secret = self.get_typed_authentication_secret(
            expected_schema_type=AzureSecretSchema
        )
        return secret

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The adlfs filesystem to access this artifact store.

        Returns:
            The adlfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_credentials()
            credentials = secret.get_values() if secret else {}

            self._filesystem = adlfs.AzureBlobFileSystem(
                **credentials,
                anon=False,
                use_listings_cache=False,
            )
        return self._filesystem

    def _split_path(self, path: PathType) -> Tuple[str, str]:
        """Splits a path into the filesystem prefix and remainder.

        Example:
        ```python
        prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A tuple of the filesystem prefix and the remainder.
        """
        path = convert_to_str(path)
        prefix = ""
        for potential_prefix in self.config.SUPPORTED_SCHEMES:
            if path.startswith(potential_prefix):
                prefix = potential_prefix
                path = path[len(potential_prefix) :]
                break

        return prefix, path

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        prefix, _ = self._split_path(pattern)
        return [
            f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of files in the given directory.
        """
        _, path = self._split_path(path)

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a dictionary returned by the Azure filesystem.

            Args:
                file_dict: A dictionary returned by the Azure filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path, exist_ok=True)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            Stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return self.filesystem.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        prefix, _ = self._split_path(top)
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{prefix}{directory}", subdirectories, files
config: AzureArtifactStoreConfig property readonly

Returns the AzureArtifactStoreConfig config.

Returns:

Type Description
AzureArtifactStoreConfig

The configuration.

filesystem: adlfs.AzureBlobFileSystem property readonly

The adlfs filesystem to access this artifact store.

Returns:

Type Description
adlfs.AzureBlobFileSystem

The adlfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials(self)

Returns the credentials for the Azure Artifact Store if configured.

Returns:

Type Description
Optional[zenml.secret.schemas.azure_secret_schema.AzureSecretSchema]

The credentials.

Exceptions:

Type Description
RuntimeError

If the connector is not configured with Azure service principal credentials.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def get_credentials(self) -> Optional[AzureSecretSchema]:
    """Returns the credentials for the Azure Artifact Store if configured.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the connector is not configured with Azure service
            principal credentials.
    """
    connector = self.get_connector()
    if connector:
        from azure.identity import ClientSecretCredential
        from azure.storage.blob import BlobServiceClient

        client = connector.connect()
        if not isinstance(client, BlobServiceClient):
            raise RuntimeError(
                f"Expected a {BlobServiceClient.__module__}."
                f"{BlobServiceClient.__name__} object while "
                f"trying to use the linked connector, but got "
                f"{type(client)}."
            )
        # Get the credentials from the client
        credentials = client.credential
        if not isinstance(credentials, ClientSecretCredential):
            raise RuntimeError(
                "The Azure Artifact Store connector can only be used "
                "with a service connector that is configured with "
                "Azure service principal credentials."
            )
        return AzureSecretSchema(
            client_id=credentials._client_id,
            client_secret=credentials._client_credential,
            tenant_id=credentials._tenant_id,
            account_name=client.account_name,
        )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AzureSecretSchema
    )
    return secret
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include: - '' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/*/some_file)

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of files in the given directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path, exist_ok=True)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(self, path)

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

Stat info.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files

azureml_utils

AzureML definitions.

check_settings_and_compute_configuration(parameter, settings, compute)

Utility function comparing a parameter between settings and compute.

Parameters:

Name Type Description Default
parameter str

the name of the parameter.

required
settings AzureMLComputeSettings

The AzureML orchestrator settings.

required
compute azure.ai.ml.entities.Compute

The compute instance or cluster from AzureML.

required
Source code in zenml/integrations/azure/azureml_utils.py
def check_settings_and_compute_configuration(
    parameter: str,
    settings: AzureMLComputeSettings,
    compute: Compute,
) -> None:
    """Utility function comparing a parameter between settings and compute.

    Args:
        parameter: the name of the parameter.
        settings: The AzureML orchestrator settings.
        compute: The compute instance or cluster from AzureML.
    """
    # Check the compute size
    compute_value = getattr(compute, parameter)
    settings_value = getattr(settings, parameter)

    if settings_value is not None and settings_value != compute_value:
        logger.warning(
            f"The '{parameter}' defined in the settings '{settings_value}' "
            "does not match the actual parameter of the instance: "
            f"'{compute_value}'. Will ignore this setting for now."
        )

create_or_get_compute(client, settings, default_compute_name)

Creates or fetches the compute target if defined in the settings.

Parameters:

Name Type Description Default
client azure.ai.ml.MLClient

the AzureML client.

required
settings AzureMLComputeSettings

the settings for the orchestrator.

required
default_compute_name str

the default name for the compute target, if one is not provided in the settings.

required

Returns:

Type Description
Optional[str]

None, if the orchestrator is using serverless compute or str, the name of the compute target (instance or cluster).

Exceptions:

Type Description
RuntimeError

if the fetched compute target is unsupported or the mode defined in the setting does not match the type of the compute target.

Source code in zenml/integrations/azure/azureml_utils.py
def create_or_get_compute(
    client: MLClient,
    settings: AzureMLComputeSettings,
    default_compute_name: str,
) -> Optional[str]:
    """Creates or fetches the compute target if defined in the settings.

    Args:
        client: the AzureML client.
        settings: the settings for the orchestrator.
        default_compute_name: the default name for the compute target, if one
            is not provided in the settings.

    Returns:
        None, if the orchestrator is using serverless compute or
        str, the name of the compute target (instance or cluster).

    Raises:
        RuntimeError: if the fetched compute target is unsupported or the
            mode defined in the setting does not match the type of the
            compute target.
    """
    # If the mode is serverless, we can not fetch anything anyhow
    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        return None

    # If a name is not provided, generate one based on the orchestrator id
    compute_name = settings.compute_name or default_compute_name

    # Try to fetch the compute target
    try:
        compute = client.compute.get(compute_name)

        logger.info(f"Using existing compute target: '{compute_name}'.")

        # Check if compute size matches with the settings
        check_settings_and_compute_configuration(
            parameter="size", settings=settings, compute=compute
        )

        compute_type = compute.type

        # Check the type and matches the settings
        if compute_type == "computeinstance":  # Compute Instance
            if settings.mode != AzureMLComputeTypes.COMPUTE_INSTANCE:
                raise RuntimeError(
                    "The mode of operation for the compute target defined"
                    f"in the settings '{settings.mode}' does not match "
                    f"the type of the compute target: `{compute_name}` "
                    "which is a 'compute-instance'. Please make sure that "
                    "the settings are adjusted properly."
                )

            if compute.state != "Running":
                raise RuntimeError(
                    f"The compute instance `{compute_name}` is not in a "
                    "running state at the moment. Please make sure that "
                    "the compute target is running, before executing the "
                    "pipeline."
                )

            # Idle time before shutdown
            check_settings_and_compute_configuration(
                parameter="idle_time_before_shutdown_minutes",
                settings=settings,
                compute=compute,
            )

        elif compute_type == "amIcompute":  # Compute Cluster
            if settings.mode != AzureMLComputeTypes.COMPUTE_CLUSTER:
                raise RuntimeError(
                    "The mode of operation for the compute target defined "
                    f"in the settings '{settings.mode}' does not match "
                    f"the type of the compute target: `{compute_name}` "
                    "which is a 'compute-cluster'. Please make sure that "
                    "the settings are adjusted properly."
                )

            if compute.provisioning_state != "Succeeded":
                raise RuntimeError(
                    f"The provisioning state '{compute.provisioning_state}'"
                    f"of the compute cluster `{compute_name}` is not "
                    "successful. Please make sure that the compute cluster "
                    "is provisioned properly, before executing the "
                    "pipeline."
                )

            for parameter in [
                "idle_time_before_scale_down",
                "max_instances",
                "min_instances",
                "tier",
                "location",
            ]:
                # Check all possible configurations
                check_settings_and_compute_configuration(
                    parameter=parameter, settings=settings, compute=compute
                )
        else:
            raise RuntimeError(f"Unsupported compute type: {compute_type}")
        return compute_name

    # If the compute target does not exist create it
    except ResourceNotFoundError:
        logger.info(
            "Can not find the compute target with name: " f"'{compute_name}':"
        )

        if settings.mode == AzureMLComputeTypes.COMPUTE_INSTANCE:
            logger.info(
                "Creating a new compute instance. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import ComputeInstance

            compute_instance = ComputeInstance(
                name=compute_name,
                size=settings.size,
                idle_time_before_shutdown_minutes=settings.idle_time_before_shutdown_minutes,
            )
            client.begin_create_or_update(compute_instance).result()
            return compute_name

        elif settings.mode == AzureMLComputeTypes.COMPUTE_CLUSTER:
            logger.info(
                "Creating a new compute cluster. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import AmlCompute

            compute_cluster = AmlCompute(
                name=compute_name,
                size=settings.size,
                location=settings.location,
                min_instances=settings.min_instances,
                max_instances=settings.max_instances,
                idle_time_before_scale_down=settings.idle_time_before_scaledown_down,
                tier=settings.tier,
            )
            client.begin_create_or_update(compute_cluster).result()
            return compute_name

    return None

flavors special

Azure integration flavors.

azure_artifact_store_flavor

Azure artifact store flavor.

AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)

Configuration class for Azure Artifact Store.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration class for Azure Artifact Store."""

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)

Azure Artifact Store flavor.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Azure Artifact Store flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZURE_ARTIFACT_STORE_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            connector_type=AZURE_CONNECTOR_TYPE,
            resource_type=BLOB_RESOURCE_TYPE,
            resource_id_attr="path",
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/azure.png"

    @property
    def config_class(self) -> Type[AzureArtifactStoreConfig]:
        """Returns AzureArtifactStoreConfig config class.

        Returns:
            The config class.
        """
        return AzureArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["AzureArtifactStore"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.artifact_stores import AzureArtifactStore

        return AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig] property readonly

Returns AzureArtifactStoreConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureArtifactStore] property readonly

Implementation class.

Returns:

Type Description
Type[AzureArtifactStore]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

azureml

AzureML definitions.

AzureMLComputeSettings (BaseSettings)

Settings for the AzureML compute.

These settings adjust the compute resources that will be used by the pipeline execution.

There are three possible use cases for this implementation:

1. Serverless compute (default behavior):
    - The `mode` is set to `serverless` (default behavior).
    - All the other parameters become irrelevant and will throw a
    warning if set.

2. Compute instance:
    - The `mode` is set to `compute-instance`.
    - In this case, users have to provide a `compute-name`.
        - If a compute instance exists with this name, this instance
        will be used and all the other parameters become irrelevant
        and will throw a warning if set.
        - If a compute instance does not already exist, ZenML will
        create it. You can use the parameters `compute_size` and
        `idle_type_before_shutdown_minutes` for this operation.

3. Compute cluster:
    - The `mode` is set to `compute-cluster`.
    - In this case, users have to provide a `compute-name`.
        - If a compute cluster exists with this name, this instance
        will be used and all the other parameters become irrelevant
        and will throw a warning if set.
        - If a compute cluster does not already exist, ZenML will
        create it. You can set the additional parameters for this
        operation.
Source code in zenml/integrations/azure/flavors/azureml.py
class AzureMLComputeSettings(BaseSettings):
    """Settings for the AzureML compute.

    These settings adjust the compute resources that will be used by the
    pipeline execution.

    There are three possible use cases for this implementation:

        1. Serverless compute (default behavior):
            - The `mode` is set to `serverless` (default behavior).
            - All the other parameters become irrelevant and will throw a
            warning if set.

        2. Compute instance:
            - The `mode` is set to `compute-instance`.
            - In this case, users have to provide a `compute-name`.
                - If a compute instance exists with this name, this instance
                will be used and all the other parameters become irrelevant
                and will throw a warning if set.
                - If a compute instance does not already exist, ZenML will
                create it. You can use the parameters `compute_size` and
                `idle_type_before_shutdown_minutes` for this operation.

        3. Compute cluster:
            - The `mode` is set to `compute-cluster`.
            - In this case, users have to provide a `compute-name`.
                - If a compute cluster exists with this name, this instance
                will be used and all the other parameters become irrelevant
                and will throw a warning if set.
                - If a compute cluster does not already exist, ZenML will
                create it. You can set the additional parameters for this
                operation.
    """

    # Mode for compute
    mode: AzureMLComputeTypes = AzureMLComputeTypes.SERVERLESS

    # Common Configuration for Compute Instances and Clusters
    compute_name: Optional[str] = None
    size: Optional[str] = None

    # Additional configuration for a Compute Instance
    idle_time_before_shutdown_minutes: Optional[int] = None

    # Additional configuration for a Compute Cluster
    idle_time_before_scaledown_down: Optional[int] = None
    location: Optional[str] = None
    min_instances: Optional[int] = None
    max_instances: Optional[int] = None
    tier: Optional[str] = None

    @model_validator(mode="after")
    def azureml_settings_validator(self) -> "AzureMLComputeSettings":
        """Checks whether the right configuration is set based on mode.

        Returns:
            the instance itself.

        Raises:
            AssertionError: if a name is not provided when working with
                instances and clusters.
        """
        viable_configuration_fields = {
            AzureMLComputeTypes.SERVERLESS: {"mode"},
            AzureMLComputeTypes.COMPUTE_INSTANCE: {
                "mode",
                "compute_name",
                "size",
                "idle_time_before_shutdown_minutes",
            },
            AzureMLComputeTypes.COMPUTE_CLUSTER: {
                "mode",
                "compute_name",
                "size",
                "idle_time_before_scaledown_down",
                "location",
                "min_instances",
                "max_instances",
                "tier",
            },
        }
        viable_fields = viable_configuration_fields[self.mode]

        for field in self.model_fields_set:
            if (
                field not in viable_fields
                and field in AzureMLComputeSettings.model_fields
            ):
                logger.warning(
                    f"In the {self.__class__.__name__} settings, the mode of "
                    f"operation is set to {self.mode}. In this mode, you can "
                    f"not configure the parameter '{field}'. This "
                    "configuration will be ignored."
                )

        if (
            self.mode == AzureMLComputeTypes.COMPUTE_INSTANCE
            or self.mode == AzureMLComputeTypes.COMPUTE_CLUSTER
        ):
            assert self.compute_name is not None, (
                "When you are working with compute instances and clusters, "
                "please define a name for the compute target."
            )

        return self
azureml_settings_validator(self)

Checks whether the right configuration is set based on mode.

Returns:

Type Description
AzureMLComputeSettings

the instance itself.

Exceptions:

Type Description
AssertionError

if a name is not provided when working with instances and clusters.

Source code in zenml/integrations/azure/flavors/azureml.py
@model_validator(mode="after")
def azureml_settings_validator(self) -> "AzureMLComputeSettings":
    """Checks whether the right configuration is set based on mode.

    Returns:
        the instance itself.

    Raises:
        AssertionError: if a name is not provided when working with
            instances and clusters.
    """
    viable_configuration_fields = {
        AzureMLComputeTypes.SERVERLESS: {"mode"},
        AzureMLComputeTypes.COMPUTE_INSTANCE: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_shutdown_minutes",
        },
        AzureMLComputeTypes.COMPUTE_CLUSTER: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_scaledown_down",
            "location",
            "min_instances",
            "max_instances",
            "tier",
        },
    }
    viable_fields = viable_configuration_fields[self.mode]

    for field in self.model_fields_set:
        if (
            field not in viable_fields
            and field in AzureMLComputeSettings.model_fields
        ):
            logger.warning(
                f"In the {self.__class__.__name__} settings, the mode of "
                f"operation is set to {self.mode}. In this mode, you can "
                f"not configure the parameter '{field}'. This "
                "configuration will be ignored."
            )

    if (
        self.mode == AzureMLComputeTypes.COMPUTE_INSTANCE
        or self.mode == AzureMLComputeTypes.COMPUTE_CLUSTER
    ):
        assert self.compute_name is not None, (
            "When you are working with compute instances and clusters, "
            "please define a name for the compute target."
        )

    return self
AzureMLComputeTypes (StrEnum)

Enum for different types of compute on AzureML.

Source code in zenml/integrations/azure/flavors/azureml.py
class AzureMLComputeTypes(StrEnum):
    """Enum for different types of compute on AzureML."""

    SERVERLESS = "serverless"
    COMPUTE_INSTANCE = "compute-instance"
    COMPUTE_CLUSTER = "compute-cluster"

azureml_orchestrator_flavor

Implementation of the AzureML Orchestrator flavor.

AzureMLOrchestratorConfig (BaseOrchestratorConfig, AzureMLOrchestratorSettings)

Configuration for the AzureML orchestrator.

Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorConfig(
    BaseOrchestratorConfig, AzureMLOrchestratorSettings
):
    """Configuration for the AzureML orchestrator."""

    subscription_id: str = Field(
        description="Subscription ID that AzureML is running on."
    )
    resource_group: str = Field(
        description="Name of the resource group that AzureML is running on.",
    )
    workspace: str = Field(
        description="Name of the workspace that AzureML is running on."
    )

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True

    @property
    def is_synchronous(self) -> bool:
        """Whether the orchestrator runs synchronously or not.

        Returns:
            Whether the orchestrator runs synchronously or not.
        """
        return self.synchronous

    @property
    def is_schedulable(self) -> bool:
        """Whether the orchestrator is schedulable or not.

        Returns:
            Whether the orchestrator is schedulable or not.
        """
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property readonly

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property readonly

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

AzureMLOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the AzureML orchestrator.

Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the AzureML orchestrator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZUREML_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(resource_type=AZURE_RESOURCE_TYPE)

    @property
    def docs_url(self) -> Optional[str]:
        """A URL to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A URL to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A URL to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azureml.png"

    @property
    def config_class(self) -> Type[AzureMLOrchestratorConfig]:
        """Returns AzureMLOrchestratorConfig config class.

        Returns:
            The config class.
        """
        return AzureMLOrchestratorConfig

    @property
    def implementation_class(self) -> Type["AzureMLOrchestrator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.orchestrators import AzureMLOrchestrator

        return AzureMLOrchestrator
config_class: Type[zenml.integrations.azure.flavors.azureml_orchestrator_flavor.AzureMLOrchestratorConfig] property readonly

Returns AzureMLOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azureml_orchestrator_flavor.AzureMLOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLOrchestrator] property readonly

Implementation class.

Returns:

Type Description
Type[AzureMLOrchestrator]

The implementation class.

logo_url: str property readonly

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLOrchestratorSettings (AzureMLComputeSettings)

Settings for the AzureML orchestrator.

Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorSettings(AzureMLComputeSettings):
    """Settings for the AzureML orchestrator."""

    synchronous: bool = Field(
        default=True,
        description="Whether the orchestrator runs synchronously or not.",
    )

azureml_step_operator_flavor

AzureML step operator flavor.

AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings)

Config for the AzureML step operator.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig(
    BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
    """Config for the AzureML step operator.

    Attributes:
        subscription_id: The Azure account's subscription ID
        resource_group: The resource group to which the AzureML workspace
            is deployed.
        workspace_name: The name of the AzureML Workspace.
        tenant_id: The Azure Tenant ID.
        service_principal_id: The ID for the service principal that is created
            to allow apps to access secure resources.
        service_principal_password: Password for the service principal.
    """

    subscription_id: str = Field(
        description="Subscription ID that AzureML is running on."
    )
    resource_group: str = Field(
        description="Name of the resource group that AzureML is running on.",
    )
    workspace_name: str = Field(
        description="Name of the workspace that AzureML is running on."
    )

    # Service principal authentication
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    tenant_id: Optional[str] = SecretField(default=None)
    service_principal_id: Optional[str] = SecretField(default=None)
    service_principal_password: Optional[str] = SecretField(default=None)

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)

Flavor for the AzureML step operator.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor for the AzureML step operator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZUREML_STEP_OPERATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(resource_type=AZURE_RESOURCE_TYPE)

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/azureml.png"

    @property
    def config_class(self) -> Type[AzureMLStepOperatorConfig]:
        """Returns AzureMLStepOperatorConfig config class.

        Returns:
                The config class.
        """
        return AzureMLStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AzureMLStepOperator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.step_operators import AzureMLStepOperator

        return AzureMLStepOperator
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig] property readonly

Returns AzureMLStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLStepOperator] property readonly

Implementation class.

Returns:

Type Description
Type[AzureMLStepOperator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLStepOperatorSettings (AzureMLComputeSettings)

Settings for the AzureML step operator.

Attributes:

Name Type Description
compute_target_name Optional[str]

The name of the configured ComputeTarget. Deprecated in favor of compute_name.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(AzureMLComputeSettings):
    """Settings for the AzureML step operator.

    Attributes:
        compute_target_name: The name of the configured ComputeTarget.
            Deprecated in favor of `compute_name`.
    """

    compute_target_name: Optional[str] = Field(
        default=None,
        description="Name of the configured ComputeTarget. Deprecated in favor "
        "of `compute_name`.",
    )

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def _migrate_compute_name(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Backward compatibility for compute_target_name.

        Args:
            data: The model data.

        Returns:
            The migrated data.
        """
        if (
            "compute_target_name" in data
            and "compute_name" not in data
            and "mode" not in data
        ):
            data["compute_name"] = data.pop("compute_target_name")
            data["mode"] = AzureMLComputeTypes.COMPUTE_INSTANCE

        return data

orchestrators special

AzureML orchestrator.

azureml_orchestrator

Implementation of the AzureML Orchestrator.

AzureMLOrchestrator (ContainerizedOrchestrator)

Orchestrator responsible for running pipelines on AzureML.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
class AzureMLOrchestrator(ContainerizedOrchestrator):
    """Orchestrator responsible for running pipelines on AzureML."""

    @property
    def config(self) -> AzureMLOrchestratorConfig:
        """Returns the `AzureMLOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureMLOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the AzureML orchestrator.

        Returns:
            The settings class.
        """
        return AzureMLOrchestratorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue

                return False, (
                    f"The AzureML orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the AzureML step.\nPlease ensure that you always "
                    "use non-local stack components with the AzureML "
                    "orchestrator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the run id of the active orchestrator run.

        Important: This needs to be a unique ID and return the same value for
        all steps of a pipeline run.

        Returns:
            The orchestrator run id.

        Raises:
            RuntimeError: If the run id cannot be read from the environment.
        """
        try:
            return os.environ[ENV_ZENML_AZUREML_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_AZUREML_RUN_ID}."
            )

    @staticmethod
    def _create_command_component(
        step: Step,
        step_name: str,
        env_name: str,
        image: str,
        command: List[str],
        arguments: List[str],
    ) -> CommandComponent:
        """Creates a CommandComponent to run on AzureML Pipelines.

        Args:
            step: The step definition in ZenML.
            step_name: The name of the step.
            env_name: The name of the environment.
            image: The image to use in the environment
            command: The command to execute the entrypoint with.
            arguments: The arguments to pass into the command.

        Returns:
            the generated AzureML CommandComponent.
        """
        env = Environment(name=env_name, image=image)

        outputs = {"completed": Output(type="uri_file")}

        inputs = {}
        if step.spec.upstream_steps:
            inputs = {
                f"{upstream_step}": Input(type="uri_file")
                for upstream_step in step.spec.upstream_steps
            }

        return CommandComponent(
            name=step_name,
            display_name=step_name,
            description=f"AzureML CommandComponent for {step_name}.",
            inputs=inputs,
            outputs=outputs,
            environment=env,
            command=" ".join(command + arguments),
        )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Iterator[Dict[str, MetadataType]]:
        """Prepares or runs a pipeline on AzureML.

        Args:
            deployment: The deployment to prepare or run.
            stack: The stack to run on.
            environment: Environment variables to set in the orchestration
                environment.

        Raises:
            RuntimeError: If the creation of the schedule fails.

        Yields:
            A dictionary of metadata related to the pipeline run.
        """
        # Authentication
        if connector := self.get_connector():
            credentials = connector.connect()
        else:
            credentials = DefaultAzureCredential()

        # Settings
        settings = cast(
            AzureMLOrchestratorSettings,
            self.get_settings(deployment),
        )

        # Client creation
        ml_client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )

        # Create components
        components = {}
        for step_name, step in deployment.step_configurations.items():
            # Get the image for each step
            image = self.get_image(deployment=deployment, step_name=step_name)

            # Get the command and arguments
            command = AzureMLEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                AzureMLEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                    zenml_env_variables=b64_encode(json.dumps(environment)),
                )
            )

            # Generate an AzureML CommandComponent
            components[step_name] = self._create_command_component(
                step=step,
                step_name=step_name,
                env_name=deployment.pipeline_configuration.name,
                image=image,
                command=command,
                arguments=arguments,
            )

        # Pipeline definition
        pipeline_args = dict()
        run_name = get_orchestrator_run_name(
            pipeline_name=deployment.pipeline_configuration.name
        )
        pipeline_args["name"] = run_name

        if compute_target := create_or_get_compute(
            ml_client, settings, default_compute_name=f"zenml_{self.id}"
        ):
            pipeline_args["compute"] = compute_target

        @pipeline(force_rerun=True, **pipeline_args)  # type: ignore[call-overload, misc]
        def azureml_pipeline() -> None:
            """Create an AzureML pipeline."""
            # Here we have to track the inputs and outputs so that we can bind
            # the components to each other to execute them in a specific order.
            component_outputs: Dict[str, Any] = {}
            for component_name, component in components.items():
                # Inputs
                component_inputs = {}
                if component.inputs:
                    component_inputs.update(
                        {i: component_outputs[i] for i in component.inputs}
                    )

                # Job
                component_job = component(**component_inputs)

                # Outputs
                if component_job.outputs:
                    component_outputs[component_name] = (
                        component_job.outputs.completed
                    )

        # Create and execute the pipeline job
        pipeline_job = azureml_pipeline()

        if settings.mode == AzureMLComputeTypes.SERVERLESS:
            pipeline_job.settings.default_compute = "serverless"

        # Scheduling
        if schedule := deployment.schedule:
            try:
                schedule_trigger: Optional[
                    Union[CronTrigger, RecurrenceTrigger]
                ] = None

                start_time = None
                if schedule.start_time is not None:
                    start_time = schedule.start_time.isoformat()

                end_time = None
                if schedule.end_time is not None:
                    end_time = schedule.end_time.isoformat()

                if schedule.cron_expression:
                    # If we are working with a cron expression
                    schedule_trigger = CronTrigger(
                        expression=schedule.cron_expression,
                        start_time=start_time,
                        end_time=end_time,
                        time_zone=TimeZone.UTC,
                    )

                elif schedule.interval_second:
                    # If we are working with intervals
                    interval = schedule.interval_second.total_seconds()

                    if interval % 60 != 0:
                        logger.warning(
                            "The ZenML AzureML orchestrator only works with "
                            "time intervals defined over minutes. Will "
                            f"use a schedule over {int(interval // 60)}."
                        )

                    if interval < 60:
                        raise RuntimeError(
                            "Can not create a schedule with an interval less "
                            "than 60 secs."
                        )

                    frequency = "minute"
                    interval = int(interval // 60)

                    schedule_trigger = RecurrenceTrigger(
                        frequency=frequency,
                        interval=interval,
                        start_time=start_time,
                        end_time=end_time,
                        time_zone=TimeZone.UTC,
                    )

                if schedule_trigger:
                    # Create and execute the job schedule
                    job_schedule = JobSchedule(
                        name=run_name,
                        trigger=schedule_trigger,
                        create_job=pipeline_job,
                    )
                    ml_client.schedules.begin_create_or_update(
                        job_schedule
                    ).result()
                    logger.info(
                        f"Scheduled pipeline '{run_name}' with recurrence "
                        "or cron expression."
                    )
                else:
                    raise RuntimeError(
                        "No valid scheduling configuration found for "
                        f"pipeline '{run_name}'."
                    )

            except (HttpResponseError, ResourceExistsError) as e:
                raise RuntimeError(
                    "Failed to create schedule for the pipeline "
                    f"'{run_name}': {str(e)}"
                )

        else:
            job = ml_client.jobs.create_or_update(pipeline_job)
            logger.info(f"Pipeline {run_name} has been started.")

            # Yield metadata based on the generated job object
            yield from self.compute_metadata(job)

            assert job.services is not None
            assert job.name is not None

            logger.info(
                f"Pipeline {run_name} is running. "
                "You can view the pipeline in the AzureML portal at "
                f"{job.services['Studio'].endpoint}"
            )

            if settings.synchronous:
                logger.info("Waiting for pipeline to finish...")
                ml_client.jobs.stream(job.name)

    def get_pipeline_run_metadata(
        self, run_id: UUID
    ) -> Dict[str, "MetadataType"]:
        """Get general component-specific metadata for a pipeline run.

        Args:
            run_id: The ID of the pipeline run.

        Returns:
            A dictionary of metadata.
        """
        try:
            if connector := self.get_connector():
                credentials = connector.connect()
            else:
                credentials = DefaultAzureCredential()

            ml_client = MLClient(
                credential=credentials,
                subscription_id=self.config.subscription_id,
                resource_group_name=self.config.resource_group,
                workspace_name=self.config.workspace,
            )

            azureml_root_run_id = os.environ[ENV_ZENML_AZUREML_RUN_ID]
            azureml_job = ml_client.jobs.get(azureml_root_run_id)

            return {
                METADATA_ORCHESTRATOR_URL: Uri(azureml_job.studio_url),
            }
        except Exception as e:
            logger.warning(
                f"Failed to fetch the Studio URL of the AzureML pipeline "
                f"job: {e}"
            )
            return {}

    def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
        """Refreshes the status of a specific pipeline run.

        Args:
            run: The run that was executed by this orchestrator.

        Returns:
            the actual status of the pipeline execution.

        Raises:
            AssertionError: If the run was not executed by to this orchestrator.
            ValueError: If it fetches an unknown state or if we can not fetch
                the orchestrator run ID.
        """
        # Make sure that the stack exists and is accessible
        if run.stack is None:
            raise ValueError(
                "The stack that the run was executed on is not available "
                "anymore."
            )

        # Make sure that the run belongs to this orchestrator
        assert (
            self.id
            == run.stack.components[StackComponentType.ORCHESTRATOR][0].id
        )

        # Initialize the AzureML client
        if connector := self.get_connector():
            credentials = connector.connect()
        else:
            credentials = DefaultAzureCredential()

        ml_client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )

        # Fetch the status of the PipelineJob
        if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
            run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
        elif run.orchestrator_run_id is not None:
            run_id = run.orchestrator_run_id
        else:
            raise ValueError(
                "Can not find the orchestrator run ID, thus can not fetch "
                "the status."
            )
        status = ml_client.jobs.get(run_id).status

        # Map the potential outputs to ZenML ExecutionStatus. Potential values:
        # https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.pipelinejob?view=azure-python#azure-ai-ml-entities-pipelinejob-status
        if status in [
            "NotStarted",
            "Starting",
            "Provisioning",
            "Preparing",
            "Queued",
        ]:
            return ExecutionStatus.INITIALIZING
        elif status in ["Running", "Finalizing"]:
            return ExecutionStatus.RUNNING
        elif status in [
            "CancelRequested",
            "Failed",
            "Canceled",
            "NotResponding",
        ]:
            return ExecutionStatus.FAILED
        elif status in ["Completed"]:
            return ExecutionStatus.COMPLETED
        else:
            raise ValueError("Unknown status for the pipeline job.")

    def compute_metadata(self, job: Any) -> Iterator[Dict[str, MetadataType]]:
        """Generate run metadata based on the generated AzureML PipelineJob.

        Args:
            job: The corresponding PipelineJob object.

        Yields:
            A dictionary of metadata related to the pipeline run.
        """
        # Metadata
        metadata: Dict[str, MetadataType] = {}

        # Orchestrator Run ID
        if run_id := self._compute_orchestrator_run_id(job):
            metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id

        # URL to the AzureML's pipeline view
        if orchestrator_url := self._compute_orchestrator_url(job):
            metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url)

        yield metadata

    @staticmethod
    def _compute_orchestrator_url(job: Any) -> Optional[str]:
        """Generate the Orchestrator Dashboard URL upon pipeline execution.

        Args:
            job: The corresponding PipelineJob object.

        Returns:
             the URL to the dashboard view in AzureML.
        """
        try:
            if job.studio_url:
                return str(job.studio_url)

            return None

        except Exception as e:
            logger.warning(
                f"There was an issue while extracting the pipeline url: {e}"
            )
            return None

    @staticmethod
    def _compute_orchestrator_run_id(job: Any) -> Optional[str]:
        """Generate the Orchestrator Dashboard URL upon pipeline execution.

        Args:
            job: The corresponding PipelineJob object.

        Returns:
             the URL to the dashboard view in AzureML.
        """
        try:
            if job.name:
                return str(job.name)

            return None

        except Exception as e:
            logger.warning(
                f"There was an issue while extracting the pipeline run ID: {e}"
            )
            return None
config: AzureMLOrchestratorConfig property readonly

Returns the AzureMLOrchestratorConfig config.

Returns:

Type Description
AzureMLOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the AzureML orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

compute_metadata(self, job)

Generate run metadata based on the generated AzureML PipelineJob.

Parameters:

Name Type Description Default
job Any

The corresponding PipelineJob object.

required

Yields:

Type Description
Iterator[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]]

A dictionary of metadata related to the pipeline run.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def compute_metadata(self, job: Any) -> Iterator[Dict[str, MetadataType]]:
    """Generate run metadata based on the generated AzureML PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    # Metadata
    metadata: Dict[str, MetadataType] = {}

    # Orchestrator Run ID
    if run_id := self._compute_orchestrator_run_id(job):
        metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id

    # URL to the AzureML's pipeline view
    if orchestrator_url := self._compute_orchestrator_url(job):
        metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url)

    yield metadata
fetch_status(self, run)

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The run that was executed by this orchestrator.

required

Returns:

Type Description
ExecutionStatus

the actual status of the pipeline execution.

Exceptions:

Type Description
AssertionError

If the run was not executed by to this orchestrator.

ValueError

If it fetches an unknown state or if we can not fetch the orchestrator run ID.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        the actual status of the pipeline execution.

    Raises:
        AssertionError: If the run was not executed by to this orchestrator.
        ValueError: If it fetches an unknown state or if we can not fetch
            the orchestrator run ID.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator
    assert (
        self.id
        == run.stack.components[StackComponentType.ORCHESTRATOR][0].id
    )

    # Initialize the AzureML client
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Fetch the status of the PipelineJob
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Can not find the orchestrator run ID, thus can not fetch "
            "the status."
        )
    status = ml_client.jobs.get(run_id).status

    # Map the potential outputs to ZenML ExecutionStatus. Potential values:
    # https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.pipelinejob?view=azure-python#azure-ai-ml-entities-pipelinejob-status
    if status in [
        "NotStarted",
        "Starting",
        "Provisioning",
        "Preparing",
        "Queued",
    ]:
        return ExecutionStatus.INITIALIZING
    elif status in ["Running", "Finalizing"]:
        return ExecutionStatus.RUNNING
    elif status in [
        "CancelRequested",
        "Failed",
        "Canceled",
        "NotResponding",
    ]:
        return ExecutionStatus.FAILED
    elif status in ["Completed"]:
        return ExecutionStatus.COMPLETED
    else:
        raise ValueError("Unknown status for the pipeline job.")
get_orchestrator_run_id(self)

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:

Type Description
str

The orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the run id cannot be read from the environment.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
    """
    try:
        return os.environ[ENV_ZENML_AZUREML_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AZUREML_RUN_ID}."
        )
get_pipeline_run_metadata(self, run_id)

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    try:
        if connector := self.get_connector():
            credentials = connector.connect()
        else:
            credentials = DefaultAzureCredential()

        ml_client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )

        azureml_root_run_id = os.environ[ENV_ZENML_AZUREML_RUN_ID]
        azureml_job = ml_client.jobs.get(azureml_root_run_id)

        return {
            METADATA_ORCHESTRATOR_URL: Uri(azureml_job.studio_url),
        }
    except Exception as e:
        logger.warning(
            f"Failed to fetch the Studio URL of the AzureML pipeline "
            f"job: {e}"
        )
        return {}
prepare_or_run_pipeline(self, deployment, stack, environment)

Prepares or runs a pipeline on AzureML.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment to prepare or run.

required
stack Stack

The stack to run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
RuntimeError

If the creation of the schedule fails.

Yields:

Type Description
Iterator[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]]

A dictionary of metadata related to the pipeline run.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Iterator[Dict[str, MetadataType]]:
    """Prepares or runs a pipeline on AzureML.

    Args:
        deployment: The deployment to prepare or run.
        stack: The stack to run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If the creation of the schedule fails.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    # Authentication
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    # Settings
    settings = cast(
        AzureMLOrchestratorSettings,
        self.get_settings(deployment),
    )

    # Client creation
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Create components
    components = {}
    for step_name, step in deployment.step_configurations.items():
        # Get the image for each step
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get the command and arguments
        command = AzureMLEntrypointConfiguration.get_entrypoint_command()
        arguments = (
            AzureMLEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
                deployment_id=deployment.id,
                zenml_env_variables=b64_encode(json.dumps(environment)),
            )
        )

        # Generate an AzureML CommandComponent
        components[step_name] = self._create_command_component(
            step=step,
            step_name=step_name,
            env_name=deployment.pipeline_configuration.name,
            image=image,
            command=command,
            arguments=arguments,
        )

    # Pipeline definition
    pipeline_args = dict()
    run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    pipeline_args["name"] = run_name

    if compute_target := create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    ):
        pipeline_args["compute"] = compute_target

    @pipeline(force_rerun=True, **pipeline_args)  # type: ignore[call-overload, misc]
    def azureml_pipeline() -> None:
        """Create an AzureML pipeline."""
        # Here we have to track the inputs and outputs so that we can bind
        # the components to each other to execute them in a specific order.
        component_outputs: Dict[str, Any] = {}
        for component_name, component in components.items():
            # Inputs
            component_inputs = {}
            if component.inputs:
                component_inputs.update(
                    {i: component_outputs[i] for i in component.inputs}
                )

            # Job
            component_job = component(**component_inputs)

            # Outputs
            if component_job.outputs:
                component_outputs[component_name] = (
                    component_job.outputs.completed
                )

    # Create and execute the pipeline job
    pipeline_job = azureml_pipeline()

    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        pipeline_job.settings.default_compute = "serverless"

    # Scheduling
    if schedule := deployment.schedule:
        try:
            schedule_trigger: Optional[
                Union[CronTrigger, RecurrenceTrigger]
            ] = None

            start_time = None
            if schedule.start_time is not None:
                start_time = schedule.start_time.isoformat()

            end_time = None
            if schedule.end_time is not None:
                end_time = schedule.end_time.isoformat()

            if schedule.cron_expression:
                # If we are working with a cron expression
                schedule_trigger = CronTrigger(
                    expression=schedule.cron_expression,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            elif schedule.interval_second:
                # If we are working with intervals
                interval = schedule.interval_second.total_seconds()

                if interval % 60 != 0:
                    logger.warning(
                        "The ZenML AzureML orchestrator only works with "
                        "time intervals defined over minutes. Will "
                        f"use a schedule over {int(interval // 60)}."
                    )

                if interval < 60:
                    raise RuntimeError(
                        "Can not create a schedule with an interval less "
                        "than 60 secs."
                    )

                frequency = "minute"
                interval = int(interval // 60)

                schedule_trigger = RecurrenceTrigger(
                    frequency=frequency,
                    interval=interval,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            if schedule_trigger:
                # Create and execute the job schedule
                job_schedule = JobSchedule(
                    name=run_name,
                    trigger=schedule_trigger,
                    create_job=pipeline_job,
                )
                ml_client.schedules.begin_create_or_update(
                    job_schedule
                ).result()
                logger.info(
                    f"Scheduled pipeline '{run_name}' with recurrence "
                    "or cron expression."
                )
            else:
                raise RuntimeError(
                    "No valid scheduling configuration found for "
                    f"pipeline '{run_name}'."
                )

        except (HttpResponseError, ResourceExistsError) as e:
            raise RuntimeError(
                "Failed to create schedule for the pipeline "
                f"'{run_name}': {str(e)}"
            )

    else:
        job = ml_client.jobs.create_or_update(pipeline_job)
        logger.info(f"Pipeline {run_name} has been started.")

        # Yield metadata based on the generated job object
        yield from self.compute_metadata(job)

        assert job.services is not None
        assert job.name is not None

        logger.info(
            f"Pipeline {run_name} is running. "
            "You can view the pipeline in the AzureML portal at "
            f"{job.services['Studio'].endpoint}"
        )

        if settings.synchronous:
            logger.info("Waiting for pipeline to finish...")
            ml_client.jobs.stream(job.name)

azureml_orchestrator_entrypoint_config

Entrypoint configuration for ZenML AzureML pipeline steps.

AzureMLEntrypointConfiguration (StepEntrypointConfiguration)

Entrypoint configuration for ZenML AzureML pipeline steps.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
class AzureMLEntrypointConfiguration(StepEntrypointConfiguration):
    """Entrypoint configuration for ZenML AzureML pipeline steps."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            The superclass options as well as an option for the
            environmental variables.
        """
        return super().get_entrypoint_options() | {ZENML_ENV_VARIABLES}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            **kwargs: Kwargs, can include the environmental variables.

        Returns:
            The superclass arguments as well as arguments for environmental
            variables.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{ZENML_ENV_VARIABLES}",
            kwargs[ZENML_ENV_VARIABLES],
        ]

    def _set_env_variables(self) -> None:
        """Sets the environmental variables before executing the step."""
        env_variables = json.loads(
            b64_decode(self.entrypoint_args[ZENML_ENV_VARIABLES])
        )
        os.environ.update(env_variables)

    def run(self) -> None:
        """Runs the step."""
        # Set the environmental variables first
        self._set_env_variables()

        # Azure automatically changes the working directory, we have to set it
        # back to /app before running the step.
        os.makedirs("/app", exist_ok=True)
        os.chdir("/app")

        # Run the step
        super().run()

        # Unfortunately, in AzureML's Python SDK v2, there is no native way
        # to execute steps/components in a specific sequence. In order to
        # establish the correct order, we are using dummy inputs and
        # outputs. However, these steps only execute if the inputs and outputs
        # actually exist. This is why we create a dummy file and write to it and
        # use it as the output of the steps.
        if completed := os.environ.get(AZURE_ML_OUTPUT_COMPLETED):
            os.makedirs(os.path.dirname(completed), exist_ok=True)
            with open(completed, "w") as f:
                f.write("Component completed!")
get_entrypoint_arguments(**kwargs) classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, can include the environmental variables.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for environmental variables.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        **kwargs: Kwargs, can include the environmental variables.

    Returns:
        The superclass arguments as well as arguments for environmental
        variables.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{ZENML_ENV_VARIABLES}",
        kwargs[ZENML_ENV_VARIABLES],
    ]
get_entrypoint_options() classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the environmental variables.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the
        environmental variables.
    """
    return super().get_entrypoint_options() | {ZENML_ENV_VARIABLES}
run(self)

Runs the step.

Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
def run(self) -> None:
    """Runs the step."""
    # Set the environmental variables first
    self._set_env_variables()

    # Azure automatically changes the working directory, we have to set it
    # back to /app before running the step.
    os.makedirs("/app", exist_ok=True)
    os.chdir("/app")

    # Run the step
    super().run()

    # Unfortunately, in AzureML's Python SDK v2, there is no native way
    # to execute steps/components in a specific sequence. In order to
    # establish the correct order, we are using dummy inputs and
    # outputs. However, these steps only execute if the inputs and outputs
    # actually exist. This is why we create a dummy file and write to it and
    # use it as the output of the steps.
    if completed := os.environ.get(AZURE_ML_OUTPUT_COMPLETED):
        os.makedirs(os.path.dirname(completed), exist_ok=True)
        with open(completed, "w") as f:
            f.write("Component completed!")

service_connectors special

Azure Service Connector.

azure_service_connector

Azure Service Connector.

AzureAccessToken (AuthenticationConfig)

Azure access token credentials.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessToken(AuthenticationConfig):
    """Azure access token credentials."""

    token: PlainSerializedSecretStr = Field(
        title="Azure Access Token",
        description="The Azure access token to use for authentication",
    )
AzureAccessTokenConfig (AzureBaseConfig, AzureAccessToken)

Azure token configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessTokenConfig(AzureBaseConfig, AzureAccessToken):
    """Azure token configuration."""
AzureAuthenticationMethods (StrEnum)

Azure Authentication methods.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAuthenticationMethods(StrEnum):
    """Azure Authentication methods."""

    IMPLICIT = "implicit"
    SERVICE_PRINCIPAL = "service-principal"
    ACCESS_TOKEN = "access-token"
AzureBaseConfig (AuthenticationConfig)

Azure base configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureBaseConfig(AuthenticationConfig):
    """Azure base configuration."""

    subscription_id: Optional[UUID] = Field(
        default=None,
        title="Azure Subscription ID",
        description="The subscription ID of the Azure account. If not "
        "specified, ZenML will attempt to retrieve the subscription ID from "
        "Azure using the configured credentials.",
    )
    tenant_id: Optional[UUID] = Field(
        default=None,
        title="Azure Tenant ID",
        description="The tenant ID of the Azure account. If not specified, "
        "ZenML will attempt to retrieve the tenant from Azure using the "
        "configured credentials.",
    )
    resource_group: Optional[str] = Field(
        default=None,
        title="Azure Resource Group",
        description="A resource group may be used to restrict the scope of "
        "Azure resources like AKS clusters and ACR repositories. If not "
        "specified, ZenML will retrieve resources from all resource groups "
        "accessible with the configured credentials.",
    )
    storage_account: Optional[str] = Field(
        default=None,
        title="Azure Storage Account",
        description="The name of an Azure storage account may be used to "
        "restrict the scope of Azure Blob storage containers. If not "
        "specified, ZenML will retrieve blob containers from all storage "
        "accounts accessible with the configured credentials.",
    )
AzureClientConfig (AzureBaseConfig)

Azure client configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientConfig(AzureBaseConfig):
    """Azure client configuration."""

    client_id: UUID = Field(
        title="Azure Client ID",
        description="The service principal's client ID",
    )
    tenant_id: UUID = Field(
        title="Azure Tenant ID",
        description="ID of the service principal's tenant. Also called its "
        "'directory' ID.",
    )
AzureClientSecret (AuthenticationConfig)

Azure client secret credentials.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientSecret(AuthenticationConfig):
    """Azure client secret credentials."""

    client_secret: PlainSerializedSecretStr = Field(
        title="Service principal client secret",
        description="The client secret of the service principal",
    )
AzureServiceConnector (ServiceConnector)

Azure service connector.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServiceConnector(ServiceConnector):
    """Azure service connector."""

    config: AzureBaseConfig

    _subscription_id: Optional[str] = None
    _subscription_name: Optional[str] = None
    _tenant_id: Optional[str] = None
    _session_cache: Dict[
        str,
        Tuple[TokenCredential, Optional[datetime.datetime]],
    ] = {}

    @classmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the service connector type specification.

        Returns:
            The service connector type specification.
        """
        return AZURE_SERVICE_CONNECTOR_TYPE_SPEC

    @property
    def subscription(self) -> Tuple[str, str]:
        """Get the Azure subscription ID and name.

        Returns:
            The Azure subscription ID and name.

        Raises:
            AuthorizationException: If the Azure subscription could not be
                determined or doesn't match the configured subscription ID.
        """
        if self._subscription_id is None or self._subscription_name is None:
            logger.debug("Getting subscription from Azure...")
            try:
                credential, _ = self.get_azure_credential(self.auth_method)
                subscription_client = SubscriptionClient(credential)
                if self.config.subscription_id is not None:
                    subscription = subscription_client.subscriptions.get(
                        str(self.config.subscription_id)
                    )
                    self._subscription_name = subscription.display_name
                    self._subscription_id = subscription.subscription_id
                else:
                    subscriptions = list(
                        subscription_client.subscriptions.list()
                    )
                    if len(subscriptions) == 0:
                        raise AuthorizationException(
                            "no Azure subscriptions found for the configured "
                            "credentials."
                        )
                    if len(subscriptions) > 1:
                        raise AuthorizationException(
                            "multiple Azure subscriptions found for the "
                            "configured credentials. Please configure the "
                            "subscription ID explicitly in the connector."
                        )

                    subscription = subscriptions[0]
                    self._subscription_id = subscription.subscription_id
                    self._subscription_name = subscription.display_name
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure subscription: {e}"
                ) from e

        assert self._subscription_id is not None
        assert self._subscription_name is not None
        return self._subscription_id, self._subscription_name

    @property
    def tenant_id(self) -> str:
        """Get the Azure tenant ID.

        Returns:
            The Azure tenant ID.

        Raises:
            AuthorizationException: If the Azure tenant ID could not be
                determined or doesn't match the configured tenant ID.
        """
        if self._tenant_id is None:
            logger.debug("Getting tenant ID from Azure...")
            try:
                credential, _ = self.get_azure_credential(self.auth_method)
                subscription_client = SubscriptionClient(credential)
                tenants = subscription_client.tenants.list()

                if self.config.tenant_id is not None:
                    for tenant in tenants:
                        if str(tenant.tenant_id) == str(self.config.tenant_id):
                            self._tenant_id = str(tenant.tenant_id)
                            break
                    else:
                        raise AuthorizationException(
                            "the configured tenant ID is not associated with "
                            "the configured credentials."
                        )
                else:
                    tenants = list(tenants)
                    if len(tenants) == 0:
                        raise AuthorizationException(
                            "no Azure tenants found for the configured "
                            "credentials."
                        )
                    if len(tenants) > 1:
                        raise AuthorizationException(
                            "multiple Azure tenants found for the "
                            "configured credentials. Please configure the "
                            "tenant ID explicitly in the connector."
                        )

                    tenant = tenants[0]
                    self._tenant_id = tenant.tenant_id
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure tenant: {e}"
                ) from e
        assert self._tenant_id is not None
        return self._tenant_id

    def get_azure_credential(
        self,
        auth_method: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
        """Get an Azure credential for the specified resource.

        Args:
            auth_method: The authentication method to use.
            resource_type: The resource type to get a credential for.
            resource_id: The resource ID to get a credential for.

        Returns:
            An Azure credential for the specified resource and its expiration
            timestamp, if applicable.
        """
        # We maintain a cache of all sessions to avoid re-authenticating
        # multiple times for the same resource
        key = auth_method
        if key in self._session_cache:
            session, expires_at = self._session_cache[key]
            if expires_at is None:
                return session, None

            # Refresh expired sessions
            now = datetime.datetime.now(datetime.timezone.utc)
            expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)

            # check if the token expires in the near future
            if expires_at > now + datetime.timedelta(
                minutes=AZURE_SESSION_EXPIRATION_BUFFER
            ):
                return session, expires_at

        logger.debug(
            f"Creating Azure credential for auth method '{auth_method}', "
            f"resource type '{resource_type}' and resource ID "
            f"'{resource_id}'..."
        )
        session, expires_at = self._authenticate(
            auth_method, resource_type, resource_id
        )
        self._session_cache[key] = (session, expires_at)
        return session, expires_at

    def _authenticate(
        self,
        auth_method: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
        """Authenticate to Azure and return a token credential.

        Args:
            auth_method: The authentication method to use.
            resource_type: The resource type to authenticate for.
            resource_id: The resource ID to authenticate for.

        Returns:
            An Azure token credential and the expiration time of the
            temporary credentials if applicable.

        Raises:
            NotImplementedError: If the authentication method is not supported.
            AuthorizationException: If the authentication failed.
        """
        cfg = self.config
        credential: TokenCredential
        if auth_method == AzureAuthenticationMethods.IMPLICIT:
            self._check_implicit_auth_method_allowed()

            try:
                credential = DefaultAzureCredential()
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using implicit "
                    f"authentication. Please check your local Azure CLI "
                    f"configuration or managed identity setup: {e}"
                )

            return credential, None

        if auth_method == AzureAuthenticationMethods.SERVICE_PRINCIPAL:
            assert isinstance(cfg, AzureServicePrincipalConfig)
            try:
                credential = ClientSecretCredential(
                    tenant_id=str(cfg.tenant_id),
                    client_id=str(cfg.client_id),
                    client_secret=cfg.client_secret.get_secret_value(),
                )
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using the provided "
                    f"service principal credentials. Please check your Azure "
                    f"configuration: {e}"
                )

            return credential, None

        if auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
            assert isinstance(cfg, AzureAccessTokenConfig)

            expires_at = self.expires_at
            assert expires_at is not None

            credential = ZenMLAzureTokenCredential(
                token=cfg.token.get_secret_value(),
                expires_at=expires_at,
            )

            return credential, expires_at

        raise NotImplementedError(
            f"Authentication method '{auth_method}' is not supported by "
            "the Azure connector."
        )

    def _parse_blob_container_resource_id(self, resource_id: str) -> str:
        """Validate and convert an Azure blob resource ID to a container name.

        The resource ID could mean different things:

        - Azure blob container URI: `{az|abfs}://{container-name}`
        - Azure blob container name: `{container-name}`

        This method extracts the container name from the provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The container name.

        Raises:
            ValueError: If the provided resource ID is not a valid Azure blob
                resource ID.
        """
        container_name: Optional[str] = None
        if re.match(
            r"^(az|abfs)://[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9](/.*)*$",
            resource_id,
        ):
            # The resource ID is an Azure blob container URI
            container_name = resource_id.split("/")[2]
        elif re.match(
            r"^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$",
            resource_id,
        ):
            # The resource ID is the Azure blob container name
            container_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for an Azure blob storage container: "
                f"{resource_id}. "
                "Supported formats are:\n"
                "Azure blob container URI: {az|abfs}://<container-name>\n"
                "Azure blob container name: <container-name>"
            )

        return container_name

    def _parse_acr_resource_id(
        self,
        resource_id: str,
    ) -> str:
        """Validate and convert an ACR resource ID to an ACR registry name.

        The resource ID could mean different things:

        - ACR registry URI: `[https://]{registry-name}.azurecr.io`
        - ACR registry name: `{registry-name}`

        This method extracts the registry name from the provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The ACR registry name.

        Raises:
            ValueError: If the provided resource ID is not a valid ACR
                resource ID.
        """
        registry_name: Optional[str] = None
        if re.match(
            r"^(https?://)?[a-zA-Z0-9]+\.azurecr\.io(/.+)*$",
            resource_id,
        ):
            # The resource ID is an ACR registry URI
            registry_name = resource_id.split(".")[0].split("/")[-1]
        elif re.match(
            r"^[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an ACR registry name
            registry_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for a ACR registry: {resource_id}. "
                f"Supported formats are:\n"
                "ACR registry URI: [https://]{registry-name}.azurecr.io\n"
                "ACR registry name: {registry-name}"
            )

        return registry_name

    def _parse_aks_resource_id(
        self, resource_id: str
    ) -> Tuple[Optional[str], str]:
        """Validate and convert an AKS resource ID to an AKS cluster name.

        The resource ID could mean different things:

        - resource group scoped AKS cluster name (canonical): `{resource-group}/{cluster-name}`
        - AKS cluster name: `{cluster-name}`

        This method extracts the resource group name and cluster name from the
        provided resource ID.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The Azure resource group and AKS cluster name.

        Raises:
            ValueError: If the provided resource ID is not a valid AKS cluster
                name.
        """
        resource_group: Optional[str] = self.config.resource_group
        if re.match(
            r"^[a-zA-Z0-9_.()-]+/[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an AKS cluster name including the resource
            # group
            resource_group, cluster_name = resource_id.split("/")
        elif re.match(
            r"^[a-zA-Z0-9]+[a-zA-Z0-9_-]*[a-zA-Z0-9]+$",
            resource_id,
        ):
            # The resource ID is an AKS cluster name without the resource group
            cluster_name = resource_id
        else:
            raise ValueError(
                f"Invalid resource ID for a AKS cluster: {resource_id}. "
                f"Supported formats are:\n"
                "resource group scoped AKS cluster name (canonical): {resource-group}/{cluster-name}\n"
                "AKS cluster name: {cluster-name}"
            )

        if (
            self.config.resource_group
            and self.config.resource_group != resource_group
        ):
            raise ValueError(
                f"Invalid resource ID for an AKS cluster: {resource_id}. "
                f"The resource group '{resource_group}' does not match the "
                f"resource group configured in the connector: "
                f"'{self.config.resource_group}'."
            )

        return resource_group, cluster_name

    def _canonical_resource_id(
        self, resource_type: str, resource_id: str
    ) -> str:
        """Convert a resource ID to its canonical form.

        Args:
            resource_type: The resource type to canonicalize.
            resource_id: The resource ID to canonicalize.

        Returns:
            The canonical resource ID.
        """
        if resource_type == BLOB_RESOURCE_TYPE:
            container_name = self._parse_blob_container_resource_id(
                resource_id
            )
            return f"az://{container_name}"
        elif resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            resource_group, cluster_name = self._parse_aks_resource_id(
                resource_id
            )
            if resource_group:
                return f"{resource_group}/{cluster_name}"
            return cluster_name
        elif resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name = self._parse_acr_resource_id(
                resource_id,
            )
            return f"{registry_name}.azurecr.io"
        else:
            return resource_id

    def _get_default_resource_id(self, resource_type: str) -> str:
        """Get the default resource ID for a resource type.

        Args:
            resource_type: The type of the resource to get a default resource ID
                for. Only called with resource types that do not support
                multiple instances.

        Returns:
            The default resource ID for the resource type.

        Raises:
            RuntimeError: If the ECR registry ID (Azure account ID)
                cannot be retrieved from Azure because the connector is not
                authorized.
        """
        if resource_type == AZURE_RESOURCE_TYPE:
            _, subscription_name = self.subscription
            return subscription_name

        raise RuntimeError(
            f"Default resource ID for '{resource_type}' not available."
        )

    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to an Azure resource.

        Initialize and return a session or client object depending on the
        connector configuration:

        - initialize and return an Azure TokenCredential if the resource type
        is a generic Azure resource
        - initialize and return an Azure BlobServiceClient for an Azure blob
        storage container resource type

        For the Docker and Kubernetes resource types, the connector does not
        support connecting to the resource directly. Instead, the connector
        supports generating a connector client object for the resource type
        in question.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Returns:
            A TokenCredential for Azure generic resources and a
            BlobServiceClient client for an Azure blob storage container.

        Raises:
            NotImplementedError: If the connector instance does not support
                directly connecting to the indicated resource type.
        """
        resource_type = self.resource_type
        resource_id = self.resource_id

        assert resource_type is not None
        assert resource_id is not None

        # Regardless of the resource type, we must authenticate to Azure first
        # before we can connect to any Azure resource
        credential, _ = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
        )

        if resource_type == BLOB_RESOURCE_TYPE:
            container_name = self._parse_blob_container_resource_id(
                resource_id
            )

            containers = self._list_blob_containers(
                credential,
                container_name=container_name,
            )

            container_name, storage_account = containers.popitem()
            account_url = f"https://{storage_account}.blob.core.windows.net/"

            blob_client = BlobServiceClient(
                account_url=account_url,
                credential=credential,
            )

            return blob_client

        if resource_type == AZURE_RESOURCE_TYPE:
            return credential

        raise NotImplementedError(
            f"Connecting to {resource_type} resources is not directly "
            "supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Configure a local client to authenticate and connect to a resource.

        This method uses the connector's configuration to configure a local
        client or SDK installed on the localhost for the indicated resource.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            NotImplementedError: If the connector instance does not support
                local configuration for the configured resource type or
                authentication method.registry
            AuthorizationException: If the connector instance does not support
                local configuration for the configured authentication method.
        """
        resource_type = self.resource_type

        if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
            if (
                self.auth_method
                == AzureAuthenticationMethods.SERVICE_PRINCIPAL
            ):
                # Use the service principal credentials to configure the local
                # Azure CLI
                assert isinstance(self.config, AzureServicePrincipalConfig)

                command = [
                    "az",
                    "login",
                    "--service-principal",
                    "-u",
                    str(self.config.client_id),
                    "-p",
                    self.config.client_secret.get_secret_value(),
                    "--tenant",
                    str(self.config.tenant_id),
                ]

                try:
                    subprocess.run(command, check=True)
                except subprocess.CalledProcessError as e:
                    raise AuthorizationException(
                        f"Failed to update the local Azure CLI with the "
                        f"connector service principal credentials: {e}"
                    ) from e

                logger.info(
                    "Updated the local Azure CLI configuration with the "
                    "connector's service principal credentials."
                )

                return

            raise NotImplementedError(
                f"Local Azure client configuration for resource type "
                f"{resource_type} is only supported if the "
                f"'{AzureAuthenticationMethods.SERVICE_PRINCIPAL}' "
                f"authentication method is used."
            )

        raise NotImplementedError(
            f"Configuring the local client for {resource_type} resources is "
            "not directly supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    @classmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        resource_group: Optional[str] = None,
        storage_account: Optional[str] = None,
        **kwargs: Any,
    ) -> "AzureServiceConnector":
        """Auto-configure the connector.

        Instantiate an Azure connector with a configuration extracted from the
        authentication configuration available in the environment (e.g.
        environment variables or local Azure client/SDK configuration files).

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure.
            resource_id: The ID of the resource to configure. The implementation
                may choose to either require or ignore this parameter if it does
                not support or detect an resource type that supports multiple
                instances.
            resource_group: A resource group may be used to restrict the scope
                of Azure resources like AKS clusters and ACR repositories. If
                not specified, ZenML will retrieve resources from all resource
                groups accessible with the discovered credentials.
            storage_account: The name of an Azure storage account may be used to
                restrict the scope of Azure Blob storage containers. If not
                specified, ZenML will retrieve blob containers from all storage
                accounts accessible with the discovered credentials.
            kwargs: Additional implementation specific keyword arguments to use.

        Returns:
            An Azure connector instance configured with authentication
            credentials automatically extracted from the environment.

        Raises:
            NotImplementedError: If the connector implementation does not
                support auto-configuration for the specified authentication
                method.
            AuthorizationException: If no Azure credentials can be loaded from
                the environment.
        """
        auth_config: AzureBaseConfig
        expiration_seconds: Optional[int] = None
        expires_at: Optional[datetime.datetime] = None
        if auth_method == AzureAuthenticationMethods.IMPLICIT:
            auth_config = AzureBaseConfig(
                resource_group=resource_group,
                storage_account=storage_account,
            )
        else:
            # Initialize an Azure credential with the default configuration
            # loaded from the environment.
            try:
                credential = DefaultAzureCredential()
            except AzureError as e:
                raise AuthorizationException(
                    f"Failed to authenticate to Azure using implicit "
                    f"authentication. Please check your local Azure CLI "
                    f"configuration: {e}"
                )

            if (
                auth_method
                and auth_method != AzureAuthenticationMethods.ACCESS_TOKEN
            ):
                raise NotImplementedError(
                    f"The specified authentication method '{auth_method}' "
                    "could not be used to auto-configure the connector. "
                )
            auth_method = AzureAuthenticationMethods.ACCESS_TOKEN

            if resource_type == BLOB_RESOURCE_TYPE:
                raise AuthorizationException(
                    f"Auto-configuration for {resource_type} resources is not "
                    "supported by the Azure connector."
                )
            else:
                token = credential.get_token(
                    AZURE_MANAGEMENT_TOKEN_SCOPE,
                )

            # Convert the expiration timestamp from Unix time to datetime
            # format.
            expires_at = datetime.datetime.fromtimestamp(token.expires_on)
            # Convert the expiration timestamp from local time to UTC time.
            expires_at = expires_at.astimezone(datetime.timezone.utc)

            auth_config = AzureAccessTokenConfig(
                token=token.token,
                resource_group=resource_group,
                storage_account=storage_account,
            )

        return cls(
            auth_method=auth_method,
            resource_type=resource_type,
            resource_id=resource_id
            if resource_type not in [AZURE_RESOURCE_TYPE, None]
            else None,
            expiration_seconds=expiration_seconds,
            expires_at=expires_at,
            config=auth_config,
        )

    @classmethod
    def _get_resource_group(cls, resource_id: str) -> str:
        """Get the resource group of an Azure resource.

        Args:
            resource_id: The ID of the Azure resource.

        Returns:
            The resource group of the Azure resource.
        """
        # The resource group is the fourth component of the resource ID.
        return resource_id.split("/")[4]

    def _list_blob_containers(
        self, credential: TokenCredential, container_name: Optional[str] = None
    ) -> Dict[str, str]:
        """Get the list of blob storage containers that the connector can access.

        Args:
            credential: The Azure credential to use to access the blob storage
                containers.
            container_name: The name of the blob container to get. If omitted,
                all accessible blob containers are returned.

        Returns:
            The list of blob storage containers that the connector can access
            as a dictionary mapping container names to their corresponding
            storage account names. If the `container_name` argument was
            specified, the dictionary will contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target blob storage containers.
        """
        subscription_id, _ = self.subscription
        # Azure blob storage containers are scoped to a storage account. We
        # need to figure out which storage accounts the connector can access
        # and then list the containers in each of them. If a container name
        # is provided, we only need to find the storage account that contains
        # it.

        storage_accounts: List[str] = []
        if self.config.storage_account:
            storage_accounts = [self.config.storage_account]
        else:
            try:
                storage_client = StorageManagementClient(
                    credential, subscription_id
                )
                accounts = list(storage_client.storage_accounts.list())
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list available Azure storage accounts. Please "
                    f"check that the credentials have permissions to list "
                    f"storage accounts or consider configuring a storage "
                    f"account in the connector: {e}"
                ) from e

            if not accounts:
                raise AuthorizationException(
                    "no storage accounts were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "storage accounts."
                )

            if self.config.resource_group:
                # Keep only the storage accounts in the specified resource
                # group.
                accounts = [
                    account
                    for account in accounts
                    if self._get_resource_group(account.id)
                    == self.config.resource_group
                ]
                if not accounts:
                    raise AuthorizationException(
                        f"no storage accounts were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"storage accounts and that the connector credentials "
                        f"have permissions to access them."
                    )

            storage_accounts = [
                account.name for account in accounts if account.name
            ]

        containers: Dict[str, str] = {}
        for storage_account in storage_accounts:
            account_url = f"https://{storage_account}.blob.core.windows.net/"

            try:
                blob_client = BlobServiceClient(
                    account_url, credential=credential
                )
                response = blob_client.list_containers()
                account_containers = [
                    container.name for container in response if container.name
                ]
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch Azure blob storage containers in the "
                    f"'{storage_account}' storage account. Please check that "
                    f"the credentials have permissions to list containers in "
                    f"that storage account: {e}"
                ) from e

            if container_name:
                if container_name not in account_containers:
                    continue
                containers = {container_name: storage_account}
                break

            containers.update(
                {
                    container: storage_account
                    for container in account_containers
                }
            )

        if not containers:
            if container_name:
                if self.config.storage_account:
                    raise AuthorizationException(
                        f"the '{container_name}' container was not found in "
                        f"the '{self.config.storage_account}' storage "
                        f"account. Please check that container exists and the "
                        f"credentials have permissions to access that "
                        f"container."
                    )

                raise AuthorizationException(
                    f"the '{container_name}' container was not found in any "
                    f"of the storage accounts accessible to the connector. "
                    f"Please check that container exists and the credentials "
                    f"have permissions to access the storage account it "
                    f"belongs to."
                )

            raise AuthorizationException(
                "no Azure blob storage containers were found in any of the "
                "storage accounts accessible to the connector. Please check "
                "that the credentials have permissions to access one or more "
                "storage accounts."
            )

        return containers

    def _list_acr_registries(
        self, credential: TokenCredential, registry_name: Optional[str] = None
    ) -> Dict[str, str]:
        """Get the list of ACR registries that the connector can access.

        Args:
            credential: The Azure credential to use.
            registry_name: The name of the registry to get. If omitted,
                all accessible registries are returned.

        Returns:
            The list of ACR registries that the connector can access as a
            dictionary mapping registry names to their corresponding
            resource group names. If the `registry_name` argument was
            specified, the dictionary will contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target ACR registries.
        """
        subscription_id, _ = self.subscription

        container_registries: Dict[str, str] = {}
        if registry_name and self.config.resource_group:
            try:
                container_client = ContainerRegistryManagementClient(
                    credential, subscription_id
                )
                registry = container_client.registries.get(
                    resource_group_name=self.config.resource_group,
                    registry_name=registry_name,
                )
                if registry.name:
                    container_registries = {
                        registry.name: self.config.resource_group
                    }
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure container registry "
                    f"'{registry_name}' in the '{self.config.resource_group}' "
                    f"resource group specified in the connector configuration. "
                    f"Please check that the registry exists in that resource "
                    f"group and that the connector credentials have "
                    f"permissions to access that registry: {e}"
                ) from e
        else:
            try:
                container_client = ContainerRegistryManagementClient(
                    credential, subscription_id
                )
                registries = list(container_client.registries.list())
            except AzureError as e:
                raise AuthorizationException(
                    "failed to list available Azure container registries. "
                    "Please check that the credentials have permissions to "
                    f"list container registries: {e}"
                ) from e

            if not registries:
                raise AuthorizationException(
                    "no container registries were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "container registries."
                )

            if self.config.resource_group:
                # Keep only the registries in the specified resource
                # group.
                registries = [
                    registry
                    for registry in registries
                    if self._get_resource_group(registry.id)
                    == self.config.resource_group
                ]

                if not registries:
                    raise AuthorizationException(
                        f"no container registries were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"container registries and that the connector "
                        f"credentials have permissions to access them."
                    )

            container_registries = {
                registry.name: self._get_resource_group(registry.id)
                for registry in registries
                if registry.name
            }

            if registry_name:
                if registry_name not in container_registries:
                    if self.config.resource_group:
                        raise AuthorizationException(
                            f"the '{registry_name}' registry was not found in "
                            f"the '{self.config.resource_group}' resource "
                            f"group specified in the connector configuration. "
                            f"Please check that registry exists in that "
                            f"resource group and that the connector "
                            f"credentials have permissions to access that "
                            f"registry."
                        )

                    raise AuthorizationException(
                        f"the '{registry_name}' registry was not found or is "
                        "not accessible. Please check that registry exists and "
                        "the credentials have permissions to access it."
                    )

                container_registries = {
                    registry_name: container_registries[registry_name]
                }

        return container_registries

    def _list_aks_clusters(
        self,
        credential: TokenCredential,
        cluster_name: Optional[str] = None,
        resource_group: Optional[str] = None,
    ) -> List[Tuple[str, str]]:
        """Get the list of AKS clusters that the connector can access.

        Args:
            credential: The Azure credential to use.
            cluster_name: The name of the cluster to get. If omitted,
                all accessible clusters are returned.
            resource_group: The name of the resource group to which the
                search should be limited. If omitted, all accessible
                clusters are returned.

        Returns:
            The list of AKS clusters that the connector can access as a
            list of cluster names and their corresponding resource group names.
            If the `cluster_name` argument was specified, the dictionary will
            contain a single entry.

        Raises:
            AuthorizationException: If the connector does not have access to
                access the target AKS clusters.
        """
        subscription_id, _ = self.subscription

        clusters: List[Tuple[str, str]] = []
        if cluster_name and resource_group:
            try:
                container_client = ContainerServiceClient(
                    credential, subscription_id
                )
                cluster = container_client.managed_clusters.get(
                    resource_group_name=resource_group,
                    resource_name=cluster_name,
                )
                if cluster.name:
                    clusters = [(cluster.name, resource_group)]
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to fetch the Azure AKS cluster '{cluster_name}' "
                    f"in the '{resource_group}' resource group. Please check "
                    f"that the cluster exists in that resource group and that "
                    f"the connector credentials have permissions to access "
                    f"that cluster: {e}"
                ) from e
        else:
            try:
                container_client = ContainerServiceClient(
                    credential, subscription_id
                )
                aks_clusters = list(container_client.managed_clusters.list())
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list available Azure AKS clusters. Please "
                    f"check that the credentials have permissions to list "
                    f"AKS clusters: {e}"
                ) from e

            if not aks_clusters:
                raise AuthorizationException(
                    "no AKS clusters were found. Please check that the "
                    "credentials have permissions to access one or more "
                    "AKS clusters."
                )

            if self.config.resource_group:
                # Keep only the clusters in the specified resource
                # group.
                aks_clusters = [
                    cluster
                    for cluster in aks_clusters
                    if self._get_resource_group(cluster.id)
                    == self.config.resource_group
                ]

                if not aks_clusters:
                    raise AuthorizationException(
                        f"no AKS clusters were found in the "
                        f"'{self.config.resource_group}' resource group "
                        f"specified in the connector configuration. Please "
                        f"check that resource group contains one or more "
                        f"AKS clusters and that the connector credentials "
                        f"have permissions to access them."
                    )

            clusters = [
                (cluster.name, self._get_resource_group(cluster.id))
                for cluster in aks_clusters
                if cluster.name
                and (not cluster_name or cluster.name == cluster_name)
            ]

            if cluster_name:
                if not clusters:
                    if self.config.resource_group:
                        raise AuthorizationException(
                            f"the '{cluster_name}' AKS cluster was not found "
                            f"in the '{self.config.resource_group}' resource "
                            f"group specified in the connector configuration. "
                            f"Please check that cluster exists in that "
                            f"resource group and that the connector "
                            f"credentials have permissions to access that "
                            f"cluster."
                        )

                    raise AuthorizationException(
                        f"the '{cluster_name}' AKS cluster was not found or "
                        "is not accessible. Please check that cluster exists "
                        "and the credentials have permissions to access it."
                    )

                if len(clusters) > 1:
                    resource_groups = [cluster[1] for cluster in clusters]
                    raise AuthorizationException(
                        f"the '{cluster_name}' AKS cluster was found in "
                        f"multiple resource groups: "
                        f"{', '.join(resource_groups)}. "
                        f"Please specify a resource group in the connector "
                        f"configuration or include the resource group in the "
                        "resource name to disambiguate."
                    )

        return clusters

    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify and list all the resources that the connector can access.

        Args:
            resource_type: The type of the resource to verify. If omitted and
                if the connector supports multiple resource types, the
                implementation must verify that it can authenticate and connect
                to any and all of the supported resource types.
            resource_id: The ID of the resource to connect to. Omitted if a
                resource type is not specified. It has the same value as the
                default resource ID if the supplied resource type doesn't
                support multiple instances. If the supplied resource type does
                allows multiple instances, this parameter may still be omitted
                to fetch a list of resource IDs identifying all the resources
                of the indicated type that the connector can access.

        Returns:
            The list of resources IDs in canonical format identifying the
            resources that the connector can access. This list is empty only
            if the resource type is not specified (i.e. for multi-type
            connectors).

        Raises:
            AuthorizationException: If the connector cannot authenticate or
                access the specified resource.
        """
        # If the resource type is not specified, treat this the
        # same as a generic Azure connector.
        if not resource_type:
            # fetch the Azure subscription and tenant IDs to verify the
            # credentials globally
            subscription_id, subscription_name = self.subscription
            return []

        if resource_type == AZURE_RESOURCE_TYPE:
            assert resource_id is not None
            return [resource_id]

        credential, _ = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type or AZURE_RESOURCE_TYPE,
            resource_id=resource_id,
        )

        if resource_type == BLOB_RESOURCE_TYPE:
            if self.auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
                raise AuthorizationException(
                    f"the '{self.auth_method}' authentication method is not "
                    "supported for blob storage resources"
                )

            container_name: Optional[str] = None
            if resource_id:
                container_name = self._parse_blob_container_resource_id(
                    resource_id
                )

            containers = self._list_blob_containers(
                credential,
                container_name=container_name,
            )

            return [f"az://{container}" for container in containers.keys()]

        if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name: Optional[str] = None
            if resource_id:
                registry_name = self._parse_acr_resource_id(resource_id)

            registries = self._list_acr_registries(
                credential,
                registry_name=registry_name,
            )

            return [f"{registry}.azurecr.io" for registry in registries.keys()]

        if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            cluster_name: Optional[str] = None
            resource_group = self.config.resource_group
            if resource_id:
                resource_group, cluster_name = self._parse_aks_resource_id(
                    resource_id
                )

            clusters = self._list_aks_clusters(
                credential,
                cluster_name=cluster_name,
                resource_group=resource_group,
            )

            return [
                f"{resource_group}/{cluster_name}"
                for cluster_name, resource_group in clusters
            ]

        return []

    def _get_connector_client(
        self,
        resource_type: str,
        resource_id: str,
    ) -> "ServiceConnector":
        """Get a connector instance that can be used to connect to a resource.

        This method generates a client-side connector instance that can be used
        to connect to a resource of the given type. The client-side connector
        is configured with temporary Azure credentials extracted from the
        current connector and, depending on resource type, it may also be
        of a different connector type:

        - a Kubernetes connector for Kubernetes clusters
        - a Docker connector for Docker registries

        Args:
            resource_type: The type of the resources to connect to.
            resource_id: The ID of a particular resource to connect to.

        Returns:
            An Azure, Kubernetes or Docker connector instance that can be used to
            connect to the specified resource.

        Raises:
            AuthorizationException: If authentication failed.
            ValueError: If the resource type is not supported.
            RuntimeError: If the Kubernetes connector is not installed and the
                resource type is Kubernetes.
        """
        connector_name = ""
        if self.name:
            connector_name = self.name
        if resource_id:
            connector_name += f" ({resource_type} | {resource_id} client)"
        else:
            connector_name += f" ({resource_type} client)"

        logger.debug(f"Getting connector client for {connector_name}")

        if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
            auth_method = self.auth_method
            if (
                self.resource_type == resource_type
                and self.resource_id == resource_id
            ):
                # If the requested type and resource ID are the same as
                # those configured, we can return the current connector
                # instance because it's fully formed and ready to use
                # to connect to the specified resource
                return self

            config = self.config
            expires_at = self.expires_at

            # Create a client-side Azure connector instance that is fully formed
            # and ready to use to connect to the specified resource (i.e. has
            # all the necessary configuration and credentials, a resource type
            # and a resource ID where applicable)
            return AzureServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=auth_method,
                resource_type=resource_type,
                resource_id=resource_id,
                config=config,
                expires_at=expires_at,
            )

        subscription_id, _ = self.subscription
        credential, expires_at = self.get_azure_credential(
            self.auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
        )
        resource_group: Optional[str] = None

        if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
            registry_name = self._parse_acr_resource_id(resource_id)

            # If a service principal is used for authentication, the client ID
            # and client secret can be used to authenticate to the registry, if
            # configured.
            # https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-service-principal#authenticate-with-the-service-principal
            if (
                self.auth_method
                == AzureAuthenticationMethods.SERVICE_PRINCIPAL
            ):
                assert isinstance(self.config, AzureServicePrincipalConfig)
                username = str(self.config.client_id)
                password = self.config.client_secret

            # Without a service principal, this only works if the admin account
            # is enabled for the registry, but this is not recommended and
            # disabled by default.
            # https://docs.microsoft.com/en-us/azure/container-registry/container-registry-authentication#admin-account

            else:
                registries = self._list_acr_registries(
                    credential,
                    registry_name=registry_name,
                )
                registry_name, resource_group = registries.popitem()

                try:
                    client = ContainerRegistryManagementClient(
                        credential, subscription_id
                    )

                    registry_credentials = client.registries.list_credentials(
                        resource_group, registry_name
                    )
                    username = registry_credentials.username
                    password = registry_credentials.passwords[0].value
                except AzureError as e:
                    raise AuthorizationException(
                        f"failed to list admin credentials for Azure Container "
                        f"Registry '{registry_name}' in resource group "
                        f"'{resource_group}'. Make sure the registry is "
                        f"configured with an admin account: {e}"
                    ) from e

            # Create a client-side Docker connector instance with the temporary
            # Docker credentials
            return DockerServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=DockerAuthenticationMethods.PASSWORD,
                resource_type=resource_type,
                config=DockerConfiguration(
                    username=username,
                    password=password,
                    registry=f"{registry_name}.azurecr.io",
                ),
                expires_at=expires_at,
            )

        if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
            resource_group, cluster_name = self._parse_aks_resource_id(
                resource_id
            )
            clusters = self._list_aks_clusters(
                credential,
                cluster_name=cluster_name,
                resource_group=resource_group,
            )
            cluster_name, resource_group = clusters[0]

            try:
                client = ContainerServiceClient(credential, subscription_id)

                creds = client.managed_clusters.list_cluster_admin_credentials(
                    resource_group_name=resource_group,
                    resource_name=cluster_name,
                )

                kubeconfig_yaml = creds.kubeconfigs[0].value.decode(
                    encoding="UTF-8"
                )
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list credentials for Azure Kubernetes "
                    f"Service cluster '{cluster_name}' in resource group "
                    f"'{resource_group}': {e}"
                ) from e

            kubeconfig = yaml.safe_load(kubeconfig_yaml)

            # Create a client-side Kubernetes connector instance with the
            # Kubernetes credentials
            try:
                # Import libraries only when needed
                from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (
                    KubernetesAuthenticationMethods,
                    KubernetesServiceConnector,
                    KubernetesTokenConfig,
                )
            except ImportError as e:
                raise RuntimeError(
                    f"The Kubernetes Service Connector functionality could not "
                    f"be used due to missing dependencies: {e}"
                )
            cluster_name = kubeconfig["clusters"][0]["name"]
            cluster = kubeconfig["clusters"][0]["cluster"]
            user = kubeconfig["users"][0]["user"]
            return KubernetesServiceConnector(
                id=self.id,
                name=connector_name,
                auth_method=KubernetesAuthenticationMethods.TOKEN,
                resource_type=resource_type,
                config=KubernetesTokenConfig(
                    cluster_name=cluster_name,
                    certificate_authority=cluster.get(
                        "certificate-authority-data"
                    ),
                    server=cluster["server"],
                    token=user["token"],
                    client_certificate=user.get("client-certificate-data"),
                    client_key=user.get("client-key-data"),
                ),
                expires_at=expires_at,
            )

        raise ValueError(f"Unsupported resource type: {resource_type}")
subscription: Tuple[str, str] property readonly

Get the Azure subscription ID and name.

Returns:

Type Description
Tuple[str, str]

The Azure subscription ID and name.

Exceptions:

Type Description
AuthorizationException

If the Azure subscription could not be determined or doesn't match the configured subscription ID.

tenant_id: str property readonly

Get the Azure tenant ID.

Returns:

Type Description
str

The Azure tenant ID.

Exceptions:

Type Description
AuthorizationException

If the Azure tenant ID could not be determined or doesn't match the configured tenant ID.

get_azure_credential(self, auth_method, resource_type=None, resource_id=None)

Get an Azure credential for the specified resource.

Parameters:

Name Type Description Default
auth_method str

The authentication method to use.

required
resource_type Optional[str]

The resource type to get a credential for.

None
resource_id Optional[str]

The resource ID to get a credential for.

None

Returns:

Type Description
Tuple[azure.core.credentials.TokenCredential, Optional[datetime.datetime]]

An Azure credential for the specified resource and its expiration timestamp, if applicable.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Refresh expired sessions
        now = datetime.datetime.now(datetime.timezone.utc)
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)

        # check if the token expires in the near future
        if expires_at > now + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
model_post_init(/, self, context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
context Any

The context.

required
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
AzureServicePrincipalConfig (AzureClientConfig, AzureClientSecret)

Azure service principal configuration.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServicePrincipalConfig(AzureClientConfig, AzureClientSecret):
    """Azure service principal configuration."""
ZenMLAzureTokenCredential (TokenCredential)

ZenML Azure token credential.

This class is used to provide a pre-configured token credential to Azure clients. Tokens are generated from other Azure credentials and are served to Azure clients to authenticate requests.

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class ZenMLAzureTokenCredential(TokenCredential):
    """ZenML Azure token credential.

    This class is used to provide a pre-configured token credential to Azure
    clients. Tokens are generated from other Azure credentials and are served
    to Azure clients to authenticate requests.
    """

    def __init__(self, token: str, expires_at: datetime.datetime):
        """Initialize ZenML Azure token credential.

        Args:
            token: The token to use for authentication
            expires_at: The expiration time of the token
        """
        self.token = token

        # Convert the expiration time from UTC to local time
        expires_at.replace(tzinfo=datetime.timezone.utc)
        expires_at = expires_at.astimezone(
            datetime.datetime.now().astimezone().tzinfo
        )

        self.expires_on = int(expires_at.timestamp())

    def get_token(self, *scopes: str, **kwargs: Any) -> Any:
        """Get token.

        Args:
            *scopes: Scopes
            **kwargs: Keyword arguments

        Returns:
            Token
        """
        return AccessToken(token=self.token, expires_on=self.expires_on)
__init__(self, token, expires_at) special

Initialize ZenML Azure token credential.

Parameters:

Name Type Description Default
token str

The token to use for authentication

required
expires_at datetime

The expiration time of the token

required
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def __init__(self, token: str, expires_at: datetime.datetime):
    """Initialize ZenML Azure token credential.

    Args:
        token: The token to use for authentication
        expires_at: The expiration time of the token
    """
    self.token = token

    # Convert the expiration time from UTC to local time
    expires_at.replace(tzinfo=datetime.timezone.utc)
    expires_at = expires_at.astimezone(
        datetime.datetime.now().astimezone().tzinfo
    )

    self.expires_on = int(expires_at.timestamp())
get_token(self, *scopes, **kwargs)

Get token.

Parameters:

Name Type Description Default
*scopes str

Scopes

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Token

Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_token(self, *scopes: str, **kwargs: Any) -> Any:
    """Get token.

    Args:
        *scopes: Scopes
        **kwargs: Keyword arguments

    Returns:
        Token
    """
    return AccessToken(token=self.token, expires_on=self.expires_on)

step_operators special

Initialization of AzureML Step Operator integration.

azureml_step_operator

Implementation of the ZenML AzureML Step Operator.

AzureMLStepOperator (BaseStepOperator)

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator to run a step on AzureML.

    This class defines code that can set up an AzureML environment and run the
    ZenML entrypoint command in it.
    """

    @property
    def config(self) -> AzureMLStepOperatorConfig:
        """Returns the `AzureMLStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureMLStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the AzureML step operator.

        Returns:
            The settings class.
        """
        return AzureMLStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote container
            registry and a remote artifact store.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            if stack.artifact_store.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the AzureML "
                    "step operator."
                )

            container_registry = stack.container_registry
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "AzureML step operator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def _get_credentials(self) -> TokenCredential:
        """Returns the authentication object for the AzureML environment.

        Returns:
            The authentication object for the AzureML environment.
        """
        # Authentication
        if connector := self.get_connector():
            credentials = connector.connect()
            assert isinstance(credentials, TokenCredential)
            return credentials
        elif (
            self.config.tenant_id
            and self.config.service_principal_id
            and self.config.service_principal_password
        ):
            return ClientSecretCredential(
                tenant_id=self.config.tenant_id,
                client_id=self.config.service_principal_id,
                client_secret=self.config.service_principal_password,
            )
        else:
            return DefaultAzureCredential()

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        builds = []
        for step_name, step in deployment.step_configurations.items():
            if step.config.step_operator == self.name:
                build = BuildConfiguration(
                    key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                    settings=step.config.docker_settings,
                    step_name=step_name,
                )
                builds.append(build)

        return builds

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launches a step on AzureML.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
        image_name = info.get_image(key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY)

        # Client creation
        ml_client = MLClient(
            credential=self._get_credentials(),
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace_name,
        )

        env = Environment(name=f"zenml-{info.run_name}", image=image_name)

        compute_target = create_or_get_compute(
            ml_client, settings, default_compute_name=f"zenml_{self.id}"
        )

        command_job = command(
            name=info.run_name,
            command=" ".join(entrypoint_command),
            environment=env,
            environment_variables=environment,
            compute=compute_target,
            experiment_name=info.pipeline.name,
        )

        job = ml_client.jobs.create_or_update(command_job)

        logger.info(f"AzureML job created with id: {job.id}")
        ml_client.jobs.stream(info.run_name)
config: AzureMLStepOperatorConfig property readonly

Returns the AzureMLStepOperatorConfig config.

Returns:

Type Description
AzureMLStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the AzureML step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

get_docker_builds(self, deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
launch(self, info, entrypoint_command, environment)

Launches a step on AzureML.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    # Client creation
    ml_client = MLClient(
        credential=self._get_credentials(),
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace_name,
    )

    env = Environment(name=f"zenml-{info.run_name}", image=image_name)

    compute_target = create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    )

    command_job = command(
        name=info.run_name,
        command=" ".join(entrypoint_command),
        environment=env,
        environment_variables=environment,
        compute=compute_target,
        experiment_name=info.pipeline.name,
    )

    job = ml_client.jobs.create_or_update(command_job)

    logger.info(f"AzureML job created with id: {job.id}")
    ml_client.jobs.stream(info.run_name)