Skip to content

Login

zenml.login special

ZenML login utilities.

credentials

ZenML login credentials models.

APIToken (BaseModel)

Cached API Token.

Source code in zenml/login/credentials.py
class APIToken(BaseModel):
    """Cached API Token."""

    access_token: str
    expires_in: Optional[int] = None
    expires_at: Optional[datetime] = None
    leeway: Optional[int] = None
    cookie_name: Optional[str] = None
    device_id: Optional[UUID] = None
    device_metadata: Optional[Dict[str, Any]] = None

    @property
    def expires_at_with_leeway(self) -> Optional[datetime]:
        """Get the token expiration time with leeway.

        Returns:
            The token expiration time with leeway.
        """
        if not self.expires_at:
            return None
        if not self.leeway:
            return self.expires_at
        return self.expires_at - timedelta(seconds=self.leeway)

    @property
    def expired(self) -> bool:
        """Check if the token is expired.

        Returns:
            bool: True if the token is expired, False otherwise.
        """
        expires_at = self.expires_at_with_leeway
        if not expires_at:
            return False
        return expires_at < datetime.now(timezone.utc)

    model_config = ConfigDict(
        # Allow extra attributes to allow backwards compatibility
        extra="allow",
    )
expired: bool property readonly

Check if the token is expired.

Returns:

Type Description
bool

True if the token is expired, False otherwise.

expires_at_with_leeway: Optional[datetime.datetime] property readonly

Get the token expiration time with leeway.

Returns:

Type Description
Optional[datetime.datetime]

The token expiration time with leeway.

ServerCredentials (BaseModel)

Cached Server Credentials.

