Hub
zenml._hub
special
ZenML Hub internal code.
client
Client for the ZenML Hub.
HubAPIError (Exception)
Exception raised when the Hub returns an error or unexpected response.
Source code in zenml/_hub/client.py
class HubAPIError(Exception):
    """Error signalling a failed or malformed response from the ZenML Hub.

    Raised by `HubClient` when a request comes back with a non-2xx status
    code, or when a successful response does not have the expected shape
    (for example a login response that carries no access token).
    """
HubClient
Client for the ZenML Hub.
Source code in zenml/_hub/client.py
class HubClient:
    """Client for the ZenML Hub.

    Thin wrapper around the Hub's HTTP API: listing, fetching and creating
    plugins, logging in, and looking up the current user. Every call goes
    through `_request`, which attaches the stored auth token when present.
    """

    def __init__(self, url: Optional[str] = None) -> None:
        """Initialize the client.

        Args:
            url: The URL of the ZenML Hub. When omitted (or empty), the
                value of `get_default_url()` is used.
        """
        self.url = url or self.get_default_url()
        # Start from the Hub token stored on the active ZenML user, if any.
        self.auth_token = Client().active_user.hub_token

    @staticmethod
    def get_default_url() -> str:
        """Get the default URL of the ZenML Hub.

        The environment variable named by `ENV_ZENML_HUB_URL` takes
        precedence over the built-in `ZENML_HUB_DEFAULT_URL`.

        Returns:
            The default URL of the ZenML Hub.
        """
        return os.getenv(ENV_ZENML_HUB_URL, default=ZENML_HUB_DEFAULT_URL)

    def list_plugins(self, **params: Any) -> List[HubPluginResponseModel]:
        """List all plugins in the hub.

        Args:
            **params: The query parameters to send in the request.

        Returns:
            The list of plugin response models. Empty if the hub did not
            answer with a JSON list.
        """
        response = self._request("GET", "/plugins", params=params)
        if not isinstance(response, list):
            return []
        return [
            HubPluginResponseModel.model_validate(plugin)
            for plugin in response
        ]

    def get_plugin(
        self,
        name: str,
        version: Optional[str] = None,
        author: Optional[str] = None,
    ) -> Optional[HubPluginResponseModel]:
        """Get a specific plugin from the hub.

        Args:
            name: The name of the plugin.
            version: The version of the plugin. If not specified, the latest
                version will be returned.
            author: Username of the author of the plugin. Defaults to
                `ZENML_HUB_ADMIN_USERNAME` when not given.

        Returns:
            The plugin response model or None if the plugin does not exist
            (or the request failed).
        """
        # Pass the filters as query parameters so the HTTP library
        # URL-encodes them; interpolating them into the route by hand breaks
        # on values containing characters such as "&", "#" or spaces.
        params: Dict[str, Any] = {
            "name": name,
            "username": author or ZENML_HUB_ADMIN_USERNAME,
        }
        if version:
            params["version"] = version

        try:
            response = self._request("GET", "/plugins", params=params)
        except HubAPIError:
            return None
        if not isinstance(response, list) or not response:
            return None
        # The hub answers with a list of matches; the first one is used.
        return HubPluginResponseModel.model_validate(response[0])

    def create_plugin(
        self, plugin_request: HubPluginRequestModel
    ) -> HubPluginResponseModel:
        """Create a plugin in the hub.

        Args:
            plugin_request: The plugin request model.

        Returns:
            The plugin response model.
        """
        route = "/plugins"
        response = self._request(
            "POST", route, data=plugin_request.model_dump_json()
        )
        return HubPluginResponseModel.model_validate(response)

    # TODO: Potentially reenable this later if hub adds logs streaming endpoint
    # def stream_plugin_build_logs(
    #     self, plugin_name: str, plugin_version: str
    # ) -> bool:
    #     """Stream the build logs of a plugin.
    #     Args:
    #         plugin_name: The name of the plugin.
    #         plugin_version: The version of the plugin. If not specified, the
    #             latest version will be used.
    #     Returns:
    #         Whether any logs were found.
    #     Raises:
    #         HubAPIError: If the build failed.
    #     """
    #     route = f"plugins/{plugin_name}/versions/{plugin_version}/logs"
    #     logs_url = os.path.join(self.url, route)
    #     found_logs = False
    #     with requests.get(logs_url, stream=True) as response:
    #         for line in response.iter_lines(
    #             chunk_size=None, decode_unicode=True
    #         ):
    #             found_logs = True
    #             if line.startswith("Build failed"):
    #                 raise HubAPIError(line)
    #             else:
    #                 logger.info(line)
    #     return found_logs

    def login(self, username: str, password: str) -> None:
        """Login to the ZenML Hub.

        On success the returned access token is stored via
        `set_auth_token`.

        Args:
            username: The username of the user in the ZenML Hub.
            password: The password of the user in the ZenML Hub.

        Raises:
            HubAPIError: If the login failed.
        """
        route = "/auth/jwt/login"
        response = self._request(
            method="POST",
            route=route,
            data={"username": username, "password": password},
            content_type="application/x-www-form-urlencoded",
        )
        if isinstance(response, dict):
            auth_token = response.get("access_token")
            if auth_token:
                self.set_auth_token(str(auth_token))
                return
        raise HubAPIError(f"Unexpected response: {response}")

    def set_auth_token(self, auth_token: Optional[str]) -> None:
        """Set the auth token.

        The token is written to the active ZenML user and cached on this
        client instance.

        Args:
            auth_token: The auth token to set.
        """
        client = Client()
        client.update_user(
            name_id_or_prefix=client.active_user.id,
            updated_hub_token=auth_token,
        )
        self.auth_token = auth_token

    def get_github_login_url(self) -> str:
        """Get the GitHub login URL.

        Returns:
            The GitHub login URL.

        Raises:
            HubAPIError: If the request failed.
        """
        route = "/auth/github/authorize"
        response = self._request("GET", route)
        if isinstance(response, dict):
            auth_url = response.get("authorization_url")
            if auth_url:
                return str(auth_url)
        raise HubAPIError(f"Unexpected response: {str(response)}")

    def get_me(self) -> Optional[HubUserResponseModel]:
        """Get the current user.

        Returns:
            The user response model or None if the user does not exist.
        """
        try:
            response = self._request("GET", "/users/me")
            return HubUserResponseModel.model_validate(response)
        except HubAPIError:
            return None

    def _request(
        self,
        method: str,
        route: str,
        data: Optional[Any] = None,
        params: Optional[Dict[str, Any]] = None,
        content_type: str = "application/json",
    ) -> Any:
        """Helper function to make a request to the hub.

        Args:
            method: The HTTP method to use.
            route: The route to send the request to, e.g., "/plugins".
            data: The data to send in the request.
            params: The query parameters to send in the request.
            content_type: The content type of the request.

        Returns:
            The response JSON.

        Raises:
            HubAPIError: If the request failed or the hub answered a
                successful request with a body that is not valid JSON.
        """
        # Define headers
        headers = {
            "Accept": "application/json",
            "Content-Type": content_type,
            "Debug-Context": str(IS_DEBUG_ENV),
            "Source-Context": str(source_context.get().value),
        }
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"

        # Join URL segments with "/" explicitly. `os.path.join` uses the
        # platform path separator and would insert a backslash on Windows.
        endpoint_url = f"{self.url.rstrip('/')}/{route.lstrip('/')}"

        # Make the request; the context manager releases the session's
        # connection pool once the (non-streamed) response has been read.
        with requests.Session() as session:
            response = session.request(
                method=method,
                url=endpoint_url,
                data=data,
                headers=headers,
                params=params,
                verify=ZENML_HUB_CLIENT_VERIFY,
                timeout=ZENML_HUB_CLIENT_TIMEOUT,
            )

        # Parse and return the response
        if 200 <= response.status_code < 300:
            try:
                return response.json()
            except JSONDecodeError as err:
                # Surface an unparsable success body as the documented
                # error type instead of leaking a raw decode error.
                raise HubAPIError(
                    f"ZenML Hub returned a non-JSON response: {response.text}"
                ) from err

        try:
            payload = response.json()
        except JSONDecodeError:
            error_msg = response.text
        else:
            # Only a JSON object can carry a "detail" field; any other JSON
            # value (list, string, ...) falls back to the raw body.
            if isinstance(payload, dict):
                error_msg = payload.get("detail", response.text)
            else:
                error_msg = response.text
        raise HubAPIError(f"Request to ZenML Hub failed: {error_msg}")
__init__(self, url=None)
special
Initialize the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
Optional[str] |
The URL of the ZenML Hub. |
None |
Source code in zenml/_hub/client.py
def __init__(self, url: Optional[str] = None) -> None:
    """Set up the client with a Hub URL and the active user's Hub token.

    Args:
        url: The URL of the ZenML Hub. When omitted or empty, the value
            returned by `get_default_url()` is used instead.
    """
    hub_url = url if url else self.get_default_url()
    active_user = Client().active_user
    self.url = hub_url
    # Seed the token from whatever is stored on the active ZenML user.
    self.auth_token = active_user.hub_token
create_plugin(self, plugin_request)
Create a plugin in the hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plugin_request |
HubPluginRequestModel |
The plugin request model. |
required |
Returns:
Type | Description |
---|---|
HubPluginResponseModel |
The plugin response model. |
Source code in zenml/_hub/client.py
def create_plugin(
    self, plugin_request: HubPluginRequestModel
) -> HubPluginResponseModel:
    """Register a new plugin with the hub.

    Args:
        plugin_request: The plugin request model; it is sent to the hub
            as its JSON dump.

    Returns:
        The plugin response model built from the hub's answer.
    """
    payload = plugin_request.model_dump_json()
    created = self._request("POST", "/plugins", data=payload)
    return HubPluginResponseModel.model_validate(created)
get_default_url()
staticmethod
Get the default URL of the ZenML Hub.
Returns:
Type | Description |
---|---|
str |
The default URL of the ZenML Hub. |
Source code in zenml/_hub/client.py
@staticmethod
def get_default_url() -> str:
    """Resolve the Hub URL used when none is passed explicitly.

    Returns:
        The value of the environment variable named by
        `ENV_ZENML_HUB_URL` if it is set, otherwise
        `ZENML_HUB_DEFAULT_URL`.
    """
    env_url = os.getenv(ENV_ZENML_HUB_URL)
    if env_url is None:
        return ZENML_HUB_DEFAULT_URL
    return env_url
get_github_login_url(self)
Get the GitHub login URL.
Returns:
Type | Description |
---|---|
str |
The GitHub login URL. |
Exceptions:
Type | Description |
---|---|
HubAPIError |
If the request failed. |
Source code in zenml/_hub/client.py
def get_github_login_url(self) -> str:
    """Fetch the URL that starts the GitHub login flow.

    Returns:
        The GitHub login URL.

    Raises:
        HubAPIError: If the request failed or the response carries no
            `authorization_url`.
    """
    response = self._request("GET", "/auth/github/authorize")
    auth_url = (
        response.get("authorization_url")
        if isinstance(response, dict)
        else None
    )
    if not auth_url:
        raise HubAPIError(f"Unexpected response: {str(response)}")
    return str(auth_url)
get_me(self)
Get the current user.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.hub_plugin_models.HubUserResponseModel] |
The user response model or None if the user does not exist. |
Source code in zenml/_hub/client.py
def get_me(self) -> Optional[HubUserResponseModel]:
    """Look up the user the client is currently authenticated as.

    Returns:
        The user response model, or None if the hub reported an error
        (e.g. the user does not exist).
    """
    try:
        me = self._request("GET", "/users/me")
        user = HubUserResponseModel.model_validate(me)
    except HubAPIError:
        return None
    return user
get_plugin(self, name, version=None, author=None)
Get a specific plugin from the hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the plugin. |
required |
version |
Optional[str] |
The version of the plugin. If not specified, the latest version will be returned. |
None |
author |
Optional[str] |
Username of the author of the plugin. |
None |
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.hub_plugin_models.HubPluginResponseModel] |
The plugin response model or None if the plugin does not exist. |
Source code in zenml/_hub/client.py
def get_plugin(
    self,
    name: str,
    version: Optional[str] = None,
    author: Optional[str] = None,
) -> Optional[HubPluginResponseModel]:
    """Get a specific plugin from the hub.

    Args:
        name: The name of the plugin.
        version: The version of the plugin. If not specified, the latest
            version will be returned.
        author: Username of the author of the plugin. Defaults to
            `ZENML_HUB_ADMIN_USERNAME` when not given.

    Returns:
        The plugin response model or None if the plugin does not exist
        (or the request failed).
    """
    # Pass the filters as query parameters so the HTTP library URL-encodes
    # them; interpolating them into the route by hand breaks on values
    # containing characters such as "&", "#" or spaces.
    params: Dict[str, Any] = {
        "name": name,
        "username": author or ZENML_HUB_ADMIN_USERNAME,
    }
    if version:
        params["version"] = version

    try:
        response = self._request("GET", "/plugins", params=params)
    except HubAPIError:
        return None
    if not isinstance(response, list) or not response:
        return None
    # The hub answers with a list of matches; the first one is used.
    return HubPluginResponseModel.model_validate(response[0])
list_plugins(self, **params)
List all plugins in the hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**params |
Any |
The query parameters to send in the request. |
{} |
Returns:
Type | Description |
---|---|
List[zenml.models.v2.misc.hub_plugin_models.HubPluginResponseModel] |
The list of plugin response models. |
Source code in zenml/_hub/client.py
def list_plugins(self, **params: Any) -> List[HubPluginResponseModel]:
    """Fetch the plugins known to the hub.

    Args:
        **params: The query parameters to send in the request.

    Returns:
        The list of plugin response models; empty when the hub's answer
        is not a list.
    """
    payload = self._request("GET", "/plugins", params=params)
    plugins = []
    if isinstance(payload, list):
        for entry in payload:
            plugins.append(HubPluginResponseModel.model_validate(entry))
    return plugins
login(self, username, password)
Login to the ZenML Hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
username |
str |
The username of the user in the ZenML Hub. |
required |
password |
str |
The password of the user in the ZenML Hub. |
required |
Exceptions:
Type | Description |
---|---|
HubAPIError |
If the login failed. |
Source code in zenml/_hub/client.py
def login(self, username: str, password: str) -> None:
    """Authenticate against the ZenML Hub with username and password.

    On success the access token from the response is handed to
    `set_auth_token`.

    Args:
        username: The username of the user in the ZenML Hub.
        password: The password of the user in the ZenML Hub.

    Raises:
        HubAPIError: If the login failed or the response carries no
            access token.
    """
    credentials = {"username": username, "password": password}
    response = self._request(
        method="POST",
        route="/auth/jwt/login",
        data=credentials,
        content_type="application/x-www-form-urlencoded",
    )
    token = None
    if isinstance(response, dict):
        token = response.get("access_token")
    if not token:
        raise HubAPIError(f"Unexpected response: {response}")
    self.set_auth_token(str(token))
set_auth_token(self, auth_token)
Set the auth token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
auth_token |
Optional[str] |
The auth token to set. |
required |
Source code in zenml/_hub/client.py
def set_auth_token(self, auth_token: Optional[str]) -> None:
    """Store a Hub auth token on the active user and on this client.

    Args:
        auth_token: The auth token to set.
    """
    zen_client = Client()
    active_user_id = zen_client.active_user.id
    # Write the token to the active ZenML user first, then cache it locally.
    zen_client.update_user(
        name_id_or_prefix=active_user_id,
        updated_hub_token=auth_token,
    )
    self.auth_token = auth_token
constants
Constants for the ZenML hub.
utils
Utility functions for the ZenML Hub.
parse_plugin_name(plugin_name, author_separator='/', version_separator=':')
Helper function to parse a plugin name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plugin_name |
str |
The user-provided plugin name. |
required |
author_separator |
str |
The separator between the author username and the plugin name. |
'/' |
version_separator |
str |
The separator between the plugin name and the plugin version. |
':' |
Returns:
Type | Description |
---|---|
Tuple[Optional[str], str, str] |
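A tuple of the author username (or None if no author was specified), the plugin name, and the plugin version (or "latest" if no version was specified). |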
Exceptions:
Type | Description |
---|---|
ValueError |
If the plugin name is invalid. |
Source code in zenml/_hub/utils.py
def parse_plugin_name(
    plugin_name: str, author_separator: str = "/", version_separator: str = ":"
) -> Tuple[Optional[str], str, str]:
    """Helper function to parse a plugin name.

    Accepts names of the form
    `(<author_username>/)<plugin_name>(:<version>)` (with the default
    separators) and splits them into their components.

    Args:
        plugin_name: The user-provided plugin name.
        author_separator: The separator between the author username and the
            plugin name.
        version_separator: The separator between the plugin name and the
            plugin version.

    Returns:
        - The author username or None if no author was specified.
        - The plugin name.
        - The plugin version or "latest" if no version was specified.

    Raises:
        ValueError: If the plugin name is invalid.
    """
    invalid_format_err_msg = (
        f"Invalid plugin name '{plugin_name}'. Expected format: "
        f"`(<author_username>{author_separator})<plugin_name>({version_separator}<version>)`."
    )

    # Split off the optional version suffix.
    parts = plugin_name.split(version_separator)
    if len(parts) > 2:
        raise ValueError(invalid_format_err_msg)
    name = parts[0]
    # An empty version ("name:") counts as "no version specified", so it
    # falls back to "latest" as the documented contract promises.
    version = (parts[1] if len(parts) == 2 else "") or "latest"

    # Split off the optional author prefix.
    parts = name.split(author_separator)
    if len(parts) > 2:
        raise ValueError(invalid_format_err_msg)
    name = parts[-1]
    # Likewise, an empty author ("/name") is reported as None rather than
    # as an empty string.
    author = (parts[0] if len(parts) == 2 else None) or None

    if not name:
        raise ValueError(invalid_format_err_msg)

    return author, name, version
plugin_display_name(name, version, author)
Helper function to get the display name of a plugin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the plugin. |
required |
version |
Optional[str] |
Version of the plugin. |
required |
author |
Optional[str] |
Username of the plugin author. |
required |
Returns:
Type | Description |
---|---|
str |
Display name of the plugin. |
Source code in zenml/_hub/utils.py
def plugin_display_name(
    name: str, version: Optional[str], author: Optional[str]
) -> str:
    """Build the human-readable `author/name:version` label of a plugin.

    The author prefix is left out when no author is given or when the
    author is `ZENML_HUB_ADMIN_USERNAME`; a missing or empty version is
    shown as "latest".

    Args:
        name: Name of the plugin.
        version: Version of the plugin.
        author: Username of the plugin author.

    Returns:
        Display name of the plugin.
    """
    shown_version = version or "latest"
    if author and author != ZENML_HUB_ADMIN_USERNAME:
        return f"{author}/{name}:{shown_version}"
    return f"{name}:{shown_version}"