Login
zenml.login
special
ZenML login utilities.
credentials
ZenML login credentials models.
APIToken (BaseModel)
Cached API Token.
Source code in zenml/login/credentials.py
class APIToken(BaseModel):
    """Cached API Token.

    Stores the access token together with optional expiration data and
    optional device details. Unknown attributes are accepted (see
    ``model_config``).
    """

    model_config = ConfigDict(
        # Allow extra attributes to allow backwards compatibility
        extra="allow",
    )

    access_token: str
    expires_in: Optional[int] = None
    expires_at: Optional[datetime] = None
    leeway: Optional[int] = None
    cookie_name: Optional[str] = None
    device_id: Optional[UUID] = None
    device_metadata: Optional[Dict[str, Any]] = None

    @property
    def expires_at_with_leeway(self) -> Optional[datetime]:
        """Get the token expiration time with leeway.

        Returns:
            ``None`` when no expiration time is set; otherwise the expiration
            time moved earlier by ``leeway`` seconds (or unchanged when no
            leeway is set).
        """
        expiry = self.expires_at
        if not expiry:
            return None
        # A missing or zero leeway leaves the expiration time untouched.
        return expiry - timedelta(seconds=self.leeway) if self.leeway else expiry

    @property
    def expired(self) -> bool:
        """Check if the token is expired.

        Returns:
            bool: True if the token is expired, False otherwise. A token
            without an expiration time is never reported as expired.
        """
        deadline = self.expires_at_with_leeway
        if deadline:
            return deadline < datetime.now(timezone.utc)
        return False
expired: bool
property
readonly
Check if the token is expired.
Returns:
Type | Description |
---|---|
bool |
True if the token is expired, False otherwise. |
expires_at_with_leeway: Optional[datetime.datetime]
property
readonly
Get the token expiration time with leeway.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
The token expiration time with leeway. |
ServerCredentials (BaseModel)
Cached Server Credentials.
Source code in zenml/login/credentials.py
class ServerCredentials(BaseModel):
    """Cached Server Credentials.

    Holds the credentials cached for a single server URL (API key, API token
    and/or username and password) together with descriptive server
    attributes, and exposes derived, display-oriented properties.
    """

    url: str
    api_key: Optional[str] = None
    api_token: Optional[APIToken] = None
    username: Optional[str] = None
    password: Optional[str] = None

    # Extra server attributes
    server_id: Optional[UUID] = None
    server_name: Optional[str] = None
    organization_name: Optional[str] = None
    organization_id: Optional[UUID] = None
    status: Optional[str] = None
    version: Optional[str] = None

    @property
    def id(self) -> str:
        """Get the server identifier.

        Returns:
            The server ID as a string if one is known, otherwise the server
            URL.
        """
        if self.server_id:
            return str(self.server_id)
        return self.url

    @property
    def type(self) -> ServerType:
        """Get the server type.

        Returns:
            The server type, derived from the URL and the organization ID.
        """
        # Imported locally rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from zenml.login.pro.utils import is_zenml_pro_server_url

        if self.url == ZENML_PRO_API_URL:
            return ServerType.PRO_API
        if self.organization_id or is_zenml_pro_server_url(self.url):
            return ServerType.PRO
        if urlparse(self.url).hostname in [
            "localhost",
            "127.0.0.1",
            "host.docker.internal",
        ]:
            return ServerType.LOCAL
        return ServerType.REMOTE

    def update_server_info(
        self, server_info: Union[ServerModel, TenantRead]
    ) -> None:
        """Update with server information received from the server itself or from a ZenML Pro tenant descriptor.

        Args:
            server_info: The server information to update with.
        """
        if isinstance(server_info, ServerModel):
            # The server ID doesn't change during the lifetime of the server
            self.server_id = self.server_id or server_info.id
            # All other attributes can change during the lifetime of the server
            server_name = (
                server_info.metadata.get("tenant_name") or server_info.name
            )
            if server_name:
                self.server_name = server_name
            organization_id = server_info.metadata.get("organization_id")
            if organization_id:
                self.organization_id = UUID(organization_id)
            self.version = server_info.version or self.version
            # The server information was retrieved from the server itself, so we
            # can assume that the server is available
            self.status = "available"
        else:
            # Tenant descriptor: every attribute is overwritten, including
            # with empty values.
            self.server_id = server_info.id
            self.server_name = server_info.name
            self.organization_name = server_info.organization_name
            self.organization_id = server_info.organization_id
            self.status = server_info.status
            self.version = server_info.version

    @property
    def is_available(self) -> bool:
        """Check if the server is available (running and authenticated).

        Returns:
            True if the server is available, False otherwise.
        """
        if self.status not in [TenantStatus.AVAILABLE, ServiceState.ACTIVE]:
            return False
        # An API key, a username/password pair, or a PRO/LOCAL server type
        # counts as authenticated regardless of any cached API token.
        if (
            self.api_key
            or (self.username and self.password is not None)
            or self.type in [ServerType.PRO, ServerType.LOCAL]
        ):
            return True
        # Otherwise a cached API token is required, and it only counts while
        # it has not expired. The mere presence of a token must not
        # short-circuit this check.
        return self.api_token is not None and not self.api_token.expired

    @property
    def auth_status(self) -> str:
        """Get the authentication status.

        Returns:
            A short human-readable description of the authentication method
            in use or of the API token validity.
        """
        if self.api_key:
            return "API key"
        if self.username and self.password is not None:
            return "password"
        if not self.api_token:
            if self.type == ServerType.LOCAL:
                return "no authentication required"
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never expires"
        if expires_at < datetime.now(timezone.utc):
            return "expired at " + self.expires_at
        return f"valid until {self.expires_at} (in {self.expires_in})"

    @property
    def expires_at(self) -> str:
        """Get the expiration time of the token as a string.

        Returns:
            "N/A" if there is no token, "never" if the token has no
            expiration time, otherwise the expiration time (with leeway)
            formatted in the local timezone.
        """
        if not self.api_token:
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never"

        # Convert the date in the local timezone
        local_expires_at = expires_at.astimezone()
        return local_expires_at.strftime("%Y-%m-%d %H:%M:%S %Z")

    @property
    def expires_in(self) -> str:
        """Get the time remaining until the token expires.

        Returns:
            "N/A" if there is no token, "never" if the token has no
            expiration time, otherwise the remaining time rendered by
            ``get_human_readable_time``.
        """
        if not self.api_token:
            return "N/A"
        expires_at = self.api_token.expires_at_with_leeway
        if not expires_at:
            return "never"

        # Get the time remaining until the token expires
        expires_in = expires_at - datetime.now(timezone.utc)
        return get_human_readable_time(expires_in.total_seconds())

    @property
    def dashboard_url(self) -> str:
        """Get the URL to the ZenML dashboard for this server.

        Returns:
            The ZenML Pro tenant URL when both the organization ID and the
            server ID are known, otherwise the server URL.
        """
        if self.organization_id and self.server_id:
            return (
                ZENML_PRO_URL
                + f"/organizations/{self.organization_id}/tenants/{self.server_id}"
            )
        return self.url

    @property
    def dashboard_organization_url(self) -> str:
        """Get the URL to the ZenML Pro dashboard for this tenant's organization.

        Returns:
            The organization URL, or an empty string if the organization ID
            is not known.
        """
        if self.organization_id:
            return ZENML_PRO_URL + f"/organizations/{self.organization_id}"
        return ""

    @property
    def dashboard_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this tenant.

        Returns:
            ``[link=URL]URL[/link]`` markup pointing at the dashboard URL.
        """
        return f"[link={self.dashboard_url}]{self.dashboard_url}[/link]"

    @property
    def api_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML OpenAPI dashboard for this tenant.

        Returns:
            ``[link=...]`` markup that targets ``<url>/docs`` and displays
            the plain server URL.
        """
        api_url = self.url + "/docs"
        return f"[link={api_url}]{self.url}[/link]"

    @property
    def server_name_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this server using its name.

        Returns:
            "N/A" if the server name is not known, otherwise ``[link=...]``
            markup displaying the server name.
        """
        if self.server_name is None:
            return "N/A"
        return f"[link={self.dashboard_url}]{self.server_name}[/link]"

    @property
    def server_id_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML dashboard for this server using its ID.

        Returns:
            "N/A" if the server ID is not known, otherwise ``[link=...]``
            markup displaying the server ID.
        """
        if self.server_id is None:
            return "N/A"
        return f"[link={self.dashboard_url}]{self.server_id}[/link]"

    @property
    def organization_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this server's organization.

        Returns:
            The name-based hyperlink if the organization name is known, else
            the ID-based hyperlink if the organization ID is known, else
            "N/A".
        """
        if self.organization_name:
            return self.organization_name_hyperlink
        if self.organization_id:
            return self.organization_id_hyperlink
        return "N/A"

    @property
    def organization_name_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this server's organization using its name.

        Returns:
            "N/A" if the organization name is not known, otherwise
            ``[link=...]`` markup displaying the organization name.
        """
        if self.organization_name is None:
            return "N/A"
        return f"[link={self.dashboard_organization_url}]{self.organization_name}[/link]"

    @property
    def organization_id_hyperlink(self) -> str:
        """Get the hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID.

        Returns:
            "N/A" if the organization ID is not known, otherwise
            ``[link=...]`` markup displaying the organization ID.
        """
        if self.organization_id is None:
            return "N/A"
        return f"[link={self.dashboard_organization_url}]{self.organization_id}[/link]"
api_hyperlink: str
property
readonly
Get the hyperlink to the ZenML OpenAPI dashboard for this tenant.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML OpenAPI dashboard for this tenant. |
auth_status: str
property
readonly
Get the authentication status.
Returns:
Type | Description |
---|---|
str |
The authentication status. |
dashboard_hyperlink: str
property
readonly
Get the hyperlink to the ZenML dashboard for this tenant.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML dashboard for this tenant. |
dashboard_organization_url: str
property
readonly
Get the URL to the ZenML Pro dashboard for this tenant's organization.
Returns:
Type | Description |
---|---|
str |
The URL to the ZenML Pro dashboard for this tenant's organization. |
dashboard_url: str
property
readonly
Get the URL to the ZenML dashboard for this server.
Returns:
Type | Description |
---|---|
str |
The URL to the ZenML dashboard for this server. |
expires_at: str
property
readonly
Get the expiration time of the token as a string.
Returns:
Type | Description |
---|---|
str |
The expiration time of the token as a string. |
expires_in: str
property
readonly
Get the time remaining until the token expires.
Returns:
Type | Description |
---|---|
str |
The time remaining until the token expires. |
id: str
property
readonly
Get the server identifier.
Returns:
Type | Description |
---|---|
str |
The server identifier. |
is_available: bool
property
readonly
Check if the server is available (running and authenticated).
Returns:
Type | Description |
---|---|
bool |
True if the server is available, False otherwise. |
organization_hyperlink: str
property
readonly
Get the hyperlink to the ZenML Pro dashboard for this server's organization.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML Pro dashboard for this server's organization. |
organization_id_hyperlink: str
property
readonly
Get the hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML Pro dashboard for this tenant's organization using its ID. |
organization_name_hyperlink: str
property
readonly
Get the hyperlink to the ZenML Pro dashboard for this server's organization using its name.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML Pro dashboard for this server's organization using its name. |
server_id_hyperlink: str
property
readonly
Get the hyperlink to the ZenML dashboard for this server using its ID.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML dashboard for this server using its ID. |
server_name_hyperlink: str
property
readonly
Get the hyperlink to the ZenML dashboard for this server using its name.
Returns:
Type | Description |
---|---|
str |
The hyperlink to the ZenML dashboard for this server using its name. |
type: ServerType
property
readonly
Get the server type.
Returns:
Type | Description |
---|---|
ServerType |
The server type. |
update_server_info(self, server_info)
Update with server information received from the server itself or from a ZenML Pro tenant descriptor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_info |
Union[zenml.models.v2.misc.server_models.ServerModel, zenml.login.pro.tenant.models.TenantRead] |
The server information to update with. |
required |
Source code in zenml/login/credentials.py
def update_server_info(
    self, server_info: Union[ServerModel, TenantRead]
) -> None:
    """Refresh the cached server attributes from a server or tenant descriptor.

    Args:
        server_info: Either the information reported by the server itself
            (``ServerModel``) or a ZenML Pro tenant descriptor
            (``TenantRead``).
    """
    if not isinstance(server_info, ServerModel):
        # Tenant descriptor: copy every attribute over as-is.
        self.server_id = server_info.id
        self.server_name = server_info.name
        self.organization_name = server_info.organization_name
        self.organization_id = server_info.organization_id
        self.status = server_info.status
        self.version = server_info.version
        return

    # An already known server ID is kept: it does not change during the
    # lifetime of the server.
    self.server_id = self.server_id or server_info.id

    # Everything else may change over time; a tenant name found in the
    # metadata takes precedence over the plain server name.
    display_name = server_info.metadata.get("tenant_name") or server_info.name
    if display_name:
        self.server_name = display_name

    raw_organization_id = server_info.metadata.get("organization_id")
    if raw_organization_id:
        self.organization_id = UUID(raw_organization_id)

    self.version = server_info.version or self.version

    # The information came from the server itself, so the server is assumed
    # to be available.
    self.status = "available"
ServerType (StrEnum)
The type of server.
Source code in zenml/login/credentials.py
class ServerType(StrEnum):
    """The type of server."""

    # Assigned by ServerCredentials.type when the URL equals ZENML_PRO_API_URL.
    PRO_API = "PRO_API"
    # Assigned when an organization ID is set or the URL is recognized as a
    # ZenML Pro server URL.
    PRO = "PRO"
    # Fallback for every server that matches none of the other types.
    REMOTE = "REMOTE"
    # Assigned when the URL host is localhost, 127.0.0.1 or
    # host.docker.internal.
    LOCAL = "LOCAL"
credentials_store
ZenML login credentials store support.
CredentialsStore
Login credentials store.
This is a singleton object that maintains a cache of all API tokens and API keys that are configured for the ZenML servers that the client connects to throughout its lifetime.
The cache is persistent and it is backed by a `credentials.yaml` YAML file
kept in the global configuration location. The Credentials Store cache is
populated mainly in the following ways:
1. when the user runs `zenml login` to authenticate to a ZenML Pro
server, the ZenML Pro API token fetched from the web login flow is
stored in the Credentials Store.
2. when the user runs `zenml login` to authenticate to a regular ZenML
server with the web login flow, the ZenML server API token fetched
through the web login flow is stored in the Credentials Store
3. when the user runs `zenml login` to authenticate to any ZenML server
using an API key, the API key is stored in the Credentials Store
4. when the REST zen store is initialized, it starts up not yet
authenticated. Then, if/when it needs to authenticate or re-authenticate
to the remote server, it will use whatever form of credentials it finds
in the Credentials Store, in order of priority:
* if it finds an API token that is not expired (e.g. authorized
device API tokens fetched through the web login flow or short-lived
session API tokens fetched through some other means of
authentication), it will use that to authenticate
* for ZenML servers that use an API key to authenticate, it will use
that to fetch a short-lived ZenML Pro server API token that it also
stores in the Credentials Store
* for ZenML Pro servers, it exchanges the longer-lived ZenML Pro API
token into a short lived ZenML Pro server API token
Alongside credentials, the Credentials Store is also used to store
additional server information:
* ZenML Pro tenant information populated by the `zenml login` command
* ZenML server information populated by the REST zen store by fetching
the server's information endpoint after authenticating
Source code in zenml/login/credentials_store.py
class CredentialsStore(metaclass=SingletonMetaClass):
    """Login credentials store.

    This is a singleton object that maintains a cache of all API tokens and API
    keys that are configured for the ZenML servers that the client connects to
    throughout its lifetime.

    The cache is persistent and it is backed by a `credentials.yaml` YAML file
    kept in the global configuration location. The Credentials Store cache is
    populated mainly in the following ways:

        1. when the user runs `zenml login` to authenticate to a ZenML Pro
        server, the ZenML Pro API token fetched from the web login flow is
        stored in the Credentials Store.
        2. when the user runs `zenml login` to authenticate to a regular ZenML
        server with the web login flow, the ZenML server API token fetched
        through the web login flow is stored in the Credentials Store
        3. when the user runs `zenml login` to authenticate to any ZenML server
        using an API key, the API key is stored in the Credentials Store
        4. when the REST zen store is initialized, it starts up not yet
        authenticated. Then, if/when it needs to authenticate or re-authenticate
        to the remote server, it will use whatever form of credentials it finds
        in the Credentials Store, in order of priority:

            * if it finds an API token that is not expired (e.g. authorized
            device API tokens fetched through the web login flow or short-lived
            session API tokens fetched through some other means of
            authentication), it will use that to authenticate
            * for ZenML servers that use an API key to authenticate, it will use
            that to fetch a short-lived ZenML Pro server API token that it also
            stores in the Credentials Store
            * for ZenML Pro servers, it exchanges the longer-lived ZenML Pro API
            token into a short lived ZenML Pro server API token

    Alongside credentials, the Credentials Store is also used to store
    additional server information:

        * ZenML Pro tenant information populated by the `zenml login` command
        * ZenML server information populated by the REST zen store by fetching
        the server's information endpoint after authenticating
    """

    # Cached credentials, keyed by server URL.
    credentials: Dict[str, ServerCredentials]
    # Modification time of the credentials file as of the last load or save;
    # None when no file is being tracked.
    last_modified_time: Optional[float] = None

    def __init__(self) -> None:
        """Initializes the login credentials store with values loaded from the credentials YAML file.

        CredentialsStore is a singleton class: only one instance can exist.
        Calling this constructor multiple times will always yield the same
        instance.
        """
        self.credentials = {}
        self._load_credentials()

    @classmethod
    def reset_instance(
        cls, store: Optional["CredentialsStore"] = None
    ) -> None:
        """Reset the singleton instance of the CredentialsStore.

        Args:
            store: Optional instance of the CredentialsStore to set as the
                singleton instance. If None, a new instance will be created.
        """
        current_store = cls.get_instance()
        if current_store is not None and current_store is not store:
            # Delete the credentials file from disk if it exists, otherwise
            # the credentials will be reloaded from the file when the new
            # instance is created and this call will have no effect
            current_store._delete_credentials_file()

        cls._clear(store)  # type: ignore[arg-type]
        if store:
            store._save_credentials()

    @classmethod
    def get_instance(cls) -> Optional["CredentialsStore"]:
        """Get the singleton instance of the CredentialsStore.

        Returns:
            The singleton instance of the CredentialsStore, as returned by
            the metaclass ``_instance()`` hook.
        """
        return cast(Optional["CredentialsStore"], cls._instance())

    @property
    def _credentials_file(self) -> str:
        """Path to the file where the credentials are stored.

        Returns:
            The path to the file where the credentials are stored.
        """
        config_path = GlobalConfiguration().config_directory
        return os.path.join(config_path, CREDENTIALS_STORE_FILENAME)

    def _load_credentials(self) -> None:
        """Load the credentials from the YAML file if it exists.

        Does nothing when credentials disk caching is disabled. Unreadable
        files, non-mapping files and invalid entries are logged and skipped
        rather than raised.
        """
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return

        credentials_file = self._credentials_file
        credentials_store = {}

        if fileio.exists(credentials_file):
            try:
                credentials_store = yaml_utils.read_yaml(credentials_file)
            except Exception as e:
                # Deliberately best-effort: an unreadable file is treated as
                # an empty store.
                logger.error(
                    f"Failed to load credentials file {credentials_file}: {e}. "
                )
            self.last_modified_time = os.path.getmtime(credentials_file)

        if credentials_store is None:
            # This can happen for example if the config file is empty
            credentials_store = {}
        elif not isinstance(credentials_store, dict):
            logger.warning(
                f"The credentials file {credentials_file} is corrupted. "
                "Creating a new credentials file."
            )
            credentials_store = {}

        self.credentials = {}
        for server_url, token_data in credentials_store.items():
            try:
                self.credentials[server_url] = ServerCredentials(**token_data)
            except (TypeError, ValueError) as e:
                # TypeError covers entries that are not mappings (``**`` on a
                # scalar or list), which would otherwise abort the whole load.
                logger.warning(
                    f"Failed to load credentials for {server_url}: {e}. "
                    "Ignoring this token."
                )

    def _save_credentials(self) -> None:
        """Dump the current credentials store to the YAML file.

        Does nothing when credentials disk caching is disabled. Entries that
        fail the filter below are left out of the file.
        """
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return

        credentials_file = self._credentials_file
        credentials_store = {
            server_url: credential.model_dump(
                mode="json", exclude_none=True, exclude_unset=True
            )
            for server_url, credential in self.credentials.items()
            # Evict tokens that have expired past the eviction time
            # and have no API key or username/password to fall back on
            if credential.api_key
            or (credential.username and credential.password is not None)
            or (
                credential.api_token
                and (
                    not credential.api_token.expires_at
                    or credential.api_token.expires_at
                    + timedelta(seconds=TOKEN_STORE_EVICTION_TIME)
                    > datetime.now(timezone.utc)
                )
            )
        }
        yaml_utils.write_yaml(credentials_file, credentials_store)
        self.last_modified_time = os.path.getmtime(credentials_file)

    def _delete_credentials_file(self) -> None:
        """Delete the credentials file.

        Does nothing when credentials disk caching is disabled.
        """
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return

        credentials_file = self._credentials_file
        if fileio.exists(credentials_file):
            fileio.remove(credentials_file)
        self.last_modified_time = None

    def check_and_reload_from_file(self) -> None:
        """Check if the credentials file has been modified and reload it if necessary."""
        if handle_bool_env_var(ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING):
            return
        if not self.last_modified_time:
            # No file is being tracked, so there is nothing to compare with.
            return

        credentials_file = self._credentials_file
        try:
            last_modified_time = os.path.getmtime(credentials_file)
        except FileNotFoundError:
            # The credentials file has been deleted
            self.last_modified_time = None
            return

        if last_modified_time != self.last_modified_time:
            self._load_credentials()

    def get_password(
        self, server_url: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Retrieve the username and password from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the username and
                password.

        Returns:
            The stored ``(username, password)`` pair; ``(None, None)`` if no
            credentials are stored for the server URL.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            return credential.username, credential.password
        return None, None

    def get_api_key(self, server_url: str) -> Optional[str]:
        """Retrieve an API key from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the API key.

        Returns:
            The stored API key if it exists, None otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            return credential.api_key
        return None

    def get_token(
        self, server_url: str, allow_expired: bool = False
    ) -> Optional[APIToken]:
        """Retrieve a valid token from the credentials store for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the token.
            allow_expired: Whether to allow expired tokens to be returned. The
                default behavior is to return None if a token does exist but is
                expired.

        Returns:
            The stored token if it exists and is not expired, None otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential:
            token = credential.api_token
            if token and (not token.expired or allow_expired):
                return token
        return None

    def get_credentials(self, server_url: str) -> Optional[ServerCredentials]:
        """Retrieve the credentials for a specific server URL.

        Args:
            server_url: The server URL for which to retrieve the credentials.

        Returns:
            The stored credentials if they exist, None otherwise.
        """
        self.check_and_reload_from_file()
        return self.credentials.get(server_url)

    def get_pro_token(self, allow_expired: bool = False) -> Optional[APIToken]:
        """Retrieve a valid token from the credentials store for the ZenML Pro API server.

        Args:
            allow_expired: Whether to allow expired tokens to be returned. The
                default behavior is to return None if a token does exist but is
                expired.

        Returns:
            The stored token if it exists and is not expired, None otherwise.
        """
        return self.get_token(ZENML_PRO_API_URL, allow_expired)

    def get_pro_credentials(
        self, allow_expired: bool = False
    ) -> Optional[ServerCredentials]:
        """Retrieve the credentials stored for the ZenML Pro API server.

        Args:
            allow_expired: Whether to return the credentials even if their
                token is expired. The default behavior is to return None if a
                token does exist but is expired.

        Returns:
            The stored credentials if they hold a token that is not expired
            (or ``allow_expired`` is set), None otherwise.
        """
        credential = self.get_credentials(ZENML_PRO_API_URL)
        if (
            credential
            and credential.api_token
            and (not credential.api_token.expired or allow_expired)
        ):
            return credential
        return None

    def clear_pro_credentials(self) -> None:
        """Delete the token from the store for the ZenML Pro API server."""
        self.clear_token(ZENML_PRO_API_URL)

    def clear_all_pro_tokens(self) -> None:
        """Delete the tokens of all stored servers of type ``ServerType.PRO``.

        Servers that have an API key configured are left untouched.
        """
        # Iterate over a copy because clear_token() mutates the dictionary.
        for server_url, server in self.credentials.copy().items():
            if server.type == ServerType.PRO:
                if server.api_key:
                    continue
                self.clear_token(server_url)

    def has_valid_authentication(self, url: str) -> bool:
        """Check if a valid authentication credential for the given server URL is stored.

        Args:
            url: The server URL for which to check the authentication.

        Returns:
            bool: True if an API key, a username/password pair or a token
            that is not expired is stored, False otherwise.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(url)

        if not credential:
            return False
        if credential.api_key or (
            credential.username and credential.password is not None
        ):
            return True

        token = credential.api_token
        return token is not None and not token.expired

    def has_valid_pro_authentication(self) -> bool:
        """Check if a valid token for the ZenML Pro API server is stored.

        Returns:
            bool: True if a valid token is stored, False otherwise.
        """
        return self.get_token(ZENML_PRO_API_URL) is not None

    def set_api_key(
        self,
        server_url: str,
        api_key: str,
    ) -> None:
        """Store an API key in the credentials store for a specific server URL.

        If an API token or a password is already stored for the server URL, they
        will be replaced by the API key.

        Args:
            server_url: The server URL for which the token is to be stored.
            api_key: The API key to store.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential and credential.api_key != api_key:
            # Reset the API token if a new or updated API key is set, because
            # the current token might have been issued for a different account
            credential.api_token = None
            credential.api_key = api_key
            credential.username = None
            credential.password = None
        else:
            # NOTE(review): this branch also runs when the same API key is
            # set again; the whole entry (token and server information) is
            # then replaced by a fresh one - confirm this is intended.
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_key=api_key
            )

        self._save_credentials()

    def set_password(
        self,
        server_url: str,
        username: str,
        password: str,
    ) -> None:
        """Store a username and password in the credentials store for a specific server URL.

        If an API token is already stored for the server URL, it will be
        replaced by the username and password.

        Args:
            server_url: The server URL for which the token is to be stored.
            username: The username to store.
            password: The password to store.
        """
        self.check_and_reload_from_file()
        credential = self.credentials.get(server_url)
        if credential and (
            credential.username != username or credential.password != password
        ):
            # Reset the API token if a new or updated password is set, because
            # the current token might have been issued for a different account
            credential.api_token = None
            credential.username = username
            credential.password = password
            credential.api_key = None
        else:
            # NOTE(review): this branch also runs when the same username and
            # password are set again; the whole entry is then replaced by a
            # fresh one - confirm this is intended.
            self.credentials[server_url] = ServerCredentials(
                url=server_url, username=username, password=password
            )

        self._save_credentials()

    def set_token(
        self,
        server_url: str,
        token_response: OAuthTokenResponse,
    ) -> APIToken:
        """Store an API token received from an OAuth2 server.

        Args:
            server_url: The server URL for which the token is to be stored.
            token_response: Token response received from an OAuth2 server.

        Returns:
            APIToken: The stored token.
        """
        self.check_and_reload_from_file()
        if token_response.expires_in:
            expires_at = datetime.now(timezone.utc) + timedelta(
                seconds=token_response.expires_in
            )
            # Best practice to calculate the leeway depending on the token
            # expiration time:
            #
            # - for short-lived tokens (less than 1 hour), use a fixed leeway of
            # a few seconds (e.g., 30 seconds)
            # - for longer-lived tokens (e.g., 1 hour or more), use a
            # percentage-based leeway of 5-10%
            if token_response.expires_in < 3600:
                leeway = 30
            else:
                leeway = token_response.expires_in // 20
        else:
            expires_at = None
            leeway = None

        api_token = APIToken(
            access_token=token_response.access_token,
            expires_in=token_response.expires_in,
            expires_at=expires_at,
            leeway=leeway,
            cookie_name=token_response.cookie_name,
            device_id=token_response.device_id,
            device_metadata=token_response.device_metadata,
        )

        credential = self.credentials.get(server_url)
        if credential:
            credential.api_token = api_token
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_token=api_token
            )

        self._save_credentials()
        return api_token

    def set_bare_token(
        self,
        server_url: str,
        token: str,
    ) -> APIToken:
        """Store a bare API token.

        The token is stored without any expiration information.

        Args:
            server_url: The server URL for which the token is to be stored.
            token: The token to store.

        Returns:
            APIToken: The stored token.
        """
        self.check_and_reload_from_file()
        api_token = APIToken(
            access_token=token,
        )

        credential = self.credentials.get(server_url)
        if credential:
            credential.api_token = api_token
        else:
            self.credentials[server_url] = ServerCredentials(
                url=server_url, api_token=api_token
            )

        self._save_credentials()
        return api_token

    def update_server_info(
        self,
        server_url: str,
        server_info: Union[ServerModel, TenantRead],
    ) -> None:
        """Update the server information stored for a specific server URL.

        Args:
            server_url: The server URL for which the server information is to be
                updated.
            server_info: Updated server information.
        """
        self.check_and_reload_from_file()

        credential = self.credentials.get(server_url)
        if not credential:
            # No credentials stored for this server URL, nothing to update
            return

        credential.update_server_info(server_info)
        self._save_credentials()

    def clear_token(self, server_url: str) -> None:
        """Delete a token from the store for a specific server URL.

        Args:
            server_url: The server URL for which to delete the token.
        """
        self.check_and_reload_from_file()
        if server_url in self.credentials:
            credential = self.credentials[server_url]
            if (
                not credential.api_key
                and not credential.username
                and credential.password is None
            ):
                # Only delete the credential entry if there is no API key or
                # username/password to fall back on
                del self.credentials[server_url]
            else:
                credential.api_token = None
            self._save_credentials()

    def clear_credentials(self, server_url: str) -> None:
        """Delete all credentials from the store for a specific server URL.

        Args:
            server_url: The server URL for which to delete the credentials.
        """
        self.check_and_reload_from_file()
        if server_url in self.credentials:
            del self.credentials[server_url]
            self._save_credentials()

    def list_credentials(
        self, type: Optional[ServerType] = None
    ) -> List[ServerCredentials]:
        """Get all credentials stored in the credentials store.

        Args:
            type: Optional server type to filter the credentials by.

        Returns:
            A list of all credentials stored in the credentials store.
        """
        self.check_and_reload_from_file()

        credentials = list(self.credentials.values())
        if type is not None:
            credentials = [c for c in credentials if c and c.type == type]
        return credentials
__init__(self)
special
Initializes the login credentials store with values loaded from the credentials YAML file.
CredentialsStore is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance.
Source code in zenml/login/credentials_store.py
def __init__(self) -> None:
    """Set up the credentials store from the credentials YAML file.

    CredentialsStore is a singleton class: only one instance can exist.
    Calling this constructor multiple times will always yield the same
    instance.
    """
    # Start from an empty mapping, then let the loader fill it in
    self.credentials = dict()
    self._load_credentials()
check_and_reload_from_file(self)
Check if the credentials file has been modified and reload it if necessary.
Source code in zenml/login/credentials_store.py
def check_and_reload_from_file(self) -> None:
    """Reload the credentials if the credentials file changed on disk."""
    caching_disabled = handle_bool_env_var(
        ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING
    )
    if caching_disabled or not self.last_modified_time:
        return

    path = self._credentials_file
    try:
        mtime_on_disk = os.path.getmtime(path)
    except FileNotFoundError:
        # The credentials file has been deleted
        self.last_modified_time = None
    else:
        if mtime_on_disk != self.last_modified_time:
            self._load_credentials()
clear_all_pro_tokens(self)
Delete all tokens from the store for ZenML Pro API servers.
Source code in zenml/login/credentials_store.py
def clear_all_pro_tokens(self) -> None:
    """Clear the stored tokens of all ZenML Pro servers.

    Pro entries that carry an API key are left untouched.
    """
    # Walk over a snapshot because clear_token may remove entries
    for url, entry in list(self.credentials.items()):
        if entry.type == ServerType.PRO and not entry.api_key:
            self.clear_token(url)
clear_credentials(self, server_url)
Delete all credentials from the store for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to delete the credentials. |
required |
Source code in zenml/login/credentials_store.py
def clear_credentials(self, server_url: str) -> None:
    """Remove every credential stored for the given server URL.

    Args:
        server_url: The server URL for which to delete the credentials.
    """
    self.check_and_reload_from_file()
    if server_url not in self.credentials:
        # Nothing is stored for this URL, so there is nothing to save
        return
    del self.credentials[server_url]
    self._save_credentials()
clear_pro_credentials(self)
Delete the token from the store for the ZenML Pro API server.
Source code in zenml/login/credentials_store.py
def clear_pro_credentials(self) -> None:
    """Clear the token stored for the ZenML Pro API server.

    Delegates to `clear_token` with the ZenML Pro API URL.
    """
    pro_api_url = ZENML_PRO_API_URL
    self.clear_token(pro_api_url)
clear_token(self, server_url)
Delete a token from the store for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to delete the token. |
required |
Source code in zenml/login/credentials_store.py
def clear_token(self, server_url: str) -> None:
    """Delete a token from the store for a specific server URL.

    If the stored entry has no API key, username or password to fall back
    on, the whole entry is removed. Otherwise only the API token is reset
    and the remaining credentials are kept. Nothing is saved when no entry
    exists for the given URL.

    Args:
        server_url: The server URL for which to delete the token.
    """
    self.check_and_reload_from_file()
    if server_url not in self.credentials:
        return

    credential = self.credentials[server_url]
    # An empty-string password still counts as a stored password, hence the
    # explicit `is not None` check instead of a truthiness test.
    has_fallback = bool(
        credential.api_key
        or credential.username
        or credential.password is not None
    )
    if has_fallback:
        credential.api_token = None
    else:
        # Only delete the credential entry if there is no API key or
        # username/password to fall back on
        del self.credentials[server_url]
    self._save_credentials()
get_api_key(self, server_url)
Retrieve an API key from the credentials store for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to retrieve the API key. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
The stored API key if it exists, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_api_key(self, server_url: str) -> Optional[str]:
    """Look up the API key stored for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the API key.

    Returns:
        The stored API key if it exists, None otherwise.
    """
    self.check_and_reload_from_file()
    entry = self.credentials.get(server_url)
    return entry.api_key if entry else None
get_credentials(self, server_url)
Retrieve the credentials for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to retrieve the credentials. |
required |
Returns:
Type | Description |
---|---|
Optional[zenml.login.credentials.ServerCredentials] |
The stored credentials if they exist, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_credentials(self, server_url: str) -> Optional[ServerCredentials]:
    """Look up the credentials stored for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the credentials.

    Returns:
        The stored credentials if they exist, None otherwise.
    """
    self.check_and_reload_from_file()
    stored = self.credentials.get(server_url)
    return stored
get_instance()
classmethod
Get the singleton instance of the CredentialsStore.
Returns:
Type | Description |
---|---|
Optional[CredentialsStore] |
The singleton instance of the CredentialsStore. |
Source code in zenml/login/credentials_store.py
@classmethod
def get_instance(cls) -> Optional["CredentialsStore"]:
    """Get the singleton instance of the CredentialsStore.

    Returns:
        Whatever `cls._instance()` returns: the singleton instance of the
        CredentialsStore, which per the return annotation may be None.
    """
    # Cast to the Optional type so the cast agrees with the declared return
    # type; callers such as reset_instance check the result against None.
    return cast(Optional["CredentialsStore"], cls._instance())
get_password(self, server_url)
Retrieve the username and password from the credentials store for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to retrieve the username and password. |
required |
Returns:
Type | Description |
---|---|
Tuple[Optional[str], Optional[str]] |
The stored username and password if they exist, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_password(
    self, server_url: str
) -> Tuple[Optional[str], Optional[str]]:
    """Look up the username and password stored for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the username and
            password.

    Returns:
        A `(username, password)` tuple taken from the stored entry, or
        `(None, None)` when no entry exists for the URL.
    """
    self.check_and_reload_from_file()
    entry = self.credentials.get(server_url)
    if not entry:
        return None, None
    return entry.username, entry.password
get_pro_credentials(self, allow_expired=False)
Retrieve a valid token from the credentials store for the ZenML Pro API server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
allow_expired |
bool |
Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired. |
False |
Returns:
Type | Description |
---|---|
Optional[zenml.login.credentials.ServerCredentials] |
The stored credentials if they exist and are not expired, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_pro_credentials(
    self, allow_expired: bool = False
) -> Optional[ServerCredentials]:
    """Fetch the credentials stored for the ZenML Pro API server.

    Args:
        allow_expired: Whether to allow credentials with an expired token
            to be returned. The default behavior is to return None if a
            token does exist but is expired.

    Returns:
        The stored credentials if they exist, carry a token and that token
        is not expired (or `allow_expired` is set), None otherwise.
    """
    credential = self.get_credentials(ZENML_PRO_API_URL)
    if not credential or not credential.api_token:
        return None
    if credential.api_token.expired and not allow_expired:
        return None
    return credential
get_pro_token(self, allow_expired=False)
Retrieve a valid token from the credentials store for the ZenML Pro API server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
allow_expired |
bool |
Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired. |
False |
Returns:
Type | Description |
---|---|
Optional[zenml.login.credentials.APIToken] |
The stored token if it exists and is not expired, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_pro_token(self, allow_expired: bool = False) -> Optional[APIToken]:
    """Fetch the token stored for the ZenML Pro API server.

    Delegates to `get_token` with the ZenML Pro API URL.

    Args:
        allow_expired: Whether to allow expired tokens to be returned. The
            default behavior is to return None if a token does exist but is
            expired.

    Returns:
        The stored token if it exists and is not expired, None otherwise.
    """
    pro_token = self.get_token(ZENML_PRO_API_URL, allow_expired)
    return pro_token
get_token(self, server_url, allow_expired=False)
Retrieve a valid token from the credentials store for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which to retrieve the token. |
required |
allow_expired |
bool |
Whether to allow expired tokens to be returned. The default behavior is to return None if a token does exist but is expired. |
False |
Returns:
Type | Description |
---|---|
Optional[zenml.login.credentials.APIToken] |
The stored token if it exists and is not expired, None otherwise. |
Source code in zenml/login/credentials_store.py
def get_token(
    self, server_url: str, allow_expired: bool = False
) -> Optional[APIToken]:
    """Fetch the token stored for a specific server URL.

    Args:
        server_url: The server URL for which to retrieve the token.
        allow_expired: Whether to allow expired tokens to be returned. The
            default behavior is to return None if a token does exist but is
            expired.

    Returns:
        The stored token if it exists and is not expired (or
        `allow_expired` is set), None otherwise.
    """
    self.check_and_reload_from_file()
    entry = self.credentials.get(server_url)
    token = entry.api_token if entry else None
    if not token:
        return None
    if token.expired and not allow_expired:
        return None
    return token
has_valid_authentication(self, url)
Check if a valid authentication credential for the given server URL is stored.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The server URL for which to check the authentication. |
required |
Returns:
Type | Description |
---|---|
bool |
True if an API key, a username and password, or a non-expired token is stored, False otherwise. |
Source code in zenml/login/credentials_store.py
def has_valid_authentication(self, url: str) -> bool:
    """Tell whether usable authentication is stored for a server URL.

    Args:
        url: The server URL for which to check the authentication.

    Returns:
        bool: True if an API key, a username with a password, or a
        non-expired token is stored, False otherwise.
    """
    self.check_and_reload_from_file()
    entry = self.credentials.get(url)
    if not entry:
        return False
    if entry.api_key:
        return True
    if entry.username and entry.password is not None:
        return True
    token = entry.api_token
    if token is None:
        return False
    return not token.expired
has_valid_pro_authentication(self)
Check if a valid token for the ZenML Pro API server is stored.
Returns:
Type | Description |
---|---|
bool |
True if a valid token is stored, False otherwise. |
Source code in zenml/login/credentials_store.py
def has_valid_pro_authentication(self) -> bool:
    """Tell whether a valid token for the ZenML Pro API server is stored.

    Returns:
        bool: True if `get_token` yields a token for the ZenML Pro API URL,
        False otherwise.
    """
    pro_token = self.get_token(ZENML_PRO_API_URL)
    return pro_token is not None
list_credentials(self, type=None)
Get all credentials stored in the credentials store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type |
Optional[zenml.login.credentials.ServerType] |
Optional server type to filter the credentials by. |
None |
Returns:
Type | Description |
---|---|
List[zenml.login.credentials.ServerCredentials] |
A list of all credentials stored in the credentials store. |
Source code in zenml/login/credentials_store.py
def list_credentials(
    self, type: Optional[ServerType] = None
) -> List[ServerCredentials]:
    """List the credentials held by the credentials store.

    Args:
        type: Optional server type to filter the credentials by.

    Returns:
        All stored credentials, or only those whose type matches `type`
        when a type is given.
    """
    self.check_and_reload_from_file()
    stored = list(self.credentials.values())
    if type is None:
        return stored
    return [entry for entry in stored if entry and entry.type == type]
reset_instance(store=None)
classmethod
Reset the singleton instance of the CredentialsStore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store |
Optional[CredentialsStore] |
Optional instance of the CredentialsStore to set as the singleton instance. If None, a new instance will be created. |
None |
Source code in zenml/login/credentials_store.py
@classmethod
def reset_instance(
    cls, store: Optional["CredentialsStore"] = None
) -> None:
    """Replace the singleton instance of the CredentialsStore.

    Args:
        store: Optional instance of the CredentialsStore to set as the
            singleton instance. If None, a new instance will be created.
    """
    previous = cls.get_instance()
    is_replaced = previous is not None and previous is not store
    if is_replaced:
        # Delete the credentials file from disk if it exists, otherwise
        # the credentials will be reloaded from the file when the new
        # instance is created and this call will have no effect
        previous._delete_credentials_file()

    cls._clear(store)  # type: ignore[arg-type]

    if store:
        store._save_credentials()
set_api_key(self, server_url, api_key)
Store an API key in the credentials store for a specific server URL.
If an API token or a password is already stored for the server URL, they will be replaced by the API key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which the API key is to be stored. |
required |
api_key |
str |
The API key to store. |
required |
Source code in zenml/login/credentials_store.py
def set_api_key(
    self,
    server_url: str,
    api_key: str,
) -> None:
    """Store an API key in the credentials store for a specific server URL.

    If a different API key, or a username and password, is already stored
    for the server URL, it is replaced by the API key and any cached API
    token is reset. If the very same API key is already stored, the
    existing entry is kept as it is, including its cached token and server
    information.

    Args:
        server_url: The server URL for which the API key is to be stored.
        api_key: The API key to store.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if not credential:
        # First credential for this server URL
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_key=api_key
        )
    elif credential.api_key != api_key:
        # Reset the API token if a new or updated API key is set, because
        # the current token might have been issued for a different account
        credential.api_token = None
        credential.api_key = api_key
        credential.username = None
        credential.password = None
    # else: same key already stored - keep the entry (and its token and
    # server info) instead of overwriting it with a blank one
    self._save_credentials()
set_bare_token(self, server_url, token)
Store a bare API token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which the token is to be stored. |
required |
token |
str |
The token to store. |
required |
Returns:
Type | Description |
---|---|
APIToken |
The stored token. |
Source code in zenml/login/credentials_store.py
def set_bare_token(
    self,
    server_url: str,
    token: str,
) -> APIToken:
    """Store a bare API token, i.e. one with only an access token set.

    Args:
        server_url: The server URL for which the token is to be stored.
        token: The token to store.

    Returns:
        APIToken: The stored token.
    """
    self.check_and_reload_from_file()
    bare_token = APIToken(access_token=token)
    existing = self.credentials.get(server_url)
    if not existing:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_token=bare_token
        )
    else:
        existing.api_token = bare_token
    self._save_credentials()
    return bare_token
set_password(self, server_url, username, password)
Store a username and password in the credentials store for a specific server URL.
If an API token is already stored for the server URL, it will be replaced by the username and password.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which the username and password are to be stored. |
required |
username |
str |
The username to store. |
required |
password |
str |
The password to store. |
required |
Source code in zenml/login/credentials_store.py
def set_password(
    self,
    server_url: str,
    username: str,
    password: str,
) -> None:
    """Store a username and password in the credentials store for a specific server URL.

    If a different username or password, or an API key, is already stored
    for the server URL, it is replaced by the username and password and any
    cached API token is reset. If the very same username and password are
    already stored, the existing entry is kept as it is, including its
    cached token and server information.

    Args:
        server_url: The server URL for which the username and password are
            to be stored.
        username: The username to store.
        password: The password to store.
    """
    self.check_and_reload_from_file()
    credential = self.credentials.get(server_url)
    if not credential:
        # First credential for this server URL
        self.credentials[server_url] = ServerCredentials(
            url=server_url, username=username, password=password
        )
    elif credential.username != username or credential.password != password:
        # Reset the API token if a new or updated password is set, because
        # the current token might have been issued for a different account
        credential.api_token = None
        credential.username = username
        credential.password = password
        credential.api_key = None
    # else: same username/password already stored - keep the entry (and its
    # token and server info) instead of overwriting it with a blank one
    self._save_credentials()
set_token(self, server_url, token_response)
Store an API token received from an OAuth2 server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which the token is to be stored. |
required |
token_response |
OAuthTokenResponse |
Token response received from an OAuth2 server. |
required |
Returns:
Type | Description |
---|---|
APIToken |
The stored token. |
Source code in zenml/login/credentials_store.py
def set_token(
    self,
    server_url: str,
    token_response: OAuthTokenResponse,
) -> APIToken:
    """Store an API token built from an OAuth2 token response.

    Args:
        server_url: The server URL for which the token is to be stored.
        token_response: Token response received from an OAuth2 server.

    Returns:
        APIToken: The stored token.
    """
    self.check_and_reload_from_file()

    lifetime = token_response.expires_in
    expires_at = None
    leeway = None
    if lifetime:
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=lifetime)
        # Best practice to calculate the leeway depending on the token
        # expiration time:
        #
        # - for short-lived tokens (less than 1 hour), use a fixed leeway of
        #   a few seconds (e.g., 30 seconds)
        # - for longer-lived tokens (e.g., 1 hour or more), use a
        #   percentage-based leeway of 5-10%
        leeway = 30 if lifetime < 3600 else lifetime // 20

    api_token = APIToken(
        access_token=token_response.access_token,
        expires_in=lifetime,
        expires_at=expires_at,
        leeway=leeway,
        cookie_name=token_response.cookie_name,
        device_id=token_response.device_id,
        device_metadata=token_response.device_metadata,
    )

    existing = self.credentials.get(server_url)
    if not existing:
        self.credentials[server_url] = ServerCredentials(
            url=server_url, api_token=api_token
        )
    else:
        existing.api_token = api_token
    self._save_credentials()
    return api_token
update_server_info(self, server_url, server_info)
Update the server information stored for a specific server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_url |
str |
The server URL for which the server information is to be updated. |
required |
server_info |
Union[zenml.models.v2.misc.server_models.ServerModel, zenml.login.pro.tenant.models.TenantRead] |
Updated server information. |
required |
Source code in zenml/login/credentials_store.py
def update_server_info(
    self,
    server_url: str,
    server_info: Union[ServerModel, TenantRead],
) -> None:
    """Refresh the server information stored for a specific server URL.

    Args:
        server_url: The server URL for which the server information is to be
            updated.
        server_info: Updated server information.
    """
    self.check_and_reload_from_file()
    entry = self.credentials.get(server_url)
    # Without stored credentials for this server URL there is nothing to
    # update and nothing to save
    if entry:
        entry.update_server_info(server_info)
        self._save_credentials()
get_credentials_store()
Get the global credentials store instance.
Returns:
Type | Description |
---|---|
CredentialsStore |
The global credentials store instance. |
Source code in zenml/login/credentials_store.py
def get_credentials_store() -> CredentialsStore:
    """Return the global credentials store.

    Returns:
        The global credentials store instance.
    """
    store = CredentialsStore()
    return store
pro
special
ZenML Pro client.
client
ZenML Pro client.
ZenMLProClient
ZenML Pro client.
Source code in zenml/login/pro/client.py
class ZenMLProClient(metaclass=SingletonMetaClass):
    """ZenML Pro client.

    REST client for the ZenML Pro API. It authenticates every request with
    a bearer API token and exposes generic helpers for creating, reading,
    listing, updating and deleting resources.
    """

    _url: str
    _api_token: APIToken
    # Created on first use by the `session` property
    _session: Optional[requests.Session] = None
    # Created on first use by the `tenant` / `organization` properties
    _tenant: Optional["TenantClient"] = None
    _organization: Optional["OrganizationClient"] = None

    def __init__(
        self, url: Optional[str] = None, api_token: Optional[APIToken] = None
    ) -> None:
        """Initialize the ZenML Pro client.

        Args:
            url: The URL of the ZenML Pro API server. If not provided, the
                default ZenML Pro API server URL is used.
            api_token: The API token to use for authentication. If not provided,
                the token is fetched from the credentials store.

        Raises:
            AuthorizationException: If no API token is provided and no token
                is found in the credentials store.
        """
        self._url = url or ZENML_PRO_API_URL
        if api_token is None:
            logger.debug(
                "No ZenML Pro API token provided. Fetching from credentials "
                "store."
            )
            # Expired tokens are accepted here; expiration is checked before
            # every request by `raise_on_expired_api_token`.
            api_token = get_credentials_store().get_token(
                server_url=self._url, allow_expired=True
            )
            if api_token is None:
                raise AuthorizationException(
                    "No ZenML Pro API token found. Please run 'zenml login' to "
                    "login to ZenML Pro."
                )
        self._api_token = api_token

    @property
    def tenant(self) -> "TenantClient":
        """Get the tenant client.

        Returns:
            The tenant client.
        """
        if self._tenant is None:
            # Imported on first use rather than at module import time
            from zenml.login.pro.tenant.client import TenantClient

            self._tenant = TenantClient(client=self)
        return self._tenant

    @property
    def organization(self) -> "OrganizationClient":
        """Get the organization client.

        Returns:
            The organization client.
        """
        if self._organization is None:
            # Imported on first use rather than at module import time
            from zenml.login.pro.organization.client import OrganizationClient

            self._organization = OrganizationClient(client=self)
        return self._organization

    @property
    def api_token(self) -> str:
        """Get the API token.

        Returns:
            The access token string of the API token.
        """
        return self._api_token.access_token

    def raise_on_expired_api_token(self) -> None:
        """Raise an exception if the API token has expired.

        Raises:
            AuthorizationException: If the API token has expired.
        """
        if self._api_token and self._api_token.expired:
            raise AuthorizationException(
                "Your ZenML Pro authentication has expired. Please run "
                "'zenml login' to login again."
            )

    @property
    def session(self) -> requests.Session:
        """Authenticate to the ZenML Pro API server.

        The session is created on first access and reused afterwards.

        Returns:
            A requests session with the authentication token.
        """
        # Check if the API token has expired before every call to the server.
        # This prevents unwanted authorization errors from being raised during
        # the call itself.
        self.raise_on_expired_api_token()

        if self._session is None:
            self._session = requests.Session()
            retries = Retry(backoff_factor=0.1, connect=5)
            self._session.mount("https://", HTTPAdapter(max_retries=retries))
            self._session.mount("http://", HTTPAdapter(max_retries=retries))
            self._session.headers.update(
                {"Authorization": "Bearer " + self.api_token}
            )
            logger.debug("Authenticated to ZenML Pro server.")
        return self._session

    @staticmethod
    def _handle_response(response: requests.Response) -> Json:
        """Handle API response, translating http status codes to Exception.

        Args:
            response: The response to handle.

        Returns:
            The parsed response.

        Raises:
            ValueError: if the response is not in the right format.
            RuntimeError: if an error response is received from the server
                and a more specific exception cannot be determined.
            exc: the exception converted from an error response, if one
                is returned from the server.
        """
        if 200 <= response.status_code < 300:
            try:
                payload: Json = response.json()
                return payload
            except requests.exceptions.JSONDecodeError as err:
                # Chain the decode error so the root cause stays visible
                raise ValueError(
                    "Bad response from API. Expected json, got\n"
                    f"{response.text}"
                ) from err
        elif response.status_code >= 400:
            exc = exception_from_response(response)
            if exc is not None:
                raise exc
            else:
                raise RuntimeError(
                    f"{response.status_code} HTTP Error received from server: "
                    f"{response.text}"
                )
        else:
            # Anything else (status codes below 200 or in the 3xx range)
            raise RuntimeError(
                "Error retrieving from API. Got response "
                f"{response.status_code} with body:\n{response.text}"
            )

    def _request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a request to the REST API.

        Args:
            method: The HTTP method to use.
            url: The URL to request.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The parsed response.

        Raises:
            AuthorizationException: if the request fails due to an expired
                authentication token.
        """
        # Query parameter values are sent as strings
        params = {k: str(v) for k, v in params.items()} if params else {}

        self.session.headers.update(
            {source_context.name: source_context.get().value}
        )

        try:
            return self._handle_response(
                self.session.request(
                    method,
                    url,
                    params=params,
                    **kwargs,
                )
            )
        except AuthorizationException:
            # Check if this is caused by an expired API token.
            self.raise_on_expired_api_token()

            # If not, raise the exception.
            raise

    def get(
        self,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a GET request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending GET request to {path}...")
        return self._request(
            "GET",
            self._url + path,
            params=params,
            **kwargs,
        )

    def delete(
        self,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a DELETE request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending DELETE request to {path}...")
        return self._request(
            "DELETE",
            self._url + path,
            params=params,
            **kwargs,
        )

    def post(
        self,
        path: str,
        body: BaseRestAPIModel,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a POST request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending POST request to {path}...")
        return self._request(
            "POST",
            self._url + path,
            json=body.model_dump(mode="json"),
            params=params,
            **kwargs,
        )

    def put(
        self,
        path: str,
        body: Optional[BaseRestAPIModel] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a PUT request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send. Only explicitly set fields are sent.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending PUT request to {path}...")
        payload = (
            body.model_dump(mode="json", exclude_unset=True) if body else None
        )
        return self._request(
            "PUT",
            self._url + path,
            json=payload,
            params=params,
            **kwargs,
        )

    def patch(
        self,
        path: str,
        body: Optional[BaseRestAPIModel] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a PATCH request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send. Only explicitly set fields are sent.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending PATCH request to {path}...")
        payload = (
            body.model_dump(mode="json", exclude_unset=True) if body else None
        )
        return self._request(
            "PATCH",
            self._url + path,
            json=payload,
            params=params,
            **kwargs,
        )

    def _create_resource(
        self,
        resource: BaseRestAPIModel,
        response_model: Type[AnyResponse],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponse:
        """Create a new resource.

        Args:
            resource: The resource to create.
            response_model: Model to use to deserialize the response body.
            route: The resource REST API route to use.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource.
        """
        response_body = self.post(f"{route}", body=resource, params=params)
        return response_model.model_validate(response_body)

    def _get_resource(
        self,
        resource_id: Union[str, int, UUID],
        route: str,
        response_model: Type[AnyResponse],
        **params: Any,
    ) -> AnyResponse:
        """Retrieve a single resource.

        Args:
            resource_id: The ID of the resource to retrieve.
            route: The resource REST API route to use.
            response_model: Model to use to deserialize the response body.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The retrieved resource.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        body = self.get(f"{route}/{str(resource_id)}", params=params)
        return response_model.model_validate(body)

    def _list_resources(
        self,
        route: str,
        response_model: Type[AnyResponse],
        **params: Any,
    ) -> List[AnyResponse]:
        """Retrieve a list of resources filtered by some criteria.

        Args:
            route: The resource REST API route to use.
            response_model: Model to use to deserialize each entry of the
                response body.
            params: Filter parameters to use in the query.

        Returns:
            List of retrieved resources matching the filter criteria.

        Raises:
            ValueError: If the value returned by the server is not a list.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        body = self.get(f"{route}", params=params)
        if not isinstance(body, list):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        return [response_model.model_validate(entry) for entry in body]

    def _update_resource(
        self,
        resource_id: Union[str, int, UUID],
        resource_update: BaseRestAPIModel,
        response_model: Type[AnyResponse],
        route: str,
        **params: Any,
    ) -> AnyResponse:
        """Update an existing resource.

        Args:
            resource_id: The id of the resource to update.
            resource_update: The resource update.
            response_model: Model to use to deserialize the response body.
            route: The resource REST API route to use.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The updated resource.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, params.items()))
        response_body = self.put(
            f"{route}/{str(resource_id)}", body=resource_update, params=params
        )
        return response_model.model_validate(response_body)

    def _delete_resource(
        self, resource_id: Union[str, UUID], route: str
    ) -> None:
        """Delete a resource.

        Args:
            resource_id: The ID of the resource to delete.
            route: The resource REST API route to use.
        """
        self.delete(f"{route}/{str(resource_id)}")
api_token: str
property
readonly
Get the API token.
Returns:
Type | Description |
---|---|
str |
The API token. |
organization: OrganizationClient
property
readonly
Get the organization client.
Returns:
Type | Description |
---|---|
OrganizationClient |
The organization client. |
session: Session
property
readonly
Authenticate to the ZenML Pro API server.
Returns:
Type | Description |
---|---|
Session |
A requests session with the authentication token. |
tenant: TenantClient
property
readonly
Get the tenant client.
Returns:
Type | Description |
---|---|
TenantClient |
The tenant client. |
__init__(self, url=None, api_token=None)
special
Initialize the ZenML Pro client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
Optional[str] |
The URL of the ZenML Pro API server. If not provided, the default ZenML Pro API server URL is used. |
None |
api_token |
Optional[zenml.login.credentials.APIToken] |
The API token to use for authentication. If not provided, the token is fetched from the credentials store. |
None |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If no API token is provided and no token is found in the credentials store. |
Source code in zenml/login/pro/client.py
def __init__(
    self, url: Optional[str] = None, api_token: Optional[APIToken] = None
) -> None:
    """Set up the ZenML Pro client with a server URL and an API token.

    Args:
        url: The URL of the ZenML Pro API server. If not provided, the
            default ZenML Pro API server URL is used.
        api_token: The API token to use for authentication. If not provided,
            the token is fetched from the credentials store.

    Raises:
        AuthorizationException: If no API token is provided and no token
            is found in the credentials store.
    """
    self._url = url or ZENML_PRO_API_URL

    token = api_token
    if token is None:
        logger.debug(
            "No ZenML Pro API token provided. Fetching from credentials "
            "store."
        )
        # Expired tokens are accepted here; expiry is checked separately.
        store = get_credentials_store()
        token = store.get_token(server_url=self._url, allow_expired=True)
        if token is None:
            raise AuthorizationException(
                "No ZenML Pro API token found. Please run 'zenml login' to "
                "login to ZenML Pro."
            )

    self._api_token = token
delete(self, path, params=None, **kwargs)
Make a DELETE request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/login/pro/client.py
def delete(
    self,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a DELETE request against an endpoint path.

    Args:
        path: The path to the endpoint, appended to the client's base URL.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending DELETE request to {path}...")
    endpoint = self._url + path
    return self._request("DELETE", endpoint, params=params, **kwargs)
get(self, path, params=None, **kwargs)
Make a GET request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/login/pro/client.py
def get(
    self,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a GET request against an endpoint path.

    Args:
        path: The path to the endpoint, appended to the client's base URL.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending GET request to {path}...")
    endpoint = self._url + path
    return self._request("GET", endpoint, params=params, **kwargs)
patch(self, path, body=None, params=None, **kwargs)
Make a PATCH request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
Optional[zenml.login.pro.models.BaseRestAPIModel] |
The body to send. |
None |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/login/pro/client.py
def patch(
    self,
    path: str,
    body: Optional[BaseRestAPIModel] = None,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a PATCH request against an endpoint path.

    Args:
        path: The path to the endpoint, appended to the client's base URL.
        body: The body to send. Only fields that were explicitly set on the
            model are serialized.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PATCH request to {path}...")

    payload = None
    if body:
        payload = body.model_dump(mode="json", exclude_unset=True)

    endpoint = self._url + path
    return self._request(
        "PATCH", endpoint, json=payload, params=params, **kwargs
    )
post(self, path, body, params=None, **kwargs)
Make a POST request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
BaseRestAPIModel |
The body to send. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/login/pro/client.py
def post(
    self,
    path: str,
    body: BaseRestAPIModel,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a POST request against an endpoint path.

    Args:
        path: The path to the endpoint, appended to the client's base URL.
        body: The body to send, serialized in JSON mode.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending POST request to {path}...")
    endpoint = self._url + path
    payload = body.model_dump(mode="json")
    return self._request(
        "POST", endpoint, json=payload, params=params, **kwargs
    )
put(self, path, body=None, params=None, **kwargs)
Make a PUT request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
Optional[zenml.login.pro.models.BaseRestAPIModel] |
The body to send. |
None |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/login/pro/client.py
def put(
    self,
    path: str,
    body: Optional[BaseRestAPIModel] = None,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a PUT request against an endpoint path.

    Args:
        path: The path to the endpoint, appended to the client's base URL.
        body: The body to send. Only fields that were explicitly set on the
            model are serialized.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PUT request to {path}...")

    payload = None
    if body:
        payload = body.model_dump(mode="json", exclude_unset=True)

    endpoint = self._url + path
    return self._request(
        "PUT", endpoint, json=payload, params=params, **kwargs
    )
raise_on_expired_api_token(self)
Raise an exception if the API token has expired.
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the API token has expired. |
Source code in zenml/login/pro/client.py
def raise_on_expired_api_token(self) -> None:
    """Fail fast when the client's API token is no longer valid.

    Raises:
        AuthorizationException: If the API token has expired.
    """
    token = self._api_token
    # Nothing to complain about when there is no token or it is still valid.
    if not token or not token.expired:
        return

    raise AuthorizationException(
        "Your ZenML Pro authentication has expired. Please run "
        "'zenml login' to login again."
    )
constants
ZenML Pro login constants.
models
ZenML Pro base models.
BaseRestAPIModel (BaseModel)
Base class for all REST API models.
Source code in zenml/login/pro/models.py
class BaseRestAPIModel(BaseModel):
    """Common parent of every REST API model."""

    # Unknown attributes are accepted so that responses from newer API
    # versions can still be parsed.
    model_config = ConfigDict(extra="allow")
organization
special
ZenML Pro organization client.
client
ZenML Pro organization client.
OrganizationClient
Organization management client.
Source code in zenml/login/pro/organization/client.py
class OrganizationClient:
    """Client for reading organizations through the ZenML Pro API."""

    def __init__(
        self,
        client: ZenMLProClient,
    ):
        """Bind the organization client to a ZenML Pro client.

        Args:
            client: ZenML Pro client.
        """
        self.client = client

    def get(
        self,
        id_or_name: Union[UUID, str],
    ) -> OrganizationRead:
        """Fetch a single organization by its ID or name.

        Args:
            id_or_name: Id or name of the organization to retrieve.

        Returns:
            An organization.
        """
        pro_client = self.client
        return pro_client._get_resource(
            resource_id=id_or_name,
            route=ORGANIZATIONS_ROUTE,
            response_model=OrganizationRead,
        )

    # NOTE(review): declared `async` although the body awaits nothing, so
    # callers receive a coroutine that must be awaited — confirm intended.
    async def list(
        self,
        offset: int = 0,
        limit: int = 20,
    ) -> List[OrganizationRead]:
        """Fetch a page of organizations.

        Args:
            offset: Query offset.
            limit: Query limit.

        Returns:
            List of organizations.
        """
        pro_client = self.client
        return pro_client._list_resources(
            route=ORGANIZATIONS_ROUTE,
            response_model=OrganizationRead,
            offset=offset,
            limit=limit,
        )
__init__(self, client)
special
Initialize the organization client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client |
ZenMLProClient |
ZenML Pro client. |
required |
Source code in zenml/login/pro/organization/client.py
def __init__(
    self,
    client: ZenMLProClient,
):
    """Bind the organization client to a ZenML Pro client.

    Args:
        client: ZenML Pro client used for all API requests.
    """
    self.client = client
get(self, id_or_name)
Get an organization by id or name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_or_name |
Union[uuid.UUID, str] |
Id or name of the organization to retrieve. |
required |
Returns:
Type | Description |
---|---|
OrganizationRead |
An organization. |
Source code in zenml/login/pro/organization/client.py
def get(
    self,
    id_or_name: Union[UUID, str],
) -> OrganizationRead:
    """Fetch a single organization by its ID or name.

    Args:
        id_or_name: Id or name of the organization to retrieve.

    Returns:
        An organization.
    """
    pro_client = self.client
    return pro_client._get_resource(
        resource_id=id_or_name,
        route=ORGANIZATIONS_ROUTE,
        response_model=OrganizationRead,
    )
list(self, offset=0, limit=20)
async
List organizations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
offset |
int |
Query offset. |
0 |
limit |
int |
Query limit. |
20 |
Returns:
Type | Description |
---|---|
List[zenml.login.pro.organization.models.OrganizationRead] |
List of organizations. |
Source code in zenml/login/pro/organization/client.py
# NOTE(review): declared `async` although the body awaits nothing, so
# callers receive a coroutine that must be awaited — confirm intended.
async def list(
    self,
    offset: int = 0,
    limit: int = 20,
) -> List[OrganizationRead]:
    """Fetch a page of organizations.

    Args:
        offset: Query offset.
        limit: Query limit.

    Returns:
        List of organizations.
    """
    pro_client = self.client
    return pro_client._list_resources(
        route=ORGANIZATIONS_ROUTE,
        response_model=OrganizationRead,
        offset=offset,
        limit=limit,
    )
models
ZenML Pro organization models.
OrganizationRead (BaseRestAPIModel)
Model for viewing organizations.
Source code in zenml/login/pro/organization/models.py
class OrganizationRead(BaseRestAPIModel):
    """Read-only view of an organization."""

    # Identity
    id: UUID
    name: str
    description: Optional[str] = None

    # Timestamps
    created: datetime
    updated: datetime
tenant
special
ZenML Pro tenant client.
client
ZenML Pro tenant client.
TenantClient
Tenant management client.
Source code in zenml/login/pro/tenant/client.py
class TenantClient:
    """Client for reading tenants through the ZenML Pro API."""

    def __init__(
        self,
        client: ZenMLProClient,
    ):
        """Bind the tenant client to a ZenML Pro client.

        Args:
            client: ZenML Pro client.
        """
        self.client = client

    def get(self, id: UUID) -> TenantRead:
        """Fetch a single tenant by its ID.

        Args:
            id: ID of the tenant to retrieve.

        Returns:
            A tenant.
        """
        pro_client = self.client
        return pro_client._get_resource(
            resource_id=id,
            route=TENANTS_ROUTE,
            response_model=TenantRead,
        )

    def list(
        self,
        offset: int = 0,
        limit: int = 20,
        tenant_name: Optional[str] = None,
        url: Optional[str] = None,
        organization_id: Optional[UUID] = None,
        status: Optional[TenantStatus] = None,
        member_only: bool = False,
    ) -> List[TenantRead]:
        """Fetch a page of tenants, optionally filtered.

        Args:
            offset: Offset to use for filtering.
            limit: Limit used for filtering.
            tenant_name: Tenant name to filter by.
            url: Tenant service URL to filter by.
            organization_id: Organization ID to filter by.
            status: Filter for only tenants with this status.
            member_only: If True, only list tenants where the user is a member
                (i.e. users that can connect to the tenant).

        Returns:
            List of tenants.
        """
        # All arguments are forwarded unchanged as keyword arguments.
        query = dict(
            offset=offset,
            limit=limit,
            tenant_name=tenant_name,
            url=url,
            organization_id=organization_id,
            status=status,
            member_only=member_only,
        )
        return self.client._list_resources(
            route=TENANTS_ROUTE,
            response_model=TenantRead,
            **query,
        )
__init__(self, client)
special
Initialize the tenant client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client |
ZenMLProClient |
ZenML Pro client. |
required |
Source code in zenml/login/pro/tenant/client.py
def __init__(
    self,
    client: ZenMLProClient,
):
    """Bind the tenant client to a ZenML Pro client.

    Args:
        client: ZenML Pro client used for all API requests.
    """
    self.client = client
get(self, id)
Get a tenant by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
ID of the tenant to retrieve. |
required |
Returns:
Type | Description |
---|---|
TenantRead |
A tenant. |
Source code in zenml/login/pro/tenant/client.py
def get(self, id: UUID) -> TenantRead:
    """Fetch a single tenant by its ID.

    Args:
        id: ID of the tenant to retrieve.

    Returns:
        A tenant.
    """
    pro_client = self.client
    return pro_client._get_resource(
        resource_id=id,
        route=TENANTS_ROUTE,
        response_model=TenantRead,
    )
list(self, offset=0, limit=20, tenant_name=None, url=None, organization_id=None, status=None, member_only=False)
List tenants.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
offset |
int |
Offset to use for filtering. |
0 |
limit |
int |
Limit used for filtering. |
20 |
tenant_name |
Optional[str] |
Tenant name to filter by. |
None |
url |
Optional[str] |
Tenant service URL to filter by. |
None |
organization_id |
Optional[uuid.UUID] |
Organization ID to filter by. |
None |
status |
Optional[zenml.login.pro.tenant.models.TenantStatus] |
Filter for only tenants with this status. |
None |
member_only |
bool |
If True, only list tenants where the user is a member (i.e. users that can connect to the tenant). |
False |
Returns:
Type | Description |
---|---|
List[zenml.login.pro.tenant.models.TenantRead] |
List of tenants. |
Source code in zenml/login/pro/tenant/client.py
def list(
    self,
    offset: int = 0,
    limit: int = 20,
    tenant_name: Optional[str] = None,
    url: Optional[str] = None,
    organization_id: Optional[UUID] = None,
    status: Optional[TenantStatus] = None,
    member_only: bool = False,
) -> List[TenantRead]:
    """Fetch a page of tenants, optionally filtered.

    Args:
        offset: Offset to use for filtering.
        limit: Limit used for filtering.
        tenant_name: Tenant name to filter by.
        url: Tenant service URL to filter by.
        organization_id: Organization ID to filter by.
        status: Filter for only tenants with this status.
        member_only: If True, only list tenants where the user is a member
            (i.e. users that can connect to the tenant).

    Returns:
        List of tenants.
    """
    # All arguments are forwarded unchanged as keyword arguments.
    query = dict(
        offset=offset,
        limit=limit,
        tenant_name=tenant_name,
        url=url,
        organization_id=organization_id,
        status=status,
        member_only=member_only,
    )
    return self.client._list_resources(
        route=TENANTS_ROUTE,
        response_model=TenantRead,
        **query,
    )
models
ZenML Pro tenant models.
TenantRead (BaseRestAPIModel)
Pydantic Model for viewing a Tenant.
Source code in zenml/login/pro/tenant/models.py
class TenantRead(BaseRestAPIModel):
    """Read-only view of a tenant, with convenience accessors."""

    id: UUID
    name: str
    description: Optional[str] = Field(
        default=None, description="The description of the tenant."
    )
    organization: OrganizationRead
    desired_state: str = Field(description="The desired state of the tenant.")
    state_reason: str = Field(
        description="The reason for the current tenant state.",
    )
    status: str = Field(
        description="The current operational state of the tenant."
    )
    zenml_service: ZenMLServiceRead = Field(description="The ZenML service.")

    @property
    def organization_id(self) -> UUID:
        """ID of the organization that owns this tenant.

        Returns:
            The organization id.
        """
        owner = self.organization
        return owner.id

    @property
    def organization_name(self) -> str:
        """Name of the organization that owns this tenant.

        Returns:
            The organization name.
        """
        owner = self.organization
        return owner.name

    @property
    def version(self) -> Optional[str]:
        """ZenML version of the tenant's service.

        The version reported in the service status takes precedence; the
        configured version is used when no status version is available.

        Returns:
            The ZenML service version.
        """
        service = self.zenml_service
        service_status = service.status
        if service_status and service_status.version:
            return service_status.version
        return service.configuration.version

    @property
    def url(self) -> Optional[str]:
        """URL of the tenant's ZenML server.

        Returns:
            The ZenML server URL, if available.
        """
        service_status = self.zenml_service.status
        if service_status:
            return service_status.server_url
        return None

    @property
    def dashboard_url(self) -> str:
        """ZenML Pro dashboard URL for this tenant.

        Returns:
            The URL to the ZenML Pro dashboard for this tenant.
        """
        organization_part = f"/organizations/{str(self.organization_id)}"
        tenant_part = f"/tenants/{str(self.id)}"
        return ZENML_PRO_URL + organization_part + tenant_part

    @property
    def dashboard_organization_url(self) -> str:
        """ZenML Pro dashboard URL for this tenant's organization.

        Returns:
            The URL to the ZenML Pro dashboard for this tenant's organization.
        """
        organization_part = f"/organizations/{str(self.organization_id)}"
        return ZENML_PRO_URL + organization_part
dashboard_organization_url: str
property
readonly
Get the URL to the ZenML Pro dashboard for this tenant's organization.
Returns:
Type | Description |
---|---|
str |
The URL to the ZenML Pro dashboard for this tenant's organization. |
dashboard_url: str
property
readonly
Get the URL to the ZenML Pro dashboard for this tenant.
Returns:
Type | Description |
---|---|
str |
The URL to the ZenML Pro dashboard for this tenant. |
organization_id: UUID
property
readonly
Get the organization id.
Returns:
Type | Description |
---|---|
UUID |
The organization id. |
organization_name: str
property
readonly
Get the organization name.
Returns:
Type | Description |
---|---|
str |
The organization name. |
url: Optional[str]
property
readonly
Get the ZenML server URL.
Returns:
Type | Description |
---|---|
Optional[str] |
The ZenML server URL, if available. |
version: Optional[str]
property
readonly
Get the ZenML service version.
Returns:
Type | Description |
---|---|
Optional[str] |
The ZenML service version. |
TenantStatus (StrEnum)
Enum that represents the desired state or status of a tenant.
These values can be used in two places:
- in the `desired_state` field of a tenant object, to indicate the desired state of the tenant (with the exception of `PENDING` and `FAILED`, which are not valid values for `desired_state`)
- in the `status` field of a tenant object, to indicate the current state of the tenant
Source code in zenml/login/pro/tenant/models.py
class TenantStatus(StrEnum):
"""Enum that represents the desired state or status of a tenant.
These values can be used in two places:
* in the `desired_state` field of a tenant object, to indicate the desired
state of the tenant (with the exception of `PENDING` and `FAILED` which
are not valid values for `desired_state`)
* in the `status` field of a tenant object, to indicate the current state
of the tenant
"""
# Tenant hasn't been deployed yet (i.e. newly created) or has been fully
# deleted by the infrastructure provider
NOT_INITIALIZED = "not_initialized"
# Tenant is being processed by the infrastructure provider (is being
# deployed, updated, deactivated, re-activated or deleted/cleaned up).
PENDING = "pending"
# Tenant is up and running
AVAILABLE = "available"
# Tenant is in a failure state (i.e. deployment, update or deletion failed)
FAILED = "failed"
# Tenant is deactivated
DEACTIVATED = "deactivated"
# Tenant resources have been deleted by the infrastructure provider but
# the tenant object still exists in the database
DELETED = "deleted"
ZenMLServiceConfiguration (BaseRestAPIModel)
ZenML service configuration.
Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceConfiguration(BaseRestAPIModel):
    """Configuration of a ZenML service."""

    # Required: no default is declared for this field.
    version: str = Field(description="The ZenML version.")
ZenMLServiceRead (BaseRestAPIModel)
Pydantic Model for viewing a ZenML service.
Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceRead(BaseRestAPIModel):
    """Read-only view of a ZenML service."""

    configuration: ZenMLServiceConfiguration = Field(
        description="The service configuration.",
    )
    # Defaults to None when no status information is present.
    status: Optional[ZenMLServiceStatus] = Field(
        default=None,
        description=(
            "Information about the service status. Only set if the "
            "service is deployed and active."
        ),
    )
ZenMLServiceStatus (BaseRestAPIModel)
ZenML service status.
Source code in zenml/login/pro/tenant/models.py
class ZenMLServiceStatus(BaseRestAPIModel):
    """Status information of a ZenML service."""

    # Required: no default is declared for this field.
    server_url: str = Field(description="The ZenML server URL.")
    # Optional: defaults to None.
    version: Optional[str] = Field(
        default=None, description="The ZenML server version."
    )
utils
ZenML Pro login utils.
get_troubleshooting_instructions(url)
Get troubleshooting instructions for a given ZenML Pro server URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
ZenML Pro server URL |
required |
Returns:
Type | Description |
---|---|
str |
Troubleshooting instructions |
Source code in zenml/login/pro/utils.py
def get_troubleshooting_instructions(url: str) -> str:
"""Get troubleshooting instructions for a given ZenML Pro server URL.
Builds a human-readable hint explaining why the server at `url` may be
unreachable. When the credentials store reports valid ZenML Pro
authentication, the tenant list is queried for `url` and the message is
tailored to the status of the first tenant returned; otherwise a generic
message is returned.
Args:
url: ZenML Pro server URL
Returns:
Troubleshooting instructions
"""
credentials_store = get_credentials_store()
# Tenant lookup is only attempted with valid ZenML Pro authentication.
if credentials_store.has_valid_pro_authentication():
client = ZenMLProClient()
try:
servers = client.tenant.list(url=url, member_only=False)
except Exception as e:
# Best effort: a failed lookup is only logged at debug level and never
# raised to the caller.
logger.debug(f"Failed to list tenants: {e}")
else:
if servers:
# Only the first tenant matching the URL filter is inspected.
server = servers[0]
if server.status == TenantStatus.AVAILABLE:
return (
f"The '{server.name}' ZenML Pro server that the client "
"is connected to is currently running but you may not "
"have the necessary permissions to access it. Please "
"contact your ZenML Pro administrator for more "
"information or try to manage the server members "
"yourself if you have the necessary permissions by "
f"visiting the ZenML Pro tenant page at {server.dashboard_url}."
)
if server.status == TenantStatus.DEACTIVATED:
return (
f"The '{server.name}' ZenML Pro server that the client "
"is connected to has been deactivated. "
"Please contact your ZenML Pro administrator for more "
"information or to reactivate the server yourself if "
"you have the necessary permissions by visiting the "
f"ZenML Pro Organization page at {server.dashboard_organization_url}."
)
if server.status == TenantStatus.PENDING:
return (
f"The '{server.name}' ZenML Pro server that the client "
"is connected to is currently undergoing maintenance "
"(e.g. being deployed, upgraded or re-activated). "
"Please try again later or contact your ZenML Pro "
"administrator for more information. You can also "
f"visit the ZenML Pro tenant page at {server.dashboard_url}."
)
# Any status not matched above (not only FAILED) is reported as a
# failed state.
return (
f"The '{server.name}' ZenML Pro server that the client "
"is connected to is currently in a failed "
"state. Please contact your ZenML Pro administrator for "
"more information or try to re-deploy the server "
"yourself if you have the necessary permissions by "
"visiting the ZenML Pro Organization page at "
f"{server.dashboard_organization_url}."
)
# NOTE(review): indentation was lost in this listing, so it cannot be
# determined here whether this return is nested under the `else` above
# (empty tenant list only) or follows the whole try statement (also reached
# after a failed lookup) — confirm against the original module.
return (
f"The ZenML Pro server at URL '{url}' that the client is "
"connected to does not exist or you may not have access to it. "
"Please check the URL and your permissions and try again or "
"connect your client to a different server by running `zenml "
"login` or by using a service account API key."
)
# Generic fallback message.
return (
f"The ZenML Pro server at URL '{url}' that the client is connected to "
"does not exist, is not running, or you do not have permissions to "
"connect to it. Please check the URL and your permissions "
"and try again. The ZenML Pro server might have been deactivated or is "
"currently pending maintenance. Please contact your ZenML Pro "
"administrator for more information or try to manage the server "
"state by visiting the ZenML Pro dashboard."
)
is_zenml_pro_server_url(url)
Check if a given URL is a ZenML Pro server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
URL to check |
required |
Returns:
Type | Description |
---|---|
bool |
True if the URL is a ZenML Pro tenant, False otherwise |
Source code in zenml/login/pro/utils.py
def is_zenml_pro_server_url(url: str) -> bool:
    """Check if a given URL is a ZenML Pro server.

    A URL qualifies when it consists of an optional `https://` scheme, one or
    more host characters (letters, digits, dots and hyphens), a dot, the
    ZenML Pro server subdomain and an optional trailing slash — and nothing
    else.

    Args:
        url: URL to check

    Returns:
        True if the URL is a ZenML Pro tenant, False otherwise
    """
    # re.escape neutralizes every regex metacharacter in the configured
    # subdomain, not just the dots.
    domain_regex = re.escape(ZENML_PRO_SERVER_SUBDOMAIN)
    pattern = rf"(https://)?[a-zA-Z0-9.-]+\.{domain_regex}/?"
    # fullmatch anchors at both ends; unlike `$` it does not tolerate a
    # trailing newline after the URL.
    return re.fullmatch(pattern, url) is not None
web_login
ZenML OAuth2 device authorization grant client support.
web_login(url=None, verify_ssl=None)
Implements the OAuth2 Device Authorization Grant flow.
This function implements the client side of the OAuth2 Device Authorization Grant flow as defined in https://tools.ietf.org/html/rfc8628, with the following customizations:
- the unique ZenML client ID (`user_id` in the global config) is used as the OAuth2 client ID value
- additional information is added to the user agent header to be used by users to identify the ZenML client
Upon completion of the flow, the access token is saved in the credentials store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
Optional[str] |
The URL of the OAuth2 server. If not provided, the ZenML Pro API server is used by default. |
None |
verify_ssl |
Union[str, bool] |
Whether to verify the SSL certificate of the OAuth2 server. If a string is passed, it is interpreted as the path to a CA bundle file. |
None |
Returns:
Type | Description |
---|---|
APIToken |
The response returned by the OAuth2 server. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If an error occurred during the authorization process. |
Source code in zenml/login/web_login.py
def web_login(
    url: Optional[str] = None, verify_ssl: Optional[Union[str, bool]] = None
) -> APIToken:
    """Implements the OAuth2 Device Authorization Grant flow.

    This function implements the client side of the OAuth2 Device Authorization
    Grant flow as defined in https://tools.ietf.org/html/rfc8628, with the
    following customizations:

    * the unique ZenML client ID (`user_id` in the global config) is used
    as the OAuth2 client ID value
    * additional information is added to the user agent header to be used by
    users to identify the ZenML client

    Upon completion of the flow, the access token is saved in the credentials
    store.

    Args:
        url: The URL of the OAuth2 server. If not provided, the ZenML Pro API
            server is used by default.
        verify_ssl: Whether to verify the SSL certificate of the OAuth2 server.
            If a string is passed, it is interpreted as the path to a CA bundle
            file.

    Returns:
        The API token produced by storing the OAuth2 server's token response
        in the credentials store.

    Raises:
        AuthorizationException: If an error occurred during the authorization
            process.
    """
    from zenml.login.credentials_store import get_credentials_store
    from zenml.models import (
        OAuthDeviceAuthorizationRequest,
        OAuthDeviceAuthorizationResponse,
        OAuthDeviceTokenRequest,
        OAuthDeviceUserAgentHeader,
        OAuthTokenResponse,
    )

    credentials_store = get_credentials_store()

    # Make a request to the OAuth2 server to get the device code and user code.
    # The client ID used for the request is the unique ID of the ZenML client.
    response: Optional[requests.Response] = None

    # Add the following information in the user agent header to be used by users
    # to identify the ZenML client:
    #
    # * the ZenML version
    # * the python version
    # * the OS type
    # * the hostname
    #
    user_agent_header = OAuthDeviceUserAgentHeader(
        hostname=platform.node(),
        zenml_version=__version__,
        python_version=platform.python_version(),
        os=platform.system(),
    )

    zenml_pro = False
    if not url:
        # If no URL is provided, we use the ZenML Pro API server by default
        zenml_pro = True
        url = base_url = ZENML_PRO_API_URL
    else:
        # Get rid of any trailing slashes to prevent issues when having double
        # slashes in the URL
        url = url.rstrip("/")
        if is_zenml_pro_server_url(url):
            # This is a ZenML Pro server. The device authentication is done
            # through the ZenML Pro API.
            zenml_pro = True
            base_url = ZENML_PRO_API_URL
        else:
            base_url = url

    auth_request = OAuthDeviceAuthorizationRequest(
        client_id=GlobalConfiguration().user_id
    )

    # If an existing token is found in the credentials store, we reuse its
    # device ID to avoid creating a new device ID for the same device.
    existing_token = credentials_store.get_token(url)
    if existing_token and existing_token.device_id:
        auth_request.device_id = existing_token.device_id

    if zenml_pro:
        auth_url = base_url + AUTH + DEVICE_AUTHORIZATION
        login_url = base_url + AUTH + LOGIN
    else:
        auth_url = base_url + API + VERSION_1 + DEVICE_AUTHORIZATION
        login_url = base_url + API + VERSION_1 + LOGIN

    try:
        response = requests.post(
            auth_url,
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "User-Agent": user_agent_header.encode(),
            },
            data=auth_request.model_dump(exclude_none=True),
            verify=verify_ssl,
            timeout=DEFAULT_HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            auth_response = OAuthDeviceAuthorizationResponse(**response.json())
        else:
            logger.info(f"Error: {response.status_code} {response.text}")
            raise AuthorizationException(
                f"Could not connect to {base_url}. Please check the URL."
            )
    except (requests.exceptions.JSONDecodeError, ValueError, TypeError) as e:
        logger.exception("Bad response received from API server.")
        raise AuthorizationException(
            "Bad response received from API server. Please check the URL."
        ) from e
    except requests.exceptions.RequestException as e:
        logger.exception("Could not connect to API server.")
        raise AuthorizationException(
            f"Could not connect to {base_url}. Please check the URL."
        ) from e

    # Open the verification URL in the user's browser
    verification_uri = (
        auth_response.verification_uri_complete
        or auth_response.verification_uri
    )
    if verification_uri.startswith("/"):
        # If the verification URI is a relative path, we need to add the base
        # URL to it
        verification_uri = base_url + verification_uri
    webbrowser.open(verification_uri)
    logger.info(
        f"If your browser did not open automatically, please open the "
        f"following URL into your browser to proceed with the authentication:"
        f"\n\n{verification_uri}\n"
    )

    # Poll the OAuth2 server until the user has authorized the device
    token_request = OAuthDeviceTokenRequest(
        device_code=auth_response.device_code,
        client_id=auth_request.client_id,
    )
    expires_in = auth_response.expires_in
    interval = auth_response.interval
    token_response: OAuthTokenResponse
    while True:
        response = requests.post(
            login_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=token_request.model_dump(),
            verify=verify_ssl,
            timeout=DEFAULT_HTTP_TIMEOUT,
        )
        if response.status_code == 200:
            # The user has authorized the device, so we can extract the access token
            token_response = OAuthTokenResponse(**response.json())
            if zenml_pro:
                logger.info("Successfully logged in to ZenML Pro.")
            else:
                logger.info(f"Successfully logged in to {url}.")
            break
        elif response.status_code == 400:
            try:
                error_response = OAuthError(**response.json())
            except (
                requests.exceptions.JSONDecodeError,
                ValueError,
                TypeError,
            ) as e:
                raise AuthorizationException(
                    f"Error received from {base_url}: {response.text}"
                ) from e

            if error_response.error == "authorization_pending":
                # The user hasn't authorized the device yet, so we wait for the
                # interval and try again
                pass
            elif error_response.error == "slow_down":
                # The OAuth2 server is asking us to slow down our polling
                interval += 5
            else:
                # There was another error with the request
                raise AuthorizationException(
                    f"Error: {error_response.error} {error_response.error_description}"
                )

            # Charge the upcoming wait against the remaining device code
            # lifetime before sleeping.
            expires_in -= interval
            if expires_in <= 0:
                raise AuthorizationException(
                    "User did not authorize the device in time."
                )
            time.sleep(interval)
        else:
            # There was another error with the request. The body is not
            # guaranteed to be JSON or to carry an "error" key (e.g. an HTML
            # error page from a proxy), so fall back to the raw text instead
            # of failing with an unrelated decoding/lookup error.
            try:
                error_detail = response.json()["error"]
            except (
                requests.exceptions.JSONDecodeError,
                ValueError,
                TypeError,
                KeyError,
            ):
                error_detail = response.text
            raise AuthorizationException(
                f"Error: {response.status_code} {error_detail}"
            )

    # Save the token in the credentials store
    return credentials_store.set_token(url, token_response)