Source code in zenml/login/credentials.py
class ServerCredentials(BaseModel):
    """Cached Server Credentials."""

    url: str
    api_key: Optional[str] = None
    api_token: Optional[APIToken] = None
    username: Optional[str] = None
    password: Optional[str] = None

    # Extra server attributes
    server_id: Optional[UUID] = None
    server_name: Optional[str] = None
    organization_name: Optional[str] = None
    organization_id: Optional[UUID] = None
    status: Optional[str] = None
    version: Optional[str] = None

    @property
    def id(self) -> str:
        """Get the server identifier.

        Returns:
            The server identifier.
        """
        if self.server_id:
            return str(self.server_id)
        return self.url

    @property
    def type(self) -> ServerType:
        """Get the server type.

        Returns:
            The server type.
        """
        from zenml.login.pro.utils import is_zenml_pro_server_url

        if self.url == ZENML_PRO_API_URL:
            return ServerType.PRO_API
        if self.organization_id or is_zenml_pro_server_url(self.url):
            return ServerType.PRO
        if urlparse(self.url).hostname in [
            "localhost",
            "127.0.0.1",
            "host.docker.internal",
        ]:
            return ServerType.LOCAL
        return ServerType.REMOTE

    def update_server_info(
        self, server_info: Union[ServerModel, TenantRead]
    ) -> None:
        """Update with server information received from the server itself or from a ZenML Pro tenant descriptor.

        Args:
            server_info: The server information to update with.
        """
        if isinstance(server_info, ServerModel):
            # The server ID doesn't change during the lifetime of the server
            self.server_id = self.server_id or server_info.id

            # All other attributes can change during the lifetime of the server
            server_name = (
                server_info.metadata.get("tenant_name") or server_info.name
            )
            if server_name:
                self.server_name = server_name
            organization_id = server_info.metadata.get("organization_id")
            if organization_id:
                self.organization_id = UUID(organization_id)
            self.version = server_info.version or self.version
            # The server information was retrieved from the server itself, so we
            # can assume that the server is available
            self.status = "available"
        else:
            self.server_id = server_info.id
            self.server_name = server_info.name
            self.organization_name = server_info.organization_name
            self.organization_id = server_info.organization_id
            self.status = server_info.status
            self.version = server_info.version

    @property
    def is_available(self) -> bool:
        """Check if the server is available (running and authenticated).

        Returns:
            True if the server is available, False otherwise.
        """
        if self.status not in [TenantStatus.AVAILABLE, ServiceState.ACTIVE]:
            return False
        if (
            self.api_key
            or self.api_token
            or self.username
            and self.password is not None
            or self.type in [ServerType.PRO, ServerType.LOCAL]
        ):
            return True
        if self.api_token and not self.api_token.expired:
            return True
        return False

    @property
    def auth_status(self) -> str:
        """Get the authentication status.

        Returns:
            The authentication status.
        """
        if self.api_key:
            return "API key"
        if self.username and self.password is not None:
            return "password"
        if not self.api_token:
            if self.type == ServerType.LOCAL:
                return "no authentication required"
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never expires"
        if expires_at < datetime.now(timezone.utc):
            return "expired at " + self.expires_at

        return f"valid until {self.expires_at} (in {self.expires_in})"

    @property
    def expires_at(self) -> str:
        """Get the expiration time of the token as a string.

        Returns:
            The expiration time of the token as a string.
        """
        if not self.api_token:
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never"

        # Convert the date in the local timezone
        local_expires_at = expires_at.astimezone()
        return local_expires_at.strftime("%Y-%m-%d %H:%M:%S %Z")

    @property
    def expires_in(self) -> str:
        """Get the time remaining until the token expires.

        Returns:
            The time remaining until the token expires.
        """
        if not self.api_token:
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never"

        # Get the time remaining until the token expires
        expires_in = expires_at - datetime.now(timezone.utc)
        return get_human_readable_time(expires_in.total_seconds())

    @property
    def dashboard_url(self) -> str:
        """Get the URL to the ZenML dashboard for this server.

        Returns:
            The URL to the ZenML dashboard for this server.
        """
        if self.organization_id and self.server_id:
            return (
                ZENML_PRO_URL
                + f"/organizations/{str(self.organization_id)}/tenants/{str(self.server_id)}"
            )
        return self.url

    @property
    def dashboard_organization_url(self) -> str:
        """Get the URL to the ZenML Pro dashboard for this tenant's organization.

        Returns:
            The URL to the ZenML Pro dashboard for this tenant's organization.
        """
        if self.organization_id:
            return (
                ZENML_PRO_URL + f"/organizations/{str(self.organization_id)}"
            )
        return ""

    @property
    def dashboard_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this tenant.

        Returns:
            The hyperlink to the ZenML dashboard for this tenant.
        """
        return f"[link={self.dashboard_url}]{self.dashboard_url}[/link]"

    @property
    def api_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML OpenAPI dashboard for this tenant.

        Returns:
            The hyperlink to the ZenML OpenAPI dashboard for this tenant.
        """
        api_url = self.url + "/docs"
        return f"[link={api_url}]{self.url}[/link]"

    @property
    def server_name_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this server using its name.

        Returns:
            The hyperlink to the ZenML dashboard for this server using its name.
        """
        if self.server_name is None:
            return "N/A"
        return f"[link={self.dashboard_url}]{self.server_name}[/link]"

    @property
    def server_id_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this server using its ID.

        Returns:
            The hyperlink to the ZenML dashboard for this server using its ID.
        """
        if self.server_id is None:
            return "N/A"
        return f"[link={self.dashboard_url}]{str(self.server_id)}[/link]"

    @property
    def organization_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this server's organization.

        Returns:
            The hyperlink to the ZenML Pro dashboard for this server's
            organization.
        """
        if self.organization_name:
            return self.organization_name_hyperlink
        if self.organization_id:
            return self.organization_id_hyperlink
        return "N/A"

    @property
    def organization_name_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this server's organization using its name.

        Returns:
            The hyperlink to the ZenML Pro dashboard for this server's
            organization using its name.
        """
        if self.organization_name is None:
            return "N/A"
        return f"[link={self.dashboard_organization_url}]{self.organization_name}[/link]"

    @property
    def organization_id_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID.

        Returns:
            The hyperlink to the ZenML Pro dashboard for this tenant's
            organization using its ID.
        """
        if self.organization_id is None:
            return "N/A"
        return f"[link={self.dashboard_organization_url}]{self.organization_id}[/link]"

Get the hyperlink to the ZenML OpenAPI dashboard for this tenant.

Returns:

Type Description
str

The hyperlink to the ZenML OpenAPI dashboard for this tenant.

auth_status: str property readonly

Get the authentication status.

Returns:

Type Description
str

The authentication status.

Get the hyperlink to the ZenML dashboard for this tenant.

Returns:

Type Description
str

The hyperlink to the ZenML dashboard for this tenant.

dashboard_organization_url: str property readonly

Get the URL to the ZenML Pro dashboard for this tenant's organization.

Returns:

Type Description
str

The URL to the ZenML Pro dashboard for this tenant's organization.

dashboard_url: str property readonly

Get the URL to the ZenML dashboard for this server.

Returns:

Type Description
str

The URL to the ZenML dashboard for this server.

expires_at: str property readonly

Get the expiration time of the token as a string.

Returns:

Type Description
str

The expiration time of the token as a string.

expires_in: str property readonly

Get the time remaining until the token expires.

Returns:

Type Description
str

The time remaining until the token expires.

id: str property readonly

Get the server identifier.

Returns:

Type Description
str

The server identifier.

is_available: bool property readonly

Check if the server is available (running and authenticated).

Returns:

Type Description
bool

True if the server is available, False otherwise.

Get the hyperlink to the ZenML Pro dashboard for this server's organization.

Returns:

Type Description
str

The hyperlink to the ZenML Pro dashboard for this server's organization.

Get the hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID.

Returns:

Type Description
str

The hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID.

Get the hyperlink to the ZenML Pro dashboard for this server's organization using its name.

Returns:

Type Description
str

The hyperlink to the ZenML Pro dashboard for this server's organization using its name.

Get the hyperlink to the ZenML dashboard for this server using its ID.

Returns:

Type Description
str

The hyperlink to the ZenML dashboard for this server using its ID.

Get the hyperlink to the ZenML dashboard for this server using its name.

Returns:

Type Description
str

The hyperlink to the ZenML dashboard for this server using its name.

type: ServerType property readonly

Get the server type.

Returns:

Type Description
ServerType

The server type.

update_server_info(self, server_info)

Update with server information received from the server itself or from a ZenML Pro tenant descriptor.

Parameters:

Name Type Description Default
server_info Union[zenml.models.v2.misc.server_models.ServerModel, zenml.login.pro.tenant.models.TenantRead]

The server information to update with.

required
Source code in zenml/login/credentials.py
def update_server_info(
    self, server_info: Union[ServerModel, TenantRead]
) -> None:
    """Update with server information received from the server itself or from a ZenML Pro tenant descriptor.

    Args:
        server_info: The server information to update with.
    """
    if isinstance(server_info, ServerModel):
        # The server ID doesn't change during the lifetime of the server
        self.server_id = self.server_id or server_info.id

        # All other attributes can change during the lifetime of the server
        server_name = (
            server_info.metadata.get("tenant_name") or server_info.name
        )
        if server_name:
            self.server_name = server_name
        organization_id = server_info.metadata.get("organization_id")
        if organization_id:
            self.organization_id = UUID(organization_id)
        self.version = server_info.version or self.version
        # The server information was retrieved from the server itself, so we
        # can assume that the server is available
        self.status = "available"
    else:
        self.server_id = server_info.id
        self.server_name = server_info.name
        self.organization_name = server_info.organization_name
        self.organization_id = server_info.organization_id
        self.status = server_info.status
        self.version = server_info.version

ServerType (StrEnum)

The type of server.

Source code in zenml/login/credentials.py
class ServerType(StrEnum):
    """The type of server."""

    PRO_API = "PRO_API"
    PRO = "PRO"
    REMOTE = "REMOTE"
    LOCAL = "LOCAL"

credentials_store

ZenML login credentials store support.

CredentialsStore

Login credentials store.

This is a singleton object that maintains a cache of all API tokens and API keys that are configured for the ZenML servers that the client connects to throughout its lifetime.

The cache is persistent and it is backed by a credentials.yaml YAML file kept in the global configuration location. The Credentials Store cache is populated mainly in the following ways:

1. when the user runs `zenml login` to authenticate to a ZenML Pro
server, the ZenML Pro API token fetched from the web login flow is
stored in the Credentials Store.
2. when the user runs `zenml login` to authenticate to a regular ZenML
server with the web login flow, the ZenML server API token fetched
through the web login flow is stored in the Credentials Store
3. when the user runs `zenml login` to authenticate to any ZenML server
using an API key, the API key is stored in the Credentials Store
4. when the REST zen store is initialized, it starts up not yet
authenticated. Then, if/when it needs to authenticate or re-authenticate
to the remote server, it will use whatever form of credentials it finds
in the Credentials Store, in order of priority:

    * if it finds an API token that is not expired (e.g. authorized
    device API tokens fetched through the web login flow or short-lived
    session API tokens fetched through some other means of
    authentication), it will use that to authenticate
    * for ZenML servers that use an API key to authenticate, it will use
    that to fetch a short-lived ZenML Pro server API token that it also
    stores in the Credentials Store
    * for ZenML Pro servers, it exchanges the longer-lived ZenML Pro API
    token into a short lived ZenML Pro server API token

Alongside credentials, the Credentials Store is also used to store additional server information: * ZenML Pro tenant information populated by the zenml login command * ZenML server information populated by the REST zen store by fetching the server's information endpoint after authenticating

Source code in zenml/login/credentials_store.py
class CredentialsStore(metaclass=SingletonMetaClass):
    """Login credentials store.

    This is a singleton object that maintains a cache of all API tokens and API
    keys that are configured for the ZenML servers that the client connects to
    throughout its lifetime.

    The cache is persistent and it is backed by a `credentials.yaml` YAML file
    kept in the global configuration location. The Credentials Store cache is
    populated mainly in the following ways:

        1. when the user runs `zenml login` to authenticate to a ZenML Pro
        server, the ZenML Pro API token fetched from the web login flow is
        stored in the Credentials Store.
        2. when the user runs `zenml login` to authenticate to a regular ZenML
        server with the web login flow, the ZenML server API token fetched
        through the web login flow is stored in the Credentials Store
        3. when the user runs `zenml login` to authenticate to any ZenML server
        using an API key, the API key is stored in the Credentials Store
        4. when the REST zen store is initialized, it starts up not yet
        authenticated. Then, if/when it needs to authenticate or re-authenticate
        to the remote server, it will use whatever form of credentials it finds
        in the Credentials Store, in order of priority:

            * if it finds an API token that is not expired (e.g. authorized
            device API tokens fetched through the web login flow or short-lived
            session API tokens fetched through some other means of
            authentication), it will use that to authenticate
            * for ZenML servers that use an API key to authenticate, it will use
            that to fetch a short-lived ZenML Pro server API token that it also
            stores in the Credentials Store
            * for ZenML Pro servers, it exchanges the longer-lived ZenML Pro API
            token into a short lived ZenML Pro server API token

    Alongside credentials, the Credentials Store is also used to store
    additional server information:
        * ZenML Pro tenant information populated by the `zenml login` command
        * ZenML server information populated by the REST zen store by fetching
        the server's information endpoint after authenticating

    """

    credentials: Dict[str, ServerCredentials]
    last_modified_time: Optional[float] = None

    def __init__(self) -> None:
        """Initializes the login credentials store with values loaded from the credentials YAML file.

        CredentialsStore is a singleton class: only one instance can exist.
        Calling this constructor multiple times will always yield the same
        instance.
        """
        self.credentials = {}
        self._load_credentials()

    @classmethod
    def reset_instance(
        cls, store: Optional["CredentialsStore"] = None
    ) -> None:
        """Reset the singleton instance of the CredentialsStore.

        Args:
            store: Optional instance of the CredentialsStore to set as the
                singleton instance. If None, a new instance will be created.
        """
        current_store = cls.get_instance()
        if current_store is not None and current_store is not store:
            # Delete the credentials file from disk if it exists, otherwise
            # the credentials will be reloaded from the file when the new
            # instance is created and this call will have no effect
            current_store._delete_credentials_file()

        cls._clear(store)  # type: ignore[arg-type]
        if store:
            store._save_credentials()

    @classmethod
    def get_instance(cls) -> Optional["CredentialsStore"]:
        """Get the singleton instance of the CredentialsStore.

        Returns:
            The singleton instance of the CredentialsStore.
        """
        return cast(CredentialsStore, cls._instance())

    @property
    def _credentials_file(self) -> str:
        """Path to the file where the credentials are stored.

        Returns:
            The path to the file where the credentials are stored.
        """
        config_path = GlobalConfiguration().config_directory
        return os.path.join(config_path, CREDENTIALS_STORE_FILENAME)

    def _load_credentials(self) -> None:
        """Load the credentials from the YAML file if it exists."""
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return

        credentials_file = self._credentials_file
        credentials_store = {}

        if fileio.exists(credentials_file):
            try:
                credentials_store = yaml_utils.read_yaml(credentials_file)
            except Exception as e:
                logger.error(
                    f"Failed to load credentials file {credentials_file}: {e}. "
                )
            self.last_modified_time = os.path.getmtime(credentials_file)

        if credentials_store is None:
            # This can happen for example if the config file is empty
            credentials_store = {}
        elif not isinstance(credentials_store, dict):
            logger.warning(
                f"The credentials file {credentials_file} is corrupted. "
                "Creating a new credentials file."
            )
            credentials_store = {}

        self.credentials = {}
        for server_url, token_data in credentials_store.items():
            try:
                self.credentials[server_url] = ServerCredentials(**token_data)
            except ValueError as e:
                logger.warning(
                    f"Failed to load credentials for {server_url}: {e}. "
                    "Ignoring this token."
                )

    def _save_credentials(self) -> None:
        """Dump the current credentials store to the YAML file."""
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return
        credentials_file = self._credentials_file
        credentials_store = {
            server_url: credential.model_dump(
                mode="json", exclude_none=True, exclude_unset=True
            )
            for server_url, credential in self.credentials.items()
            # Evict tokens that have expired past the eviction time
            # and have no API key or username/password to fall back on
            if credential.api_key
            or credential.username
            and credential.password is not None
            or credential.api_token
            and (
                not credential.api_token.expires_at
                or credential.api_token.expires_at
                + timedelta(seconds=TOKEN_STORE_EVICTION_TIME)
                > datetime.now(timezone.utc)
            )
        }
        yaml_utils.write_yaml(credentials_file, credentials_store)
        self.last_modified_time = os.path.getmtime(credentials_file)

    def _delete_credentials_file(self) -> None:
        """Delete the credentials file."""
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return
        credentials_file = self._credentials_file
        if fileio.exists(credentials_file):
            fileio.remove(credentials_file)
            self.last_modified_time = None

    def check_and_reload_from_file(self) -> None:
        """Check if the credentials file has been modified and reload it if necessary."""
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return
        if not self.last_modified_time:
            return
        credentials_file = self._credentials_file
        try:
            last_modified_time = os.path.getmtime(credentials_file)
        except FileNotFoundError:
            # The credentials file has been deleted
            self.last_modified_time = None
            return
        if last_modified_time != self.last_modified_time:
            self._load_credentials()

    def get_password(
        self, server_url: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Retrieve the username and password from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the username and
                password.

        Returns:
            The stored username and password if they exist, None otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            return credential.username, credential.password
        return None, None

    def get_api_key(self, server_url: str) -> Optional[str]:
        """Retrieve an API key from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the API key.

        Returns:
            The stored API key if it exists, None otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            return credential.api_key
        return None

    def get_token(
        self, server_url: str, allow_expired: bool = False
    ) -> Optional[APIToken]:
        """Retrieve a valid token from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the token.
            allow_expired: Whether to allow expired tokens to be returned. The
                default behavior is to return None if a token does exist but is
                expired.

        Returns:
            The stored token if it exists and is not expired, None otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            token = credential.api_token
            if token and (not token.expired or allow_expired):
                return token
        return None

    def get_credentials(self, server_url: str) -> Optional[ServerCredentials]:
        """Retrieve the credentials for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the credentials.

        Returns:
            The stored credentials if they exist, None otherwise.
        """
        self.check_and_reload_from_file()
        return self.credentials.get(server_url)

    def get_pro_token(self, allow_expired: bool = False) -> Optional[APIToken]:
        """Retrieve a valid token from the credentials store for the ZenML Pro API server.

        Args:
            allow_expired: Whether to allow expired tokens to be returned. The
                default behavior is to return None if a token does exist but is
                expired.

        Returns:
            The stored token if it exists and is not expired, None otherwise.
        """
        return self.get_token(ZENML_PRO_API_URL, allow_expired)

    def get_pro_credentials(
        self, allow_expired: bool = False
    ) -> Optional[ServerCredentials]:
        """Retrieve a valid token from the credentials store for the ZenML Pro API server.

        Args:
            allow_expired: Whether to allow expired tokens to be returned. The
                default behavior is to return None if a token does exist but is
                expired.

        Returns:
            The stored credentials if they exist and are not expired, None otherwise.
        """
        credential = self.get_credentials(ZENML_PRO_API_URL)
        if (
            credential
            and credential.api_token
            and (not credential.api_token.expired or allow_expired)
        ):
            return credential
        return None

    def clear_pro_credentials(self) -> None:
        """Delete the token from the store for the ZenML Pro API server."""
        self.clear_token(ZENML_PRO_API_URL)

    def clear_all_pro_tokens(self) -> None:
        """Delete all tokens from the store for ZenML Pro API servers."""
        for server_url, server in self.credentials.copy().items():
            if server.type == ServerType.PRO:
                if server.api_key:
                    continue
                self.clear_token(server_url)

    def has_valid_authentication(self, url: str) -> bool:
        """Check if a valid authentication credential for the given server URL is stored.

        Args:
            url: The server URL for which to check the authentication.

        Returns:
            bool: True if a valid token or API key is stored, False otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(url)

        if not credential:
            return False
        if credential.api_key or (
            credential.username and credential.password is not None
        ):
            return True
        token = credential.api_token
        return token is not None and not token.expired

    def has_valid_pro_authentication(self) -> bool:
        """Check if a valid token for the ZenML Pro API server is stored.

        Returns:
            bool: True if a valid token is stored, False otherwise.
        """
        return self.get_token(ZENML_PRO_API_URL) is not None

    def set_api_key(
        self,
        server_url: str,
        api_key: str,
    ) -> None:
        """Store an API key in the credentials store for a specific server URL.

        If an API token or a password is already stored for the server URL, they
        will be replaced by the API key.

        Args:
            server_url: The server URL for which the token is to be stored.
            api_key: The API key to store.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential and credential.api_key != api_key:
            # Reset the API token if a new or updated API key is set, because
            # the current token might have been issued for a different account
            credential.api_token = None
            credential.api_key = api_key
            credential.username = None
            credential.password = None
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_key=api_key
            )

        self._save_credentials()

    def set_password(
        self,
        server_url: str,
        username: str,
        password: str,
    ) -> None:
        """Store a username and password in the credentials store for a specific server URL.

        If an API token is already stored for the server URL, it will be
        replaced by the username and password.

        Args:
            server_url: The server URL for which the token is to be stored.
            username: The username to store.
            password: The password to store.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential and (
            credential.username != username or credential.password != password
        ):
            # Reset the API token if a new or updated password is set, because
            # the current token might have been issued for a different account
            credential.api_token = None
            credential.username = username
            credential.password = password
            credential.api_key = None
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, username=username, password=password
            )

        self._save_credentials()

    def set_token(
        self,
        server_url: str,
        token_response: OAuthTokenResponse,
    ) -> APIToken:
        """Store an API token received from an OAuth2 server.

        Args:
            server_url: The server URL for which the token is to be stored.
            token_response: Token response received from an OAuth2 server.

        Returns:
            APIToken: The stored token.
        """
        self.check_and_reload_from_file()
        if token_response.expires_in:
            expires_at = datetime.now(timezone.utc) + timedelta(
                seconds=token_response.expires_in
            )
            # Best practice to calculate the leeway depending on the token
            # expiration time:
            #
            # - for short-lived tokens (less than 1 hour), use a fixed leeway of
            # a few seconds (e.g., 30 seconds)
            # - for longer-lived tokens (e.g., 1 hour or more), use a
            # percentage-based leeway of 5-10%
            if token_response.expires_in < 3600:
                leeway = 30
            else:
                leeway = token_response.expires_in // 20
        else:
            expires_at = None
            leeway = None

        api_token = APIToken(
            access_token=token_response.access_token,
            expires_in=token_response.expires_in,
            expires_at=expires_at,
            leeway=leeway,
            cookie_name=token_response.cookie_name,
            device_id=token_response.device_id,
            device_metadata=token_response.device_metadata,
        )

        credential = self.credentials.get(server_url)
        if credential:
            credential.api_token = api_token
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_token=api_token
            )

        self._save_credentials()

        return api_token

    def set_bare_token(
        self,
        server_url: str,
        token: str,
    ) -> APIToken:
        """Store a bare API token.

        Args:
            server_url: The server URL for which the token is to be stored.
            token: The token to store.

        Returns:
            APIToken: The stored token.
        """
        self.check_and_reload_from_file()

        api_token = APIToken(
            access_token=token,
        )

        credential = self.credentials.get(server_url)
        if credential:
            credential.api_token = api_token
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_token=api_token
            )

        self._save_credentials()

        return api_token

    def update_server_info(
        self,
        server_url: str,
        server_info: Union[ServerModel, TenantRead],
    ) -> None:
        """Update the server information stored for a specific server URL.

        Args:
            server_url: The server URL for which the server information is to be
                updated.
            server_info: Updated server information.
        """
        self.check_and_reload_from_file()

        credential = self.credentials.get(server_url)
        if not credential:
            # No credentials stored for this server URL, nothing to update
            return

        credential.update_server_info(server_info)
        self._save_credentials()

    def clear_token(self, server_url: str) -> None:
        """Delete a token from the store for a specific server URL.

        Args:
            server_url: The server URL for which to delete the token.
        """
        self.check_and_reload_from_file()

        if server_url in self.credentials:
            credential = self.credentials[server_url]
            if (
                not credential.api_key
                and not credential.username
                and not credential.password is not None
            ):
                # Only delete the credential entry if there is no API key or
                # username/password to fall back on
                del self.credentials[server_url]
            else:
                credential.api_token = None
            self._save_credentials()

    def clear_credentials(self, server_url: str) -> None:
        """Delete all credentials from the store for a specific server URL.

        Args:
            server_url: The server URL for which to delete the credentials.
        """
        self.check_and_reload_from_file()

        if server_url in self.credentials:
            del self.credentials[server_url]
            self._save_credentials()

    def list_credentials(
        self, type: Optional[ServerType] = None
    ) -> List[ServerCredentials]:
        """Get all credentials stored in the credentials store.

        Args:
            type: Optional server type to filter the credentials by.

        Returns:
            A list of all credentials stored in the credentials store.
        """
        self.check_and_reload_from_file()

        credentials = list(self.credentials.values())

        if type is not None:
            credentials = [c for c in credentials if c and c.type == type]

        return credentials
__init__(self) special

Initializes the login credentials store with values loaded from the credentials YAML file.

CredentialsStore is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance.

Source code in zenml/login/credentials_store.py
def __init__(self) -> None:
    """Initializes the login credentials store with values loaded from the credentials YAML file.

    CredentialsStore is a singleton class: only one instance can exist.
    Calling this constructor multiple times will always yield the same
    instance.
    """
    self.credentials = {}
    self._load_credentials()
check_and_reload_from_file(self)

Check if the credentials file has been modified and reload it if necessary.

Source code in zenml/login/credentials_store.py
def check_and_reload_from_file(self) -> None:
    """Check if the credentials file has been modified and reload it if necessary."""
    if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
        return
    if not self.last_modified_time:
        return
    credentials_file = self._credentials_file
    try:
        last_modified_time = os.path.getmtime(credentials_file)
    except FileNotFoundError:
        # The credentials file has been deleted
        self.last_modified_time = None
        return
    if last_modified_time != self.last_modified_time:
        self._load_credentials()
clear_all_pro_tokens(self)

Delete all tokens from the store for ZenML Pro API servers.

Source code in zenml/login/credentials_store.py
def clear_all_pro_tokens(self) -> None:
    """Delete all tokens from the store for ZenML Pro API servers."""
    for server_url, server in self.credentials.copy().items():
        if server.type == ServerType.PRO:
            if server.api_key:
                continue
            self.clear_token(server_url)
clear_credentials(self, server_url)

Delete all credentials from the store for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to delete the credentials.

required
Source code in zenml/login/credentials_store.py
def clear_credentials(self, server_url: str) -> None:
    """Delete all credentials from the store for a specific server URL.

    Args:
        server_url: The server URL for which to delete the credentials.
    """
    self.check_and_reload_from_file()

    if server_url in self.credentials:
        del self.credentials[server_url]
        self._save_credentials()
clear_pro_credentials(self)

Delete the token from the store for the ZenML Pro API server.

Source code in zenml/login/credentials_store.py
def clear_pro_credentials(self) -> None:
    """Delete the token from the store for the ZenML Pro API server."""
    self.clear_token(ZENML_PRO_API_URL)
clear_token(self, server_url)

Delete a token from the store for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to delete the token.

required
Source code in zenml/login/credentials_store.py
def clear_token(self, server_url: str) -> None:
    """Delete a token from the store for a specific server URL.

    Args:
        server_url: The server URL for which to delete the token.
    """
    self.check_and_reload_from_file()

    if server_url in self.credentials:
        credential = self.credentials[server_url]
        if (
            not credential.api_key
            and not credential.username
            and not credential.password is not None
        ):
            # Only delete the credential entry if there is no API key or
            # username/password to fall back on
            del self.credentials[server_url]
        else:
            credential.api_token = None
        self._save_credentials()
get_api_key(self, server_url)

Retrieve an API key from the credentials store for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to retrieve the API key.

required

Returns:

Type Description
Optional[str]

The stored API key if it exists, None otherwise.

Source code in zenml/login/credentials_store.py
def get_api_key(self, server_url: str) -> Optional[str]:
    """Retrieve an API key from the credentials store for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the API key.

    Returns:
        The stored API key if it exists, None otherwise.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if credential:
        return credential.api_key
    return None
get_credentials(self, server_url)

Retrieve the credentials for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to retrieve the credentials.

required

Returns:

Type Description
Optional[zenml.login.credentials.ServerCredentials]

The stored credentials if they exist, None otherwise.

Source code in zenml/login/credentials_store.py
def get_credentials(self, server_url: str) -> Optional[ServerCredentials]:
    """Retrieve the credentials for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the credentials.

    Returns:
        The stored credentials if they exist, None otherwise.
    """
    self.check_and_reload_from_file()
    return self.credentials.get(server_url)
get_instance() classmethod

Get the singleton instance of the CredentialsStore.

Returns:

Type Description
Optional[CredentialsStore]

The singleton instance of the CredentialsStore.

Source code in zenml/login/credentials_store.py
@classmethod
def get_instance(cls) -> Optional["CredentialsStore"]:
    """Get the singleton instance of the CredentialsStore.

    Returns:
        The singleton instance of the CredentialsStore.
    """
    return cast(CredentialsStore, cls._instance())
get_password(self, server_url)

Retrieve the username and password from the credentials store for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to retrieve the username and password.

required

Returns:

Type Description
Tuple[Optional[str], Optional[str]]

The stored username and password if they exist, None otherwise.

Source code in zenml/login/credentials_store.py
def get_password(
    self, server_url: str
) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve the username and password from the credentials store for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the username and
            password.

    Returns:
        The stored username and password if they exist, None otherwise.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if credential:
        return credential.username, credential.password
    return None, None
get_pro_credentials(self, allow_expired=False)

Retrieve a valid token from the credentials store for the ZenML Pro API server.

Parameters:

Name Type Description Default
allow_expired bool

Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired.

False

Returns:

Type Description
Optional[zenml.login.credentials.ServerCredentials]

The stored credentials if they exist and are not expired, None otherwise.

Source code in zenml/login/credentials_store.py
def get_pro_credentials(
    self, allow_expired: bool = False
) -> Optional[ServerCredentials]:
    """Retrieve a valid token from the credentials store for the ZenML Pro API server.

    Args:
        allow_expired: Whether to allow expired tokens to be returned. The
            default behavior is to return None if a token does exist but is
            expired.

    Returns:
        The stored credentials if they exist and are not expired, None otherwise.
    """
    credential = self.get_credentials(ZENML_PRO_API_URL)
    if (
        credential
        and credential.api_token
        and (not credential.api_token.expired or allow_expired)
    ):
        return credential
    return None
get_pro_token(self, allow_expired=False)

Retrieve a valid token from the credentials store for the ZenML Pro API server.

Parameters:

Name Type Description Default
allow_expired bool

Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired.

False

Returns:

Type Description
Optional[zenml.login.credentials.APIToken]

The stored token if it exists and is not expired, None otherwise.

Source code in zenml/login/credentials_store.py
def get_pro_token(self, allow_expired: bool = False) -> Optional[APIToken]:
    """Retrieve a valid token from the credentials store for the ZenML Pro API server.

    Args:
        allow_expired: Whether to allow expired tokens to be returned. The
            default behavior is to return None if a token does exist but is
            expired.

    Returns:
        The stored token if it exists and is not expired, None otherwise.
    """
    return self.get_token(ZENML_PRO_API_URL, allow_expired)
get_token(self, server_url, allow_expired=False)

Retrieve a valid token from the credentials store for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which to retrieve the token.

required
allow_expired bool

Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired.

False

Returns:

Type Description
Optional[zenml.login.credentials.APIToken]

The stored token if it exists and is not expired, None otherwise.

Source code in zenml/login/credentials_store.py
def get_token(
    self, server_url: str, allow_expired: bool = False
) -> Optional[APIToken]:
    """Retrieve a valid token from the credentials store for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the token.
        allow_expired: Whether to allow expired tokens to be returned. The
            default behavior is to return None if a token does exist but is
            expired.

    Returns:
        The stored token if it exists and is not expired, None otherwise.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if credential:
        token = credential.api_token
        if token and (not token.expired or allow_expired):
            return token
    return None
has_valid_authentication(self, url)

Check if a valid authentication credential for the given server URL is stored.

Parameters:

Name Type Description Default
url str

The server URL for which to check the authentication.

required

Returns:

Type Description
bool

True if a valid token or API key is stored, False otherwise.

Source code in zenml/login/credentials_store.py
def has_valid_authentication(self, url: str) -> bool:
    """Check if a valid authentication credential for the given server URL is stored.

    Args:
        url: The server URL for which to check the authentication.

    Returns:
        bool: True if a valid token or API key is stored, False otherwise.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(url)

    if not credential:
        return False
    if credential.api_key or (
        credential.username and credential.password is not None
    ):
        return True
    token = credential.api_token
    return token is not None and not token.expired
has_valid_pro_authentication(self)

Check if a valid token for the ZenML Pro API server is stored.

Returns:

Type Description
bool

True if a valid token is stored, False otherwise.

Source code in zenml/login/credentials_store.py
def has_valid_pro_authentication(self) -> bool:
    """Check if a valid token for the ZenML Pro API server is stored.

    Returns:
        bool: True if a valid token is stored, False otherwise.
    """
    return self.get_token(ZENML_PRO_API_URL) is not None
list_credentials(self, type=None)

Get all credentials stored in the credentials store.

Parameters:

Name Type Description Default
type Optional[zenml.login.credentials.ServerType]

Optional server type to filter the credentials by.

None

Returns:

Type Description
List[zenml.login.credentials.ServerCredentials]

A list of all credentials stored in the credentials store.

Source code in zenml/login/credentials_store.py
def list_credentials(
    self, type: Optional[ServerType] = None
) -> List[ServerCredentials]:
    """Get all credentials stored in the credentials store.

    Args:
        type: Optional server type to filter the credentials by.

    Returns:
        A list of all credentials stored in the credentials store.
    """
    self.check_and_reload_from_file()

    credentials = list(self.credentials.values())

    if type is not None:
        credentials = [c for c in credentials if c and c.type == type]

    return credentials
reset_instance(store=None) classmethod

Reset the singleton instance of the CredentialsStore.

Parameters:

Name Type Description Default
store Optional[CredentialsStore]

Optional instance of the CredentialsStore to set as the singleton instance. If None, a new instance will be created.

None
Source code in zenml/login/credentials_store.py
@classmethod
def reset_instance(
    cls, store: Optional["CredentialsStore"] = None
) -> None:
    """Reset the singleton instance of the CredentialsStore.

    Args:
        store: Optional instance of the CredentialsStore to set as the
            singleton instance. If None, a new instance will be created.
    """
    current_store = cls.get_instance()
    if current_store is not None and current_store is not store:
        # Delete the credentials file from disk if it exists, otherwise
        # the credentials will be reloaded from the file when the new
        # instance is created and this call will have no effect
        current_store._delete_credentials_file()

    cls._clear(store)  # type: ignore[arg-type]
    if store:
        store._save_credentials()
set_api_key(self, server_url, api_key)

Store an API key in the credentials store for a specific server URL.

If an API token or a password is already stored for the server URL, they will be replaced by the API key.

Parameters:

Name Type Description Default
server_url str

The server URL for which the token is to be stored.

required
api_key str

The API key to store.

required
Source code in zenml/login/credentials_store.py
def set_api_key(
    self,
    server_url: str,
    api_key: str,
) -> None:
    """Store an API key in the credentials store for a specific server URL.

    If an API token or a password is already stored for the server URL, they
    will be replaced by the API key.

    Args:
        server_url: The server URL for which the token is to be stored.
        api_key: The API key to store.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if credential and credential.api_key != api_key:
        # Reset the API token if a new or updated API key is set, because
        # the current token might have been issued for a different account
        credential.api_token = None
        credential.api_key = api_key
        credential.username = None
        credential.password = None
    else:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_key=api_key
        )

    self._save_credentials()
set_bare_token(self, server_url, token)

Store a bare API token.

Parameters:

Name Type Description Default
server_url str

The server URL for which the token is to be stored.

required
token str

The token to store.

required

Returns:

Type Description
APIToken

The stored token.

Source code in zenml/login/credentials_store.py
def set_bare_token(
    self,
    server_url: str,
    token: str,
) -> APIToken:
    """Store a bare API token.

    Args:
        server_url: The server URL for which the token is to be stored.
        token: The token to store.

    Returns:
        APIToken: The stored token.
    """
    self.check_and_reload_from_file()

    api_token = APIToken(
        access_token=token,
    )

    credential = self.credentials.get(server_url)
    if credential:
        credential.api_token = api_token
    else:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_token=api_token
        )

    self._save_credentials()

    return api_token
set_password(self, server_url, username, password)

Store a username and password in the credentials store for a specific server URL.

If an API token is already stored for the server URL, it will be replaced by the username and password.

Parameters:

Name Type Description Default
server_url str

The server URL for which the token is to be stored.

required
username str

The username to store.

required
password str

The password to store.

required
Source code in zenml/login/credentials_store.py
def set_password(
    self,
    server_url: str,
    username: str,
    password: str,
) -> None:
    """Store a username and password in the credentials store for a specific server URL.

    If an API token is already stored for the server URL, it will be
    replaced by the username and password.

    Args:
        server_url: The server URL for which the token is to be stored.
        username: The username to store.
        password: The password to store.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if credential and (
        credential.username != username or credential.password != password
    ):
        # Reset the API token if a new or updated password is set, because
        # the current token might have been issued for a different account
        credential.api_token = None
        credential.username = username
        credential.password = password
        credential.api_key = None
    else:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, username=username, password=password
        )

    self._save_credentials()
set_token(self, server_url, token_response)

Store an API token received from an OAuth2 server.

Parameters:

Name Type Description Default
server_url str

The server URL for which the token is to be stored.

required
token_response OAuthTokenResponse

Token response received from an OAuth2 server.

required

Returns:

Type Description
APIToken

The stored token.

Source code in zenml/login/credentials_store.py
def set_token(
    self,
    server_url: str,
    token_response: OAuthTokenResponse,
) -> APIToken:
    """Store an API token received from an OAuth2 server.

    Args:
        server_url: The server URL for which the token is to be stored.
        token_response: Token response received from an OAuth2 server.

    Returns:
        APIToken: The stored token.
    """
    self.check_and_reload_from_file()
    if token_response.expires_in:
        expires_at = datetime.now(timezone.utc) + timedelta(
            seconds=token_response.expires_in
        )
        # Best practice to calculate the leeway depending on the token
        # expiration time:
        #
        # - for short-lived tokens (less than 1 hour), use a fixed leeway of
        # a few seconds (e.g., 30 seconds)
        # - for longer-lived tokens (e.g., 1 hour or more), use a
        # percentage-based leeway of 5-10%
        if token_response.expires_in < 3600:
            leeway = 30
        else:
            leeway = token_response.expires_in // 20
    else:
        expires_at = None
        leeway = None

    api_token = APIToken(
        access_token=token_response.access_token,
        expires_in=token_response.expires_in,
        expires_at=expires_at,
        leeway=leeway,
        cookie_name=token_response.cookie_name,
        device_id=token_response.device_id,
        device_metadata=token_response.device_metadata,
    )

    credential = self.credentials.get(server_url)
    if credential:
        credential.api_token = api_token
    else:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_token=api_token
        )

    self._save_credentials()

    return api_token
update_server_info(self, server_url, server_info)

Update the server information stored for a specific server URL.

Parameters:

Name Type Description Default
server_url str

The server URL for which the server information is to be updated.

required
server_info Union[zenml.models.v2.misc.server_models.ServerModel, zenml.login.pro.tenant.models.TenantRead]

Updated server information.

required
Source code in zenml/login/credentials_store.py
def update_server_info(
    self,
    server_url: str,
    server_info: Union[ServerModel, TenantRead],
) -> None:
    """Update the server information stored for a specific server URL.

    Args:
        server_url: The server URL for which the server information is to be
            updated.
        server_info: Updated server information.
    """
    self.check_and_reload_from_file()

    credential = self.credentials.get(server_url)
    if not credential:
        # No credentials stored for this server URL, nothing to update
        return

    credential.update_server_info(server_info)
    self._save_credentials()

get_credentials_store()

Get the global credentials store instance.

Returns:

Type Description
CredentialsStore

The global credentials store instance.

Source code in zenml/login/credentials_store.py
def get_credentials_store() -> CredentialsStore:
    """Get the global credentials store instance.

    Returns:
        The global credentials store instance.
    """
    return CredentialsStore()

pro special

ZenML Pro client.

client

ZenML Pro client.

ZenMLProClient

ZenML Pro client.

Source code in zenml/login/pro/client.py
class ZenMLProClient(metaclass=SingletonMetaClass):
    """ZenML Pro client."""

    _url: str
    _api_token: APIToken
    _session: Optional[requests.Session] = None
    _tenant: Optional["TenantClient"] = None
    _organization: Optional["OrganizationClient"] = None

    def __init__(
        self, url: Optional[str] = None, api_token: Optional[APIToken] = None
    ) -> None:
        """Initialize the ZenML Pro client.

        Args:
            url: The URL of the ZenML Pro API server. If not provided, the
                default ZenML Pro API server URL is used.
            api_token: The API token to use for authentication. If not provided,
                the token is fetched from the credentials store.

        Raises:
            AuthorizationException: If no API token is provided and no token
                is found in the credentials store.
        """
        self._url = url or ZENML_PRO_API_URL
        if api_token is None:
            logger.debug(
                "No ZenML Pro API token provided. Fetching from credentials "
                "store."
            )
            api_token = get_credentials_store().get_token(
                server_url=self._url, allow_expired=True
            )
            if api_token is None:
                raise AuthorizationException(
                    "No ZenML Pro API token found. Please run 'zenml login' to "
                    "login to ZenML Pro."
                )

        self._api_token = api_token

    @property
    def tenant(self) -> "TenantClient":
        """Get the tenant client.

        Returns:
            The tenant client.
        """
        if self._tenant is None:
            from zenml.login.pro.tenant.client import TenantClient

            self._tenant = TenantClient(client=self)
        return self._tenant

    @property
    def organization(self) -> "OrganizationClient":
        """Get the organization client.

        Returns:
            The organization client.
        """
        if self._organization is None:
            from zenml.login.pro.organization.client import OrganizationClient

            self._organization = OrganizationClient(client=self)
        return self._organization

    @property
    def api_token(self) -> str:
        """Get the API token.

        Returns:
            The API token.
        """
        return self._api_token.access_token

    def raise_on_expired_api_token(self) -> None:
        """Raise an exception if the API token has expired.

        Raises:
            AuthorizationException: If the API token has expired.
        """
        if self._api_token and self._api_token.expired:
            raise AuthorizationException(
                "Your ZenML Pro authentication has expired. Please run "
                "'zenml login' to login again."
            )

    @property
    def session(self) -> requests.Session:
        """Authenticate to the ZenML Pro API server.

        Returns:
            A requests session with the authentication token.
        """
        # Check if the API token has expired before every call to the server.
        # This prevents unwanted authorization errors from being raised during
        # the call itself.
        self.raise_on_expired_api_token()
        if self._session is None:
            self._session = requests.Session()
            retries = Retry(backoff_factor=0.1, connect=5)
            self._session.mount("https://", HTTPAdapter(max_retries=retries))
            self._session.mount("http://", HTTPAdapter(max_retries=retries))
            self._session.headers.update(
                {"Authorization": "Bearer " + self.api_token}
            )
            logger.debug("Authenticated to ZenML Pro server.")
        return self._session

    @staticmethod
    def _handle_response(response: requests.Response) -> Json:
        """Handle API response, translating http status codes to Exception.

        Args:
            response: The response to handle.

        Returns:
            The parsed response.

        Raises:
            ValueError: if the response is not in the right format.
            RuntimeError: if an error response is received from the server
                and a more specific exception cannot be determined.
            exc: the exception converted from an error response, if one
                is returned from the server.
        """
        if 200 <= response.status_code < 300:
            try:
                payload: Json = response.json()
                return payload
            except requests.exceptions.JSONDecodeError:
                raise ValueError(
                    "Bad response from API. Expected json, got\n"
                    f"{response.text}"
                )
        elif response.status_code >= 400:
            exc = exception_from_response(response)
            if exc is not None:
                raise exc
            else:
                raise RuntimeError(
                    f"{response.status_code} HTTP Error received from server: "
                    f"{response.text}"
                )
        else:
            raise RuntimeError(
                "Error retrieving from API. Got response "
                f"{response.status_code} with body:\n{response.text}"
            )

    def _request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a request to the REST API.

        Args:
            method: The HTTP method to use.
            url: The URL to request.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The parsed response.

        Raises:
            AuthorizationException: if the request fails due to an expired
                authentication token.
        """
        params = {k: str(v) for k, v in params.items()} if params else {}

        self.session.headers.update(
            {source_context.name: source_context.get().value}
        )

        try:
            return self._handle_response(
                self.session.request(
                    method,
                    url,
                    params=params,
                    **kwargs,
                )
            )
        except AuthorizationException:
            # Check if this is caused by an expired API token.
            self.raise_on_expired_api_token()

            # If not, raise the exception.
            raise

    def get(
        self,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a GET request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending GET request to {path}...")
        return self._request(
            "GET",
            self._url + path,
            params=params,
            **kwargs,
        )

    def delete(
        self,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a DELETE request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending DELETE request to {path}...")
        return self._request(
            "DELETE",
            self._url + path,
            params=params,
            **kwargs,
        )

    def post(
        self,
        path: str,
        body: BaseRestAPIModel,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a POST request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending POST request to {path}...")
        return self._request(
            "POST",
            self._url + path,
            json=body.model_dump(mode="json"),
            params=params,
            **kwargs,
        )

    def put(
        self,
        path: str,
        body: Optional[BaseRestAPIModel] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a PUT request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending PUT request to {path}...")
        json = (
            body.model_dump(mode="json", exclude_unset=True) if body else None
        )
        return self._request(
            "PUT",
            self._url + path,
            json=json,
            params=params,
            **kwargs,
        )

    def patch(
        self,
        path: str,
        body: Optional[BaseRestAPIModel] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a PATCH request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending PATCH request to {path}...")
        json = (
            body.model_dump(mode="json", exclude_unset=True) if body else None
        )
        return self._request(
            "PATCH",
            self._url + path,
            json=json,
            params=params,
            **kwargs,
        )

    def _create_resource(
        self,
        resource: BaseRestAPIModel,
        response_model: Type[AnyResponse],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponse:
        """Create a new resource.

        Args:
            resource: The resource to create.
            route: The resource REST API route to use.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource.
        """
        response_body = self.post(f"{route}", body=resource, params=params)

        return response_model.model_validate(response_body)

    def _get_resource(
        self,
        resource_id: Union[str, int, UUID],
        route: str,
        response_model: Type[AnyResponse],
        **params: Any,
    ) -> AnyResponse:
        """Retrieve a single resource.

        Args:
            resource_id: The ID of the resource to retrieve.
            route: The resource REST API route to use.
            response_model: Model to use to serialize the response body.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The retrieved resource.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        body = self.get(f"{route}/{str(resource_id)}", params=params)
        return response_model.model_validate(body)

    def _list_resources(
        self,
        route: str,
        response_model: Type[AnyResponse],
        **params: Any,
    ) -> List[AnyResponse]:
        """Retrieve a list of resources filtered by some criteria.

        Args:
            route: The resource REST API route to use.
            response_model: Model to use to serialize the response body.
            params: Filter parameters to use in the query.

        Returns:
            List of retrieved resources matching the filter criteria.

        Raises:
            ValueError: If the value returned by the server is not a list.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        body = self.get(f"{route}", params=params)
        if not isinstance(body, list):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        return [response_model.model_validate(entry) for entry in body]

    def _update_resource(
        self,
        resource_id: Union[str, int, UUID],
        resource_update: BaseRestAPIModel,
        response_model: Type[AnyResponse],
        route: str,
        **params: Any,
    ) -> AnyResponse:
        """Update an existing resource.

        Args:
            resource_id: The id of the resource to update.
            resource_update: The resource update.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            route: The resource REST API route to use.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The updated resource.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        response_body = self.put(
            f"{route}/{str(resource_id)}", body=resource_update, params=params
        )

        return response_model.model_validate(response_body)

    def _delete_resource(
        self, resource_id: Union[str, UUID], route: str
    ) -> None:
        """Delete a resource.

        Args:
            resource_id: The ID of the resource to delete.
            route: The resource REST API route to use.
        """
        self.delete(f"{route}/{str(resource_id)}")
api_token: str property readonly

Get the API token.

Returns:

Type Description
str

The API token.

organization: OrganizationClient property readonly

Get the organization client.

Returns:

Type Description
OrganizationClient

The organization client.

session: Session property readonly

Authenticate to the ZenML Pro API server.

Returns:

Type Description
Session

A requests session with the authentication token.

tenant: TenantClient property readonly

Get the tenant client.

Returns:

Type Description
TenantClient

The tenant client.

__init__(self, url=None, api_token=None) special

Initialize the ZenML Pro client.

Parameters:

Name Type Description Default
url Optional[str]

The URL of the ZenML Pro API server. If not provided, the default ZenML Pro API server URL is used.

None
api_token Optional[zenml.login.credentials.APIToken]

The API token to use for authentication. If not provided, the token is fetched from the credentials store.

None

Exceptions:

Type Description
AuthorizationException

If no API token is provided and no token is found in the credentials store.

Source code in zenml/login/pro/client.py
def __init__(
    self, url: Optional[str] = None, api_token: Optional[APIToken] = None
) -> None:
    """Initialize the ZenML Pro client.

    Args:
        url: The URL of the ZenML Pro API server. If not provided, the
            default ZenML Pro API server URL is used.
        api_token: The API token to use for authentication. If not provided,
            the token is fetched from the credentials store.

    Raises:
        AuthorizationException: If no API token is provided and no token
            is found in the credentials store.
    """
    self._url = url or ZENML_PRO_API_URL
    if api_token is None:
        logger.debug(
            "No ZenML Pro API token provided. Fetching from credentials "
            "store."
        )
        api_token = get_credentials_store().get_token(
            server_url=self._url, allow_expired=True
        )
        if api_token is None:
            raise AuthorizationException(
                "No ZenML Pro API token found. Please run 'zenml login' to "
                "login to ZenML Pro."
            )

    self._api_token = api_token
delete(self, path, params=None, **kwargs)

Make a DELETE request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/login/pro/client.py
def delete(
    self,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a DELETE request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending DELETE request to {path}...")
    return self._request(
        "DELETE",
        self._url + path,
        params=params,
        **kwargs,
    )
get(self, path, params=None, **kwargs)

Make a GET request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/login/pro/client.py
def get(
    self,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a GET request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending GET request to {path}...")
    return self._request(
        "GET",
        self._url + path,
        params=params,
        **kwargs,
    )
patch(self, path, body=None, params=None, **kwargs)

Make a PATCH request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
body Optional[zenml.login.pro.models.BaseRestAPIModel]

The body to send.

None
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/login/pro/client.py
def patch(
    self,
    path: str,
    body: Optional[BaseRestAPIModel] = None,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a PATCH request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        body: The body to send.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PATCH request to {path}...")
    json = (
        body.model_dump(mode="json", exclude_unset=True) if body else None
    )
    return self._request(
        "PATCH",
        self._url + path,
        json=json,
        params=params,
        **kwargs,
    )
post(self, path, body, params=None, **kwargs)

Make a POST request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
body BaseRestAPIModel

The body to send.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/login/pro/client.py
def post(
    self,
    path: str,
    body: BaseRestAPIModel,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a POST request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        body: The body to send.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending POST request to {path}...")
    return self._request(
        "POST",
        self._url + path,
        json=body.model_dump(mode="json"),
        params=params,
        **kwargs,
    )
put(self, path, body=None, params=None, **kwargs)

Make a PUT request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
body Optional[zenml.login.pro.models.BaseRestAPIModel]

The body to send.

None
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/login/pro/client.py
def put(
    self,
    path: str,
    body: Optional[BaseRestAPIModel] = None,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a PUT request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        body: The body to send.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PUT request to {path}...")
    json = (
        body.model_dump(mode="json", exclude_unset=True) if body else None
    )
    return self._request(
        "PUT",
        self._url + path,
        json=json,
        params=params,
        **kwargs,
    )
raise_on_expired_api_token(self)

Raise an exception if the API token has expired.

Exceptions:

Type Description
AuthorizationException

If the API token has expired.

Source code in zenml/login/pro/client.py
def raise_on_expired_api_token(self) -> None:
    """Raise an exception if the API token has expired.

    Raises:
        AuthorizationException: If the API token has expired.
    """
    if self._api_token and self._api_token.expired:
        raise AuthorizationException(
            "Your ZenML Pro authentication has expired. Please run "
            "'zenml login' to login again."
        )

constants

ZenML Pro login constants.

models

ZenML Pro base models.

BaseRestAPIModel (BaseModel)

Base class for all REST API models.

Source code in zenml/login/pro/models.py
class BaseRestAPIModel(BaseModel):
    """Base class for all REST API models."""

    model_config = ConfigDict(
        # Allow extra attributes to allow compatibility with future versions
        extra="allow",
    )

organization special

ZenML Pro organization client.

client

ZenML Pro organization client.

OrganizationClient

Organization management client.

Source code in zenml/login/pro/organization/client.py
class OrganizationClient:
    """Organization management client."""

    def __init__(
        self,
        client: ZenMLProClient,
    ):
        """Initialize the organization client.

        Args:
            client: ZenML Pro client.
        """
        self.client = client

    def get(
        self,
        id_or_name: Union[UUID, str],
    ) -> OrganizationRead:
        """Get an organization by id or name.

        Args:
            id_or_name: Id or name of the organization to retrieve.

        Returns:
            An organization.
        """
        return self.client._get_resource(
            resource_id=id_or_name,
            route=ORGANIZATIONS_ROUTE,
            response_model=OrganizationRead,
        )

    async def list(
        self,
        offset: int = 0,
        limit: int = 20,
    ) -> List[OrganizationRead]:
        """List organizations.

        Args:
            offset: Query offset.
            limit: Query limit.

        Returns:
            List of organizations.
        """
        return self.client._list_resources(
            route=ORGANIZATIONS_ROUTE,
            response_model=OrganizationRead,
            offset=offset,
            limit=limit,
        )
__init__(self, client) special

Initialize the organization client.

Parameters:

Name Type Description Default
client ZenMLProClient

ZenML Pro client.

required
Source code in zenml/login/pro/organization/client.py
def __init__(
    self,
    client: ZenMLProClient,
):
    """Initialize the organization client.

    Args:
        client: ZenML Pro client.
    """
    self.client = client
get(self, id_or_name)

Get an organization by id or name.

Parameters:

Name Type Description Default
id_or_name Union[uuid.UUID, str]

Id or name of the organization to retrieve.

required

Returns:

Type Description
OrganizationRead

An organization.

Source code in zenml/login/pro/organization/client.py
def get(
    self,
    id_or_name: Union[UUID, str],
) -> OrganizationRead:
    """Get an organization by id or name.

    Args:
        id_or_name: Id or name of the organization to retrieve.

    Returns:
        An organization.
    """
    return self.client._get_resource(
        resource_id=id_or_name,
        route=ORGANIZATIONS_ROUTE,
        response_model=OrganizationRead,
    )
list(self, offset=0, limit=20) async

List organizations.

Parameters:

Name Type Description Default
offset int

Query offset.

0
limit int

Query limit.

20

Returns:

Type Description
List[zenml.login.pro.organization.models.OrganizationRead]

List of organizations.

Source code in zenml/login/pro/organization/client.py
async def list(
    self,
    offset: int = 0,
    limit: int = 20,
) -> List[OrganizationRead]:
    """List organizations.

    Args:
        offset: Query offset.
        limit: Query limit.

    Returns:
        List of organizations.
    """
    return self.client._list_resources(
        route=ORGANIZATIONS_ROUTE,
        response_model=OrganizationRead,
        offset=offset,
        limit=limit,
    )
models

ZenML Pro organization models.

OrganizationRead (BaseRestAPIModel)

Model for viewing organizations.

Source code in zenml/login/pro/organization/models.py
class OrganizationRead(BaseRestAPIModel):
    """Model for viewing organizations."""

    id: UUID

    name: str
    description: Optional[str] = None

    created: datetime
    updated: datetime

tenant special

ZenML Pro tenant client.

client

ZenML Pro tenant client.

TenantClient

Tenant management client.

Source code in zenml/login/pro/tenant/client.py
class TenantClient:
    """Tenant management client."""

    def __init__(
        self,
        client: ZenMLProClient,
    ):
        """Initialize the tenant client.

        Args:
            client: ZenML Pro client.
        """
        self.client = client

    def get(self, id: UUID) -> TenantRead:
        """Get a tenant by id.

        Args:
            id: Id. of the tenant to retrieve.

        Returns:
            A tenant.
        """
        return self.client._get_resource(
            resource_id=id,
            route=TENANTS_ROUTE,
            response_model=TenantRead,
        )

    def list(
        self,
        offset: int = 0,
        limit: int = 20,
        tenant_name: Optional[str] = None,
        url: Optional[str] = None,
        organization_id: Optional[UUID] = None,
        status: Optional[TenantStatus] = None,
        member_only: bool = False,
    ) -> List[TenantRead]:
        """List tenants.

        Args:
            offset: Offset to use for filtering.
            limit: Limit used for filtering.
            tenant_name: Tenant name to filter by.
            url: Tenant service URL to filter by.
            organization_id: Organization ID to filter by.
            status: Filter for only tenants with this status.
            member_only: If True, only list tenants where the user is a member
                (i.e. users that can connect to the tenant).

        Returns:
            List of tenants.
        """
        return self.client._list_resources(
            route=TENANTS_ROUTE,
            response_model=TenantRead,
            offset=offset,
            limit=limit,
            tenant_name=tenant_name,
            url=url,
            organization_id=organization_id,
            status=status,
            member_only=member_only,
        )
__init__(self, client) special

Initialize the tenant client.

Parameters:

Name Type Description Default
client ZenMLProClient

ZenML Pro client.

required
Source code in zenml/login/pro/tenant/client.py
def __init__(
    self,
    client: ZenMLProClient,
):
    """Initialize the tenant client.

    Args:
        client: ZenML Pro client.
    """
    self.client = client
get(self, id)

Get a tenant by id.

Parameters:

Name Type Description Default
id UUID

Id. of the tenant to retrieve.

required

Returns:

Type Description
TenantRead

A tenant.

Source code in zenml/login/pro/tenant/client.py
def get(self, id: UUID) -> TenantRead:
    """Get a tenant by id.

    Args:
        id: Id. of the tenant to retrieve.

    Returns:
        A tenant.
    """
    return self.client._get_resource(
        resource_id=id,
        route=TENANTS_ROUTE,
        response_model=TenantRead,
    )
list(self, offset=0, limit=20, tenant_name=None, url=None, organization_id=None, status=None, member_only=False)

List tenants.

Parameters:

Name Type Description Default
offset int

Offset to use for filtering.

0
limit int

Limit used for filtering.

20
tenant_name Optional[str]

Tenant name to filter by.

None
url Optional[str]

Tenant service URL to filter by.

None
organization_id Optional[uuid.UUID]

Organization ID to filter by.

None
status Optional[zenml.login.pro.tenant.models.TenantStatus]

Filter for only tenants with this status.

None
member_only bool

If True, only list tenants where the user is a member (i.e. users that can connect to the tenant).

False

Returns:

Type Description
List[zenml.login.pro.tenant.models.TenantRead]

List of tenants.

Source code in zenml/login/pro/tenant/client.py
def list(
    self,
    offset: int = 0,
    limit: int = 20,
    tenant_name: Optional[str] = None,
    url: Optional[str] = None,
    organization_id: Optional[UUID] = None,
    status: Optional[TenantStatus] = None,
    member_only: bool = False,
) -> List[TenantRead]:
    """List tenants.

    Args:
        offset: Offset to use for filtering.
        limit: Limit used for filtering.
        tenant_name: Tenant name to filter by.
        url: Tenant service URL to filter by.
        organization_id: Organization ID to filter by.
        status: Filter for only tenants with this status.
        member_only: If True, only list tenants where the user is a member
            (i.e. users that can connect to the tenant).

    Returns:
        List of tenants.
    """
    return self.client._list_resources(
        route=TENANTS_ROUTE,
        response_model=TenantRead,
        offset=offset,
        limit=limit,
        tenant_name=tenant_name,
        url=url,
        organization_id=organization_id,
        status=status,
        member_only=member_only,
    )
models

ZenML Pro tenant models.

TenantRead (BaseRestAPIModel)

Pydantic Model for viewing a Tenant.

Source code in zenml/login/pro/tenant/models.py
class TenantRead(BaseRestAPIModel):
    """Pydantic Model for viewing a Tenant."""

    id: UUID

    name: str
    description: Optional[str] = Field(
        default=None, description="The description of the tenant."
    )

    organization: OrganizationRead

    desired_state: str = Field(description="The desired state of the tenant.")
    state_reason: str = Field(
        description="The reason for the current tenant state.",
    )
    status: str = Field(
        description="The current operational state of the tenant."
    )
    zenml_service: ZenMLServiceRead = Field(description="The ZenML service.")

    @property
    def organization_id(self) -> UUID:
        """Get the organization id.

        Returns:
            The organization id.
        """
        return self.organization.id

    @property
    def organization_name(self) -> str:
        """Get the organization name.

        Returns:
            The organization name.
        """
        return self.organization.name

    @property
    def version(self) -> Optional[str]:
        """Get the ZenML service version.

        Returns:
            The ZenML service version.
        """
        version = self.zenml_service.configuration.version
        if self.zenml_service.status and self.zenml_service.status.version:
            version = self.zenml_service.status.version

        return version

    @property
    def url(self) -> Optional[str]:
        """Get the ZenML server URL.

        Returns:
            The ZenML server URL, if available.
        """
        return (
            self.zenml_service.status.server_url
            if self.zenml_service.status
            else None
        )

    @property
    def dashboard_url(self) -> str:
        """Get the URL to the ZenML Pro dashboard for this tenant.

        Returns:
            The URL to the ZenML Pro dashboard for this tenant.
        """
        return (
            ZENML_PRO_URL
            + f"/organizations/{str(self.organization_id)}/tenants/{str(self.id)}"
        )

    @property
    def dashboard_organization_url(self) -> str:
        """Get the URL to the ZenML Pro dashboard for this tenant's organization.

        Returns:
            The URL to the ZenML Pro dashboard for this tenant's organization.
        """
        return ZENML_PRO_URL + f"/organizations/{str(self.organization_id)}"
dashboard_organization_url: str property readonly

Get the URL to the ZenML Pro dashboard for this tenant's organization.

Returns:

Type Description
str

The URL to the ZenML Pro dashboard for this tenant's organization.

dashboard_url: str property readonly

Get the URL to the ZenML Pro dashboard for this tenant.

Returns:

Type Description
str

The URL to the ZenML Pro dashboard for this tenant.

organization_id: UUID property readonly

Get the organization id.

Returns:

Type Description
UUID

The organization id.

organization_name: str property readonly

Get the organization name.

Returns:

Type Description
str

The organization name.

url: Optional[str] property readonly

Get the ZenML server URL.

Returns:

Type Description
Optional[str]

The ZenML server URL, if available.

version: Optional[str] property readonly

Get the ZenML service version.

Returns:

Type Description
Optional[str]

The ZenML service version.

TenantStatus (StrEnum)

Enum that represents the desired state or status of a tenant.

These values can be used in two places:

  • in the desired_state field of a tenant object, to indicate the desired state of the tenant (with the exception of PENDING and FAILED which are not valid values for desired_state)
  • in the status field of a tenant object, to indicate the current state of the tenant
Source code in zenml/login/pro/tenant/models.py
class TenantStatus(StrEnum):
    """Enum that represents the desired state or status of a tenant.

    These values can be used in two places:

    * in the `desired_state` field of a tenant object, to indicate the desired
    state of the tenant (with the exception of `PENDING` and `FAILED` which
    are not valid values for `desired_state`)
    * in the `status` field of a tenant object, to indicate the current state
    of the tenant
    """

    # Tenant hasn't been deployed yet (i.e. newly created) or has been fully
    # deleted by the infrastructure provider
    NOT_INITIALIZED = "not_initialized"
    # Tenant is being processed by the infrastructure provider (is being
    # deployed, updated, deactivated, re-activated or deleted/cleaned up).
    PENDING = "pending"
    # Tenant is up and running
    AVAILABLE = "available"
    # Tenant is in a failure state (i.e. deployment, update or deletion failed)
    FAILED = "failed"
    # Tenant is deactivated
    DEACTIVATED = "deactivated"
    # Tenant resources have been deleted by the infrastructure provider but
    # the tenant object still exists in the database
    DELETED = "deleted"
ZenMLServiceConfiguration (BaseRestAPIModel)

ZenML service configuration.

Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceConfiguration(BaseRestAPIModel):
    """ZenML service configuration."""

    version: str = Field(
        description="The ZenML version.",
    )
ZenMLServiceRead (BaseRestAPIModel)

Pydantic Model for viewing a ZenML service.

Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceRead(BaseRestAPIModel):
    """Pydantic Model for viewing a ZenML service."""

    configuration: ZenMLServiceConfiguration = Field(
        description="The service configuration."
    )

    status: Optional[ZenMLServiceStatus] = Field(
        default=None,
        description="Information about the service status. Only set if the "
        "service is deployed and active.",
    )
ZenMLServiceStatus (BaseRestAPIModel)

ZenML service status.

Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceStatus(BaseRestAPIModel):
    """ZenML service status."""

    server_url: str = Field(
        description="The ZenML server URL.",
    )
    version: Optional[str] = Field(
        default=None,
        description="The ZenML server version.",
    )

utils

ZenML Pro login utils.

get_troubleshooting_instructions(url)

Get troubleshooting instructions for a given ZenML Pro server URL.

Parameters:

Name Type Description Default
url str

ZenML Pro server URL

required

Returns:

Type Description
str

Troubleshooting instructions

Source code in zenml/login/pro/utils.py
def get_troubleshooting_instructions(url: str) -> str:
    """Get troubleshooting instructions for a given ZenML Pro server URL.

    Args:
        url: ZenML Pro server URL

    Returns:
        Troubleshooting instructions
    """
    credentials_store = get_credentials_store()
    if credentials_store.has_valid_pro_authentication():
        client = ZenMLProClient()

        try:
            servers = client.tenant.list(url=url, member_only=False)
        except Exception as e:
            logger.debug(f"Failed to list tenants: {e}")
        else:
            if servers:
                server = servers[0]
                if server.status == TenantStatus.AVAILABLE:
                    return (
                        f"The '{server.name}' ZenML Pro server that the client "
                        "is connected to is currently running but you may not "
                        "have the necessary permissions to access it. Please "
                        "contact your ZenML Pro administrator for more "
                        "information or try to manage the server members "
                        "yourself if you have the necessary permissions by "
                        f"visiting the ZenML Pro tenant page at {server.dashboard_url}."
                    )
                if server.status == TenantStatus.DEACTIVATED:
                    return (
                        f"The '{server.name}' ZenML Pro server that the client "
                        "is connected to has been deactivated. "
                        "Please contact your ZenML Pro administrator for more "
                        "information or to reactivate the server yourself if "
                        "you have the necessary permissions by visiting the "
                        f"ZenML Pro Organization page at {server.dashboard_organization_url}."
                    )
                if server.status == TenantStatus.PENDING:
                    return (
                        f"The '{server.name}' ZenML Pro server that the client "
                        "is connected to is currently undergoing maintenance "
                        "(e.g. being deployed, upgraded or re-activated). "
                        "Please try again later or contact your ZenML Pro "
                        "administrator for more information. You can also "
                        f"visit the ZenML Pro tenant page at {server.dashboard_url}."
                    )
                return (
                    f"The '{server.name}' ZenML Pro server that the client "
                    "is connected to is currently in a failed "
                    "state. Please contact your ZenML Pro administrator for "
                    "more information or try to re-deploy the server "
                    "yourself if you have the necessary permissions by "
                    "visiting the ZenML Pro Organization page at "
                    f"{server.dashboard_organization_url}."
                )

            return (
                f"The ZenML Pro server at URL '{url}' that the client is "
                "connected to does not exist or you may not have access to it. "
                "Please check the URL and your permissions and try again or "
                "connect your client to a different server by running `zenml "
                "login` or by using a service account API key."
            )

    return (
        f"The ZenML Pro server at URL '{url}' that the client is connected to "
        "does not exist, is not running, or you do not have permissions to "
        "connect to it. Please check the URL and your permissions "
        "and try again. The ZenML Pro server might have been deactivated or is "
        "currently pending maintenance. Please contact your ZenML Pro "
        "administrator for more information or try to manage the server "
        "state by visiting the ZenML Pro dashboard."
    )
is_zenml_pro_server_url(url)

Check if a given URL is a ZenML Pro server.

Parameters:

Name Type Description Default
url str

URL to check

required

Returns:

Type Description
bool

True if the URL is a ZenML Pro tenant, False otherwise

Source code in zenml/login/pro/utils.py
def is_zenml_pro_server_url(url: str) -> bool:
    """Check if a given URL is a ZenML Pro server.

    Args:
        url: URL to check

    Returns:
        True if the URL is a ZenML Pro tenant, False otherwise
    """
    domain_regex = ZENML_PRO_SERVER_SUBDOMAIN.replace(".", r"\.")
    return bool(
        re.match(
            r"^(https://)?[a-zA-Z0-9-\.]+\.{domain}/?$".format(
                domain=domain_regex
            ),
            url,
        )
    )

web_login

ZenML OAuth2 device authorization grant client support.

web_login(url=None, verify_ssl=None)

Implements the OAuth2 Device Authorization Grant flow.

This function implements the client side of the OAuth2 Device Authorization Grant flow as defined in https://tools.ietf.org/html/rfc8628, with the following customizations:

  • the unique ZenML client ID (user_id in the global config) is used as the OAuth2 client ID value
  • additional information is added to the user agent header to be used by users to identify the ZenML client

Upon completion of the flow, the access token is saved in the credentials store.

Parameters:

Name Type Description Default
url Optional[str]

The URL of the OAuth2 server. If not provided, the ZenML Pro API server is used by default.

None
verify_ssl Union[str, bool]

Whether to verify the SSL certificate of the OAuth2 server. If a string is passed, it is interpreted as the path to a CA bundle file.

None

Returns:

Type Description
APIToken

The response returned by the OAuth2 server.

Exceptions:

Type Description
AuthorizationException

If an error occurred during the authorization process.

Source code in zenml/login/web_login.py
def web_login(
    url: Optional[str] = None, verify_ssl: Optional[Union[str, bool]] = None
) -> APIToken:
    """Implements the OAuth2 Device Authorization Grant flow.

    This function implements the client side of the OAuth2 Device Authorization
    Grant flow as defined in https://tools.ietf.org/html/rfc8628, with the
    following customizations:

    * the unique ZenML client ID (`user_id` in the global config) is used
    as the OAuth2 client ID value
    * additional information is added to the user agent header to be used by
    users to identify the ZenML client

    Upon completion of the flow, the access token is saved in the credentials store.

    Args:
        url: The URL of the OAuth2 server. If not provided, the ZenML Pro API
            server is used by default.
        verify_ssl: Whether to verify the SSL certificate of the OAuth2 server.
            If a string is passed, it is interpreted as the path to a CA bundle
            file.

    Returns:
        The response returned by the OAuth2 server.

    Raises:
        AuthorizationException: If an error occurred during the authorization
            process.
    """
    from zenml.login.credentials_store import get_credentials_store
    from zenml.models import (
        OAuthDeviceAuthorizationRequest,
        OAuthDeviceAuthorizationResponse,
        OAuthDeviceTokenRequest,
        OAuthDeviceUserAgentHeader,
        OAuthTokenResponse,
    )

    credentials_store = get_credentials_store()

    # Make a request to the OAuth2 server to get the device code and user code.
    # The client ID used for the request is the unique ID of the ZenML client.
    response: Optional[requests.Response] = None

    # Add the following information in the user agent header to be used by users
    # to identify the ZenML client:
    #
    # * the ZenML version
    # * the python version
    # * the OS type
    # * the hostname
    #
    user_agent_header = OAuthDeviceUserAgentHeader(
        hostname=platform.node(),
        zenml_version=__version__,
        python_version=platform.python_version(),
        os=platform.system(),
    )

    zenml_pro = False
    if not url:
        # If no URL is provided, we use the ZenML Pro API server by default
        zenml_pro = True
        url = base_url = ZENML_PRO_API_URL
    else:
        # Get rid of any trailing slashes to prevent issues when having double
        # slashes in the URL
        url = url.rstrip("/")
        if is_zenml_pro_server_url(url):
            # This is a ZenML Pro server. The device authentication is done
            # through the ZenML Pro API.
            zenml_pro = True
            base_url = ZENML_PRO_API_URL
        else:
            base_url = url

    auth_request = OAuthDeviceAuthorizationRequest(
        client_id=GlobalConfiguration().user_id
    )

    # If an existing token is found in the credentials store, we reuse its
    # device ID to avoid creating a new device ID for the same device.
    existing_token = credentials_store.get_token(url)
    if existing_token and existing_token.device_id:
        auth_request.device_id = existing_token.device_id

    if zenml_pro:
        auth_url = base_url + AUTH + DEVICE_AUTHORIZATION
        login_url = base_url + AUTH + LOGIN
    else:
        auth_url = base_url + API + VERSION_1 + DEVICE_AUTHORIZATION
        login_url = base_url + API + VERSION_1 + LOGIN

    try:
        response = requests.post(
            auth_url,
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "User-Agent": user_agent_header.encode(),
            },
            data=auth_request.model_dump(exclude_none=True),
            verify=verify_ssl,
            timeout=DEFAULT_HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            auth_response = OAuthDeviceAuthorizationResponse(**response.json())
        else:
            logger.info(f"Error: {response.status_code} {response.text}")
            raise AuthorizationException(
                f"Could not connect to {base_url}. Please check the URL."
            )
    except (requests.exceptions.JSONDecodeError, ValueError, TypeError):
        logger.exception("Bad response received from API server.")
        raise AuthorizationException(
            "Bad response received from API server. Please check the URL."
        )
    except requests.exceptions.RequestException:
        logger.exception("Could not connect to API server.")
        raise AuthorizationException(
            f"Could not connect to {base_url}. Please check the URL."
        )

    # Open the verification URL in the user's browser
    verification_uri = (
        auth_response.verification_uri_complete
        or auth_response.verification_uri
    )
    if verification_uri.startswith("/"):
        # If the verification URI is a relative path, we need to add the base
        # URL to it
        verification_uri = base_url + verification_uri
    webbrowser.open(verification_uri)
    logger.info(
        f"If your browser did not open automatically, please open the "
        f"following URL into your browser to proceed with the authentication:"
        f"\n\n{verification_uri}\n"
    )

    # Poll the OAuth2 server until the user has authorized the device
    token_request = OAuthDeviceTokenRequest(
        device_code=auth_response.device_code,
        client_id=auth_request.client_id,
    )
    expires_in = auth_response.expires_in
    interval = auth_response.interval
    token_response: OAuthTokenResponse
    while True:
        response = requests.post(
            login_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=token_request.model_dump(),
            verify=verify_ssl,
            timeout=DEFAULT_HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            # The user has authorized the device, so we can extract the access token
            token_response = OAuthTokenResponse(**response.json())
            if zenml_pro:
                logger.info("Successfully logged in to ZenML Pro.")
            else:
                logger.info(f"Successfully logged in to {url}.")
            break
        elif response.status_code == 400:
            try:
                error_response = OAuthError(**response.json())
            except (
                requests.exceptions.JSONDecodeError,
                ValueError,
                TypeError,
            ):
                raise AuthorizationException(
                    f"Error received from {base_url}: {response.text}"
                )

            if error_response.error == "authorization_pending":
                # The user hasn't authorized the device yet, so we wait for the
                # interval and try again
                pass
            elif error_response.error == "slow_down":
                # The OAuth2 server is asking us to slow down our polling
                interval += 5
            else:
                # There was another error with the request
                raise AuthorizationException(
                    f"Error: {error_response.error} {error_response.error_description}"
                )

            expires_in -= interval
            if expires_in <= 0:
                raise AuthorizationException(
                    "User did not authorize the device in time."
                )
            time.sleep(interval)
        else:
            # There was another error with the request
            raise AuthorizationException(
                f"Error: {response.status_code} {response.json()['error']}"
            )

    # Save the token in the credentials store
    return credentials_store.set_token(url, token_response)