Skip to content

Hub

zenml._hub special

ZenML Hub internal code.

client

Client for the ZenML Hub.

HubAPIError (Exception)

Exception raised when the Hub returns an error or unexpected response.

Source code in zenml/_hub/client.py
class HubAPIError(Exception):
    """Exception raised when the Hub returns an error or unexpected response."""

HubClient

Client for the ZenML Hub.

Source code in zenml/_hub/client.py
class HubClient:
    """Client for the ZenML Hub."""

    def __init__(self, url: Optional[str] = None) -> None:
        """Initialize the client.

        Args:
            url: The URL of the ZenML Hub.
        """
        self.url = url or self.get_default_url()
        self.auth_token = Client().active_user.hub_token

    @staticmethod
    def get_default_url() -> str:
        """Get the default URL of the ZenML Hub.

        Returns:
            The default URL of the ZenML Hub.
        """
        return os.getenv(ENV_ZENML_HUB_URL, default=ZENML_HUB_DEFAULT_URL)

    def list_plugins(self, **params: Any) -> List[HubPluginResponseModel]:
        """List all plugins in the hub.

        Args:
            **params: The query parameters to send in the request.

        Returns:
            The list of plugin response models.
        """
        response = self._request("GET", "/plugins", params=params)
        if not isinstance(response, list):
            return []
        return [
            HubPluginResponseModel.model_validate(plugin)
            for plugin in response
        ]

    def get_plugin(
        self,
        name: str,
        version: Optional[str] = None,
        author: Optional[str] = None,
    ) -> Optional[HubPluginResponseModel]:
        """Get a specific plugin from the hub.

        Args:
            name: The name of the plugin.
            version: The version of the plugin. If not specified, the latest
                version will be returned.
            author: Username of the author of the plugin.

        Returns:
            The plugin response model or None if the plugin does not exist.
        """
        route = "/plugins"
        if not author:
            author = ZENML_HUB_ADMIN_USERNAME
        options = [
            f"name={name}",
            f"username={author}",
        ]
        if version:
            options.append(f"version={version}")
        if options:
            route += "?" + "&".join(options)
        try:
            response = self._request("GET", route)
        except HubAPIError:
            return None
        if not isinstance(response, list) or len(response) == 0:
            return None
        return HubPluginResponseModel.model_validate(response[0])

    def create_plugin(
        self, plugin_request: HubPluginRequestModel
    ) -> HubPluginResponseModel:
        """Create a plugin in the hub.

        Args:
            plugin_request: The plugin request model.

        Returns:
            The plugin response model.
        """
        route = "/plugins"
        response = self._request(
            "POST", route, data=plugin_request.model_dump_json()
        )
        return HubPluginResponseModel.model_validate(response)

    # TODO: Potentially reenable this later if hub adds logs streaming endpoint
    # def stream_plugin_build_logs(
    #     self, plugin_name: str, plugin_version: str
    # ) -> bool:
    #     """Stream the build logs of a plugin.

    #     Args:
    #         plugin_name: The name of the plugin.
    #         plugin_version: The version of the plugin. If not specified, the
    #             latest version will be used.

    #     Returns:
    #         Whether any logs were found.

    #     Raises:
    #         HubAPIError: If the build failed.
    #     """
    #     route = f"plugins/{plugin_name}/versions/{plugin_version}/logs"
    #     logs_url = os.path.join(self.url, route)

    #     found_logs = False
    #     with requests.get(logs_url, stream=True) as response:
    #         for line in response.iter_lines(
    #             chunk_size=None, decode_unicode=True
    #         ):
    #             found_logs = True
    #             if line.startswith("Build failed"):
    #                 raise HubAPIError(line)
    #             else:
    #                 logger.info(line)
    #     return found_logs

    def login(self, username: str, password: str) -> None:
        """Login to the ZenML Hub.

        Args:
            username: The username of the user in the ZenML Hub.
            password: The password of the user in the ZenML Hub.

        Raises:
            HubAPIError: If the login failed.
        """
        route = "/auth/jwt/login"
        response = self._request(
            method="POST",
            route=route,
            data={"username": username, "password": password},
            content_type="application/x-www-form-urlencoded",
        )
        if isinstance(response, dict):
            auth_token = response.get("access_token")
            if auth_token:
                self.set_auth_token(str(auth_token))
                return
        raise HubAPIError(f"Unexpected response: {response}")

    def set_auth_token(self, auth_token: Optional[str]) -> None:
        """Set the auth token.

        Args:
            auth_token: The auth token to set.
        """
        client = Client()
        client.update_user(
            name_id_or_prefix=client.active_user.id,
            updated_hub_token=auth_token,
        )
        self.auth_token = auth_token

    def get_github_login_url(self) -> str:
        """Get the GitHub login URL.

        Returns:
            The GitHub login URL.

        Raises:
            HubAPIError: If the request failed.
        """
        route = "/auth/github/authorize"
        response = self._request("GET", route)
        if isinstance(response, dict):
            auth_url = response.get("authorization_url")
            if auth_url:
                return str(auth_url)
        raise HubAPIError(f"Unexpected response: {str(response)}")

    def get_me(self) -> Optional[HubUserResponseModel]:
        """Get the current user.

        Returns:
            The user response model or None if the user does not exist.
        """
        try:
            response = self._request("GET", "/users/me")
            return HubUserResponseModel.model_validate(response)
        except HubAPIError:
            return None

    def _request(
        self,
        method: str,
        route: str,
        data: Optional[Any] = None,
        params: Optional[Dict[str, Any]] = None,
        content_type: str = "application/json",
    ) -> Any:
        """Helper function to make a request to the hub.

        Args:
            method: The HTTP method to use.
            route: The route to send the request to, e.g., "/plugins".
            data: The data to send in the request.
            params: The query parameters to send in the request.
            content_type: The content type of the request.

        Returns:
            The response JSON.

        Raises:
            HubAPIError: If the request failed.
        """
        session = requests.Session()

        # Define headers
        headers = {
            "Accept": "application/json",
            "Content-Type": content_type,
            "Debug-Context": str(IS_DEBUG_ENV),
            "Source-Context": str(source_context.get().value),
        }
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"

        # Make the request
        route = route.lstrip("/")
        endpoint_url = os.path.join(self.url, route)
        response = session.request(
            method=method,
            url=endpoint_url,
            data=data,
            headers=headers,
            params=params,
            verify=ZENML_HUB_CLIENT_VERIFY,
            timeout=ZENML_HUB_CLIENT_TIMEOUT,
        )

        # Parse and return the response
        if 200 <= response.status_code < 300:
            return response.json()
        try:
            error_msg = response.json().get("detail", response.text)
        except JSONDecodeError:
            error_msg = response.text
        raise HubAPIError(f"Request to ZenML Hub failed: {error_msg}")
__init__(self, url=None) special

Initialize the client.

Parameters:

Name Type Description Default
url Optional[str]

The URL of the ZenML Hub.

None
Source code in zenml/_hub/client.py
def __init__(self, url: Optional[str] = None) -> None:
    """Initialize the client.

    Args:
        url: The URL of the ZenML Hub.
    """
    self.url = url or self.get_default_url()
    self.auth_token = Client().active_user.hub_token
create_plugin(self, plugin_request)

Create a plugin in the hub.

Parameters:

Name Type Description Default
plugin_request HubPluginRequestModel

The plugin request model.

required

Returns:

Type Description
HubPluginResponseModel

The plugin response model.

Source code in zenml/_hub/client.py
def create_plugin(
    self, plugin_request: HubPluginRequestModel
) -> HubPluginResponseModel:
    """Create a plugin in the hub.

    Args:
        plugin_request: The plugin request model.

    Returns:
        The plugin response model.
    """
    route = "/plugins"
    response = self._request(
        "POST", route, data=plugin_request.model_dump_json()
    )
    return HubPluginResponseModel.model_validate(response)
get_default_url() staticmethod

Get the default URL of the ZenML Hub.

Returns:

Type Description
str

The default URL of the ZenML Hub.

Source code in zenml/_hub/client.py
@staticmethod
def get_default_url() -> str:
    """Get the default URL of the ZenML Hub.

    Returns:
        The default URL of the ZenML Hub.
    """
    return os.getenv(ENV_ZENML_HUB_URL, default=ZENML_HUB_DEFAULT_URL)
get_github_login_url(self)

Get the GitHub login URL.

Returns:

Type Description
str

The GitHub login URL.

Exceptions:

Type Description
HubAPIError

If the request failed.

Source code in zenml/_hub/client.py
def get_github_login_url(self) -> str:
    """Get the GitHub login URL.

    Returns:
        The GitHub login URL.

    Raises:
        HubAPIError: If the request failed.
    """
    route = "/auth/github/authorize"
    response = self._request("GET", route)
    if isinstance(response, dict):
        auth_url = response.get("authorization_url")
        if auth_url:
            return str(auth_url)
    raise HubAPIError(f"Unexpected response: {str(response)}")
get_me(self)

Get the current user.

Returns:

Type Description
Optional[zenml.models.v2.misc.hub_plugin_models.HubUserResponseModel]

The user response model or None if the user does not exist.

Source code in zenml/_hub/client.py
def get_me(self) -> Optional[HubUserResponseModel]:
    """Get the current user.

    Returns:
        The user response model or None if the user does not exist.
    """
    try:
        response = self._request("GET", "/users/me")
        return HubUserResponseModel.model_validate(response)
    except HubAPIError:
        return None
get_plugin(self, name, version=None, author=None)

Get a specific plugin from the hub.

Parameters:

Name Type Description Default
name str

The name of the plugin.

required
version Optional[str]

The version of the plugin. If not specified, the latest version will be returned.

None
author Optional[str]

Username of the author of the plugin.

None

Returns:

Type Description
Optional[zenml.models.v2.misc.hub_plugin_models.HubPluginResponseModel]

The plugin response model or None if the plugin does not exist.

Source code in zenml/_hub/client.py
def get_plugin(
    self,
    name: str,
    version: Optional[str] = None,
    author: Optional[str] = None,
) -> Optional[HubPluginResponseModel]:
    """Get a specific plugin from the hub.

    Args:
        name: The name of the plugin.
        version: The version of the plugin. If not specified, the latest
            version will be returned.
        author: Username of the author of the plugin.

    Returns:
        The plugin response model or None if the plugin does not exist.
    """
    route = "/plugins"
    if not author:
        author = ZENML_HUB_ADMIN_USERNAME
    options = [
        f"name={name}",
        f"username={author}",
    ]
    if version:
        options.append(f"version={version}")
    if options:
        route += "?" + "&".join(options)
    try:
        response = self._request("GET", route)
    except HubAPIError:
        return None
    if not isinstance(response, list) or len(response) == 0:
        return None
    return HubPluginResponseModel.model_validate(response[0])
list_plugins(self, **params)

List all plugins in the hub.

Parameters:

Name Type Description Default
**params Any

The query parameters to send in the request.

{}

Returns:

Type Description
List[zenml.models.v2.misc.hub_plugin_models.HubPluginResponseModel]

The list of plugin response models.

Source code in zenml/_hub/client.py
def list_plugins(self, **params: Any) -> List[HubPluginResponseModel]:
    """List all plugins in the hub.

    Args:
        **params: The query parameters to send in the request.

    Returns:
        The list of plugin response models.
    """
    response = self._request("GET", "/plugins", params=params)
    if not isinstance(response, list):
        return []
    return [
        HubPluginResponseModel.model_validate(plugin)
        for plugin in response
    ]
login(self, username, password)

Login to the ZenML Hub.

Parameters:

Name Type Description Default
username str

The username of the user in the ZenML Hub.

required
password str

The password of the user in the ZenML Hub.

required

Exceptions:

Type Description
HubAPIError

If the login failed.

Source code in zenml/_hub/client.py
def login(self, username: str, password: str) -> None:
    """Login to the ZenML Hub.

    Args:
        username: The username of the user in the ZenML Hub.
        password: The password of the user in the ZenML Hub.

    Raises:
        HubAPIError: If the login failed.
    """
    route = "/auth/jwt/login"
    response = self._request(
        method="POST",
        route=route,
        data={"username": username, "password": password},
        content_type="application/x-www-form-urlencoded",
    )
    if isinstance(response, dict):
        auth_token = response.get("access_token")
        if auth_token:
            self.set_auth_token(str(auth_token))
            return
    raise HubAPIError(f"Unexpected response: {response}")
set_auth_token(self, auth_token)

Set the auth token.

Parameters:

Name Type Description Default
auth_token Optional[str]

The auth token to set.

required
Source code in zenml/_hub/client.py
def set_auth_token(self, auth_token: Optional[str]) -> None:
    """Set the auth token.

    Args:
        auth_token: The auth token to set.
    """
    client = Client()
    client.update_user(
        name_id_or_prefix=client.active_user.id,
        updated_hub_token=auth_token,
    )
    self.auth_token = auth_token

constants

Constants for the ZenML hub.

utils

Utility functions for the ZenML Hub.

parse_plugin_name(plugin_name, author_separator='/', version_separator=':')

Helper function to parse a plugin name.

Parameters:

Name Type Description Default
plugin_name str

The user-provided plugin name.

required
author_separator str

The separator between the author username and the plugin name.

'/'
version_separator str

The separator between the plugin name and the plugin version.

':'

Returns:

Type Description
Tuple[Optional[str], str, str]
  • The author username or None if no author was specified.
  • The plugin name.
  • The plugin version or "latest" if no version was specified.

Exceptions:

Type Description
ValueError

If the plugin name is invalid.

Source code in zenml/_hub/utils.py
def parse_plugin_name(
    plugin_name: str, author_separator: str = "/", version_separator: str = ":"
) -> Tuple[Optional[str], str, str]:
    """Helper function to parse a plugin name.

    Args:
        plugin_name: The user-provided plugin name.
        author_separator: The separator between the author username and the
            plugin name.
        version_separator: The separator between the plugin name and the
            plugin version.

    Returns:
        - The author username or None if no author was specified.
        - The plugin name.
        - The plugin version or "latest" if no version was specified.

    Raises:
        ValueError: If the plugin name is invalid.
    """
    invalid_format_err_msg = (
        f"Invalid plugin name '{plugin_name}'. Expected format: "
        f"`(<author_username>{author_separator})<plugin_name>({version_separator}<version>)`."
    )

    parts = plugin_name.split(version_separator)
    if len(parts) > 2:
        raise ValueError(invalid_format_err_msg)
    name, version = parts[0], "latest" if len(parts) == 1 else parts[1]

    parts = name.split(author_separator)
    if len(parts) > 2:
        raise ValueError(invalid_format_err_msg)
    name, author = parts[-1], None if len(parts) == 1 else parts[0]

    if not name:
        raise ValueError(invalid_format_err_msg)

    return author, name, version

plugin_display_name(name, version, author)

Helper function to get the display name of a plugin.

Parameters:

Name Type Description Default
name str

Name of the plugin.

required
version Optional[str]

Version of the plugin.

required
author Optional[str]

Username of the plugin author.

required

Returns:

Type Description
str

Display name of the plugin.

Source code in zenml/_hub/utils.py
def plugin_display_name(
    name: str, version: Optional[str], author: Optional[str]
) -> str:
    """Helper function to get the display name of a plugin.

    Args:
        name: Name of the plugin.
        version: Version of the plugin.
        author: Username of the plugin author.

    Returns:
        Display name of the plugin.
    """
    author_prefix = ""
    if author and author != ZENML_HUB_ADMIN_USERNAME:
        author_prefix = f"{author}/"
    version_suffix = f":{version}" if version else ":latest"
    return f"{author_prefix}{name}{version_suffix}"