Skip to content

Client

zenml.client

Client implementation.

Client

ZenML client class.

The ZenML client manages configuration options for ZenML stacks as well as their components.

Source code in zenml/client.py
class Client(metaclass=ClientMetaClass):
    """ZenML client class.

    The ZenML client manages configuration options for ZenML stacks as well
    as their components.
    """

    _active_user: Optional[UserResponseModel] = None

    def __init__(
        self,
        root: Optional[Path] = None,
    ) -> None:
        """Initializes the global client instance.

        Client is a singleton class: only one instance can exist. Calling
        this constructor multiple times will always yield the same instance (see
        the exception below).

        The `root` argument is only meant for internal use and testing purposes.
        User code must never pass them to the constructor.
        When a custom `root` value is passed, an anonymous Client instance
        is created and returned independently of the Client singleton and
        that will have no effect as far as the rest of the ZenML core code is
        concerned.

        Instead of creating a new Client instance to reflect a different
        repository root, to change the active root in the global Client,
        call `Client().activate_root(<new-root>)`.

        Args:
            root: (internal use) custom root directory for the client. If
                no path is given, the repository root is determined using the
                environment variable `ZENML_REPOSITORY_PATH` (if set) and by
                recursively searching in the parent directories of the
                current working directory. Only used to initialize new
                clients internally.
        """
        self._root: Optional[Path] = None
        self._config: Optional[ClientConfiguration] = None

        self._set_active_root(root)

    @classmethod
    def get_instance(cls) -> Optional["Client"]:
        """Return the Client singleton instance.

        Returns:
            The Client singleton instance or None, if the Client hasn't
            been initialized yet.
        """
        return cls._global_client

    @classmethod
    def _reset_instance(cls, client: Optional["Client"] = None) -> None:
        """Reset the Client singleton instance.

        This method is only meant for internal use and testing purposes.

        Args:
            client: The Client instance to set as the global singleton.
                If None, the global Client singleton is reset to an empty
                value.
        """
        cls._global_client = client

    def _set_active_root(self, root: Optional[Path] = None) -> None:
        """Set the supplied path as the repository root.

        If a client configuration is found at the given path or the
        path, it is loaded and used to initialize the client.
        If no client configuration is found, the global configuration is
        used instead to manage the active stack, workspace etc.

        Args:
            root: The path to set as the active repository root. If not set,
                the repository root is determined using the environment
                variable `ZENML_REPOSITORY_PATH` (if set) and by recursively
                searching in the parent directories of the current working
                directory.
        """
        enable_warnings = handle_bool_env_var(
            ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, False
        )
        self._root = self.find_repository(
            root, enable_warnings=enable_warnings
        )

        if not self._root:
            self._config = None
            if enable_warnings:
                logger.info("Running without an active repository root.")
        else:
            logger.debug("Using repository root %s.", self._root)
            self._config = self._load_config()

        # Sanitize the client configuration to reflect the current
        # settings
        self._sanitize_config()

    def _config_path(self) -> Optional[str]:
        """Path to the client configuration file.

        Returns:
            Path to the client configuration file or None if the client
            root has not been initialized yet.
        """
        if not self.config_directory:
            return None
        return str(self.config_directory / "config.yaml")

    def _sanitize_config(self) -> None:
        """Sanitize and save the client configuration.

        This method is called to ensure that the client configuration
        doesn't contain outdated information, such as an active stack or
        workspace that no longer exists.
        """
        if not self._config:
            return

        active_workspace, active_stack = self.zen_store.validate_active_config(
            self._config.active_workspace_id,
            self._config.active_stack_id,
            config_name="repo",
        )
        self._config.set_active_stack(active_stack)
        self._config.set_active_workspace(active_workspace)

    def _load_config(self) -> Optional[ClientConfiguration]:
        """Loads the client configuration from disk.

        This happens if the client has an active root and the configuration
        file exists. If the configuration file doesn't exist, an empty
        configuration is returned.

        Returns:
            Loaded client configuration or None if the client does not
            have an active root.
        """
        config_path = self._config_path()
        if not config_path:
            return None

        # load the client configuration file if it exists, otherwise use
        # an empty configuration as default
        if fileio.exists(config_path):
            logger.debug(f"Loading client configuration from {config_path}.")
        else:
            logger.debug(
                "No client configuration file found, creating default "
                "configuration."
            )

        return ClientConfiguration(config_file=config_path)

    @staticmethod
    def initialize(
        root: Optional[Path] = None,
    ) -> None:
        """Initializes a new ZenML repository at the given path.

        Args:
            root: The root directory where the repository should be created.
                If None, the current working directory is used.

        Raises:
            InitializationException: If the root directory already contains a
                ZenML repository.
        """
        with event_handler(AnalyticsEvent.INITIALIZE_REPO):
            root = root or Path.cwd()
            logger.debug("Initializing new repository at path %s.", root)
            if Client.is_repository_directory(root):
                raise InitializationException(
                    f"Found existing ZenML repository at path '{root}'."
                )

            config_directory = str(root / REPOSITORY_DIRECTORY_NAME)
            io_utils.create_dir_recursive_if_not_exists(config_directory)
            # Initialize the repository configuration at the custom path
            Client(root=root)

    @property
    def uses_local_configuration(self) -> bool:
        """Check if the client is using a local configuration.

        Returns:
            True if the client is using a local configuration,
            False otherwise.
        """
        return self._config is not None

    @staticmethod
    def is_repository_directory(path: Path) -> bool:
        """Checks whether a ZenML client exists at the given path.

        Args:
            path: The path to check.

        Returns:
            True if a ZenML client exists at the given path,
            False otherwise.
        """
        config_dir = path / REPOSITORY_DIRECTORY_NAME
        return fileio.isdir(str(config_dir))

    @staticmethod
    def find_repository(
        path: Optional[Path] = None, enable_warnings: bool = False
    ) -> Optional[Path]:
        """Search for a ZenML repository directory.

        Args:
            path: Optional path to look for the repository. If no path is
                given, this function tries to find the repository using the
                environment variable `ZENML_REPOSITORY_PATH` (if set) and
                recursively searching in the parent directories of the current
                working directory.
            enable_warnings: If `True`, warnings are printed if the repository
                root cannot be found.

        Returns:
            Absolute path to a ZenML repository directory or None if no
            repository directory was found.
        """
        if not path:
            # try to get path from the environment variable
            env_var_path = os.getenv(ENV_ZENML_REPOSITORY_PATH)
            if env_var_path:
                path = Path(env_var_path)

        if path:
            # explicit path via parameter or environment variable, don't search
            # parent directories
            search_parent_directories = False
            warning_message = (
                f"Unable to find ZenML repository at path '{path}'. Make sure "
                f"to create a ZenML repository by calling `zenml init` when "
                f"specifying an explicit repository path in code or via the "
                f"environment variable '{ENV_ZENML_REPOSITORY_PATH}'."
            )
        else:
            # try to find the repository in the parent directories of the
            # current working directory
            path = Path.cwd()
            search_parent_directories = True
            warning_message = (
                f"Unable to find ZenML repository in your current working "
                f"directory ({path}) or any parent directories. If you "
                f"want to use an existing repository which is in a different "
                f"location, set the environment variable "
                f"'{ENV_ZENML_REPOSITORY_PATH}'. If you want to create a new "
                f"repository, run `zenml init`."
            )

        def _find_repository_helper(path_: Path) -> Optional[Path]:
            """Recursively search parent directories for a ZenML repository.

            Args:
                path_: The path to search.

            Returns:
                Absolute path to a ZenML repository directory or None if no
                repository directory was found.
            """
            if Client.is_repository_directory(path_):
                return path_

            if not search_parent_directories or io_utils.is_root(str(path_)):
                return None

            return _find_repository_helper(path_.parent)

        repository_path = _find_repository_helper(path)

        if repository_path:
            return repository_path.resolve()
        if enable_warnings:
            logger.warning(warning_message)
        return None

    @staticmethod
    def is_inside_repository(file_path: str) -> bool:
        """Returns whether a file is inside the active ZenML repository.

        Args:
            file_path: A file path.

        Returns:
            True if the file is inside the active ZenML repository, False
            otherwise.
        """
        repo_path = Client.find_repository()
        if not repo_path:
            return False

        return repo_path in Path(file_path).resolve().parents

    @property
    def zen_store(self) -> "BaseZenStore":
        """Shortcut to return the global zen store.

        Returns:
            The global zen store.
        """
        return GlobalConfiguration().zen_store

    @property
    def root(self) -> Optional[Path]:
        """The root directory of this client.

        Returns:
            The root directory of this client, or None, if the client
            has not been initialized.
        """
        return self._root

    @property
    def config_directory(self) -> Optional[Path]:
        """The configuration directory of this client.

        Returns:
            The configuration directory of this client, or None, if the
            client doesn't have an active root.
        """
        if not self.root:
            return None
        return self.root / REPOSITORY_DIRECTORY_NAME

    def activate_root(self, root: Optional[Path] = None) -> None:
        """Set the active repository root directory.

        Args:
            root: The path to set as the active repository root. If not set,
                the repository root is determined using the environment
                variable `ZENML_REPOSITORY_PATH` (if set) and by recursively
                searching in the parent directories of the current working
                directory.
        """
        self._set_active_root(root)

    @track(event=AnalyticsEvent.SET_WORKSPACE)
    def set_active_workspace(
        self, workspace_name_or_id: Union[str, UUID]
    ) -> "WorkspaceResponseModel":
        """Set the workspace for the local client.

        Args:
            workspace_name_or_id: The name or ID of the workspace to set active.

        Returns:
            The model of the active workspace.
        """
        workspace = self.zen_store.get_workspace(
            workspace_name_or_id=workspace_name_or_id
        )  # raises KeyError
        if self._config:
            self._config.set_active_workspace(workspace)
            # Sanitize the client configuration to reflect the current
            # settings
            self._sanitize_config()
        else:
            # set the active workspace globally only if the client doesn't use
            # a local configuration
            GlobalConfiguration().set_active_workspace(workspace)
        return workspace

    # ---- #
    # USER #
    # ---- #

    @property
    def active_user(self) -> "UserResponseModel":
        """Get the user that is currently in use.

        Returns:
            The active user.
        """
        if self._active_user is None:
            self._active_user = self.zen_store.get_user(include_private=True)
        return self._active_user

    def create_user(
        self,
        name: str,
        initial_role: Optional[str] = None,
        password: Optional[str] = None,
    ) -> UserResponseModel:
        """Create a new user.

        Args:
            name: The name of the user.
            initial_role: Optionally, an initial role to assign to the user.
            password: The password of the user. If not provided, the user will
                be created with empty password.

        Returns:
            The model of the created user.
        """
        user = UserRequestModel(name=name, password=password or None)
        if self.zen_store.type != StoreType.REST:
            user.active = password != ""
        else:
            user.active = True

        created_user = self.zen_store.create_user(user=user)

        if initial_role:
            self.create_user_role_assignment(
                role_name_or_id=initial_role,
                user_name_or_id=created_user.id,
                workspace_name_or_id=None,
            )

        return created_user

    def get_user(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> UserResponseModel:
        """Gets a user.

        Args:
            name_id_or_prefix: The name or ID of the user.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The User
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_user,
            list_method=self.list_users,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def list_users(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        full_name: Optional[str] = None,
        email: Optional[str] = None,
        active: Optional[bool] = None,
        email_opted_in: Optional[bool] = None,
    ) -> Page[UserResponseModel]:
        """List all users.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of stacks to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: Use the username for filtering
            full_name: Use the user full name for filtering
            email: Use the user email for filtering
            active: User the user active status for filtering
            email_opted_in: Use the user opt in status for filtering

        Returns:
            The User
        """
        return self.zen_store.list_users(
            UserFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                name=name,
                full_name=full_name,
                email=email,
                active=active,
                email_opted_in=email_opted_in,
            )
        )

    def delete_user(self, name_id_or_prefix: str) -> None:
        """Delete a user.

        Args:
            name_id_or_prefix: The name or ID of the user to delete.
        """
        user = self.get_user(name_id_or_prefix, allow_name_prefix_match=False)
        self.zen_store.delete_user(user_name_or_id=user.name)

    def update_user(
        self,
        name_id_or_prefix: Union[str, UUID],
        updated_name: Optional[str] = None,
        updated_full_name: Optional[str] = None,
        updated_email: Optional[str] = None,
        updated_email_opt_in: Optional[bool] = None,
        updated_hub_token: Optional[str] = None,
    ) -> UserResponseModel:
        """Update a user.

        Args:
            name_id_or_prefix: The name or ID of the user to update.
            updated_name: The new name of the user.
            updated_full_name: The new full name of the user.
            updated_email: The new email of the user.
            updated_email_opt_in: The new email opt-in status of the user.
            updated_hub_token: Update the hub token

        Returns:
            The updated user.
        """
        user = self.get_user(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        user_update = UserUpdateModel(name=updated_name or user.name)
        if updated_full_name:
            user_update.full_name = updated_full_name
        if updated_email is not None:
            user_update.email = updated_email
            user_update.email_opted_in = (
                updated_email_opt_in or user.email_opted_in
            )
        if updated_email_opt_in is not None:
            user_update.email_opted_in = updated_email_opt_in
        if updated_hub_token is not None:
            user_update.hub_token = updated_hub_token

        return self.zen_store.update_user(
            user_id=user.id, user_update=user_update
        )

    # ---- #
    # TEAM #
    # ---- #

    def get_team(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> TeamResponseModel:
        """Gets a team.

        Args:
            name_id_or_prefix: The name or ID of the team.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The Team
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_team,
            list_method=self.list_teams,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def list_teams(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
    ) -> Page[TeamResponseModel]:
        """List all teams.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of teams to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: Use the team name for filtering

        Returns:
            The Team
        """
        return self.zen_store.list_teams(
            TeamFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                name=name,
            )
        )

    def create_team(
        self, name: str, users: Optional[List[str]] = None
    ) -> TeamResponseModel:
        """Create a team.

        Args:
            name: Name of the team.
            users: Users to add to the team.

        Returns:
            The created team.
        """
        user_list = []
        if users:
            for user_name_or_id in users:
                user_list.append(
                    self.get_user(name_id_or_prefix=user_name_or_id).id
                )

        team = TeamRequestModel(name=name, users=user_list)

        return self.zen_store.create_team(team=team)

    def delete_team(self, name_id_or_prefix: str) -> None:
        """Delete a team.

        Args:
            name_id_or_prefix: The name or ID of the team to delete.
        """
        team = self.get_team(name_id_or_prefix, allow_name_prefix_match=False)
        self.zen_store.delete_team(team_name_or_id=team.id)

    def update_team(
        self,
        name_id_or_prefix: str,
        new_name: Optional[str] = None,
        remove_users: Optional[List[str]] = None,
        add_users: Optional[List[str]] = None,
    ) -> TeamResponseModel:
        """Update a team.

        Args:
            name_id_or_prefix: The name or ID of the team to update.
            new_name: The new name of the team.
            remove_users: The users to remove from the team.
            add_users: The users to add to the team.

        Returns:
            The updated team.

        Raises:
            RuntimeError: If the same user is in both `remove_users` and
                `add_users`.
        """
        team = self.get_team(name_id_or_prefix, allow_name_prefix_match=False)

        team_update = TeamUpdateModel(name=new_name or team.name)
        if remove_users is not None and add_users is not None:
            union_add_rm = set(remove_users) & set(add_users)
            if union_add_rm:
                raise RuntimeError(
                    f"The `remove_user` and `add_user` "
                    f"options both contain the same value(s): "
                    f"`{union_add_rm}`. Please rerun command and make sure "
                    f"that the same user does not show up for "
                    f"`remove_user` and `add_user`."
                )

        # Only if permissions are being added or removed will they need to be
        #  set for the update model
        team_users = []

        if remove_users or add_users:
            team_users = [u.id for u in team.users]
        if remove_users:
            for rm_p in remove_users:
                user = self.get_user(rm_p)
                try:
                    team_users.remove(user.id)
                except KeyError:
                    logger.warning(
                        f"Role {remove_users} was already not "
                        f"part of the '{team.name}' Team."
                    )
        if add_users:
            for add_u in add_users:
                team_users.append(self.get_user(add_u).id)

        if team_users:
            team_update.users = team_users

        return self.zen_store.update_team(
            team_id=team.id, team_update=team_update
        )

    # ----- #
    # ROLES #
    # ----- #

    def get_role(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> RoleResponseModel:
        """Gets a role.

        Args:
            name_id_or_prefix: The name or ID of the role.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The fetched role.
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_role,
            list_method=self.list_roles,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def list_roles(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
    ) -> Page[RoleResponseModel]:
        """List all roles.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: The logical operator to use between column filters
            id: Use the id of roles to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: Use the role name for filtering

        Returns:
            The Role
        """
        return self.zen_store.list_roles(
            RoleFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                name=name,
            )
        )

    def create_role(
        self, name: str, permissions_list: List[str]
    ) -> RoleResponseModel:
        """Creates a role.

        Args:
            name: The name for the new role.
            permissions_list: The permissions to attach to this role.

        Returns:
            The newly created role.
        """
        permissions: Set[PermissionType] = set()
        for permission in permissions_list:
            if permission in PermissionType.values():
                permissions.add(PermissionType(permission))

        new_role = RoleRequestModel(name=name, permissions=permissions)
        return self.zen_store.create_role(new_role)

    def update_role(
        self,
        name_id_or_prefix: str,
        new_name: Optional[str] = None,
        remove_permission: Optional[List[str]] = None,
        add_permission: Optional[List[str]] = None,
    ) -> RoleResponseModel:
        """Updates a role.

        Args:
            name_id_or_prefix: The name or ID of the role.
            new_name: The new name for the role
            remove_permission: Permissions to remove from this role.
            add_permission: Permissions to add to this role.

        Returns:
            The updated role.

        Raises:
            RuntimeError: If the same permission is in both the
                `remove_permission` and `add_permission` lists.
        """
        role = self.get_role(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )

        role_update = RoleUpdateModel(name=new_name or role.name)  # type: ignore[call-arg]

        if remove_permission is not None and add_permission is not None:
            union_add_rm = set(remove_permission) & set(add_permission)
            if union_add_rm:
                raise RuntimeError(
                    f"The `remove_permission` and `add_permission` "
                    f"options both contain the same value(s): "
                    f"`{union_add_rm}`. Please rerun command and make sure "
                    f"that the same role does not show up for "
                    f"`remove_permission` and `add_permission`."
                )

        # Only if permissions are being added or removed will they need to be
        #  set for the update model
        if remove_permission or add_permission:
            role_permissions = role.permissions

            if remove_permission:
                for rm_p in remove_permission:
                    if rm_p in PermissionType:
                        try:
                            role_permissions.remove(PermissionType(rm_p))
                        except KeyError:
                            logger.warning(
                                f"Role {remove_permission} was already not "
                                f"part of the {role} Role."
                            )
            if add_permission:
                for add_p in add_permission:
                    if add_p in PermissionType.values():
                        # Set won't throw an error if the item was already in it
                        role_permissions.add(PermissionType(add_p))

            if role_permissions is not None:
                role_update.permissions = set(role_permissions)

        return Client().zen_store.update_role(
            role_id=role.id, role_update=role_update
        )

    def delete_role(self, name_id_or_prefix: str) -> None:
        """Deletes a role.

        Args:
            name_id_or_prefix: The name or ID of the role.
        """
        role = self.get_role(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        self.zen_store.delete_role(role_name_or_id=role.id)

    # --------------------- #
    # USER ROLE ASSIGNMENTS #
    # --------------------- #

    def get_user_role_assignment(
        self, role_assignment_id: UUID
    ) -> UserRoleAssignmentResponseModel:
        """Get a role assignment.

        Args:
            role_assignment_id: The id of the role assignments

        Returns:
            The role assignment.
        """
        return self.zen_store.get_user_role_assignment(
            user_role_assignment_id=role_assignment_id
        )

    def create_user_role_assignment(
        self,
        role_name_or_id: Union[str, UUID],
        user_name_or_id: Union[str, UUID],
        workspace_name_or_id: Optional[Union[str, UUID]] = None,
    ) -> UserRoleAssignmentResponseModel:
        """Create a role assignment.

        Args:
            role_name_or_id: Name or ID of the role to assign.
            user_name_or_id: Name or ID of the user or team to assign
                the role to.
            workspace_name_or_id: workspace scope within which to assign the role.

        Returns:
            The newly created role assignment.
        """
        role = self.get_role(name_id_or_prefix=role_name_or_id)
        workspace = None
        if workspace_name_or_id:
            workspace = self.get_workspace(
                name_id_or_prefix=workspace_name_or_id
            )
        user = self.get_user(name_id_or_prefix=user_name_or_id)
        role_assignment = UserRoleAssignmentRequestModel(
            role=role.id,
            user=user.id,
            workspace=workspace,
        )
        return self.zen_store.create_user_role_assignment(
            user_role_assignment=role_assignment
        )

    def delete_user_role_assignment(self, role_assignment_id: UUID) -> None:
        """Delete a role assignment.

        Args:
            role_assignment_id: The id of the role assignments

        """
        self.zen_store.delete_user_role_assignment(role_assignment_id)

    def list_user_role_assignment(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        role_id: Optional[Union[str, UUID]] = None,
    ) -> Page[UserRoleAssignmentResponseModel]:
        """List all user role assignments.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of the user role assignment to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            workspace_id: The id of the workspace to filter by.
            user_id: The id of the user to filter by.
            role_id: The id of the role to filter by.

        Returns:
            The Team
        """
        return self.zen_store.list_user_role_assignments(
            UserRoleAssignmentFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                workspace_id=workspace_id,
                user_id=user_id,
                role_id=role_id,
            )
        )

    # --------------------- #
    # TEAM ROLE ASSIGNMENTS #
    # --------------------- #

    def get_team_role_assignment(
        self, team_role_assignment_id: UUID
    ) -> TeamRoleAssignmentResponseModel:
        """Get a role assignment.

        Args:
            team_role_assignment_id: The id of the role assignments

        Returns:
            The role assignment.
        """
        return self.zen_store.get_team_role_assignment(
            team_role_assignment_id=team_role_assignment_id
        )

    def create_team_role_assignment(
        self,
        role_name_or_id: Union[str, UUID],
        team_name_or_id: Union[str, UUID],
        workspace_name_or_id: Optional[Union[str, UUID]] = None,
    ) -> TeamRoleAssignmentResponseModel:
        """Create a role assignment.

        Args:
            role_name_or_id: Name or ID of the role to assign.
            team_name_or_id: Name or ID of the team to assign
                the role to.
            workspace_name_or_id: workspace scope within which to assign the role.

        Returns:
            The newly created role assignment.
        """
        role = self.get_role(name_id_or_prefix=role_name_or_id)
        workspace = None
        if workspace_name_or_id:
            workspace = self.get_workspace(
                name_id_or_prefix=workspace_name_or_id
            )
        team = self.get_team(name_id_or_prefix=team_name_or_id)
        role_assignment = TeamRoleAssignmentRequestModel(
            role=role.id,
            team=team.id,
            workspace=workspace,
        )
        return self.zen_store.create_team_role_assignment(
            team_role_assignment=role_assignment
        )

    def delete_team_role_assignment(self, role_assignment_id: UUID) -> None:
        """Delete a role assignment.

        Args:
            role_assignment_id: The id of the role assignments

        """
        self.zen_store.delete_team_role_assignment(role_assignment_id)

    def list_team_role_assignment(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        team_id: Optional[Union[str, UUID]] = None,
        role_id: Optional[Union[str, UUID]] = None,
    ) -> Page[TeamRoleAssignmentResponseModel]:
        """List all team role assignments.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of the team role assignment to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            workspace_id: The id of the workspace to filter by.
            team_id: The id of the team to filter by.
            role_id: The id of the role to filter by.

        Returns:
            The Team
        """
        return self.zen_store.list_team_role_assignments(
            TeamRoleAssignmentFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                workspace_id=workspace_id,
                team_id=team_id,
                role_id=role_id,
            )
        )

    # --------- #
    # WORKSPACE #
    # --------- #

    @property
    def active_workspace(self) -> "WorkspaceResponseModel":
        """Get the currently active workspace of the local client.

        If no active workspace is configured locally for the client, the
        active workspace in the global configuration is used instead.

        Returns:
            The active workspace.

        Raises:
            RuntimeError: If the active workspace is not set.
        """
        if ENV_ZENML_ACTIVE_WORKSPACE_ID in os.environ:
            workspace_id = os.environ[ENV_ZENML_ACTIVE_WORKSPACE_ID]
            return self.get_workspace(workspace_id)

        workspace: Optional["WorkspaceResponseModel"] = None
        if self._config:
            workspace = self._config.active_workspace

        if not workspace:
            workspace = GlobalConfiguration().get_active_workspace()

        if not workspace:
            raise RuntimeError(
                "No active workspace is configured. Run "
                "`zenml workspace set WORKSPACE_NAME` to set the active "
                "workspace."
            )

        from zenml.zen_stores.base_zen_store import DEFAULT_WORKSPACE_NAME

        if workspace.name != DEFAULT_WORKSPACE_NAME:
            logger.warning(
                f"You are running with a non-default workspace "
                f"'{workspace.name}'. Any stacks, components, "
                f"pipelines and pipeline runs produced in this "
                f"workspace will currently not be accessible through "
                f"the dashboard. However, this will be possible "
                f"in the near future."
            )
        return workspace

    def get_workspace(
        self,
        name_id_or_prefix: Optional[Union[UUID, str]],
        allow_name_prefix_match: bool = True,
    ) -> WorkspaceResponseModel:
        """Gets a workspace.

        Args:
            name_id_or_prefix: The name or ID of the workspace.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The workspace
        """
        if not name_id_or_prefix:
            return self.active_workspace
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_workspace,
            list_method=self.list_workspaces,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def list_workspaces(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
    ) -> Page[WorkspaceResponseModel]:
        """List all workspaces.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of teams to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: Use the team name for filtering

        Returns:
            The Team
        """
        return self.zen_store.list_workspaces(
            WorkspaceFilterModel(
                sort_by=sort_by,
                page=page,
                size=size,
                logical_operator=logical_operator,
                id=id,
                created=created,
                updated=updated,
                name=name,
            )
        )

    def create_workspace(
        self, name: str, description: str
    ) -> "WorkspaceResponseModel":
        """Create a new workspace.

        Args:
            name: Name of the workspace.
            description: Description of the workspace.

        Returns:
            The created workspace.
        """
        return self.zen_store.create_workspace(
            WorkspaceRequestModel(name=name, description=description)
        )

    def update_workspace(
        self,
        name_id_or_prefix: Optional[Union[UUID, str]],
        new_name: Optional[str] = None,
        new_description: Optional[str] = None,
    ) -> "WorkspaceResponseModel":
        """Update a workspace.

        Args:
            name_id_or_prefix: Name, ID or prefix of the workspace to update.
            new_name: New name of the workspace.
            new_description: New description of the workspace.

        Returns:
            The updated workspace.
        """
        workspace = self.get_workspace(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        workspace_update = WorkspaceUpdateModel(
            name=new_name or workspace.name
        )
        if new_description:
            workspace_update.description = new_description
        return self.zen_store.update_workspace(
            workspace_id=workspace.id,
            workspace_update=workspace_update,
        )

    def delete_workspace(self, name_id_or_prefix: str) -> None:
        """Delete a workspace.

        Args:
            name_id_or_prefix: The name or ID of the workspace to delete.

        Raises:
            IllegalOperationError: If the workspace to delete is the active
                workspace.
        """
        workspace = self.get_workspace(
            name_id_or_prefix, allow_name_prefix_match=False
        )
        if self.active_workspace.id == workspace.id:
            raise IllegalOperationError(
                f"Workspace '{name_id_or_prefix}' cannot be deleted since "
                "it is currently active. Please set another workspace as "
                "active first."
            )
        self.zen_store.delete_workspace(workspace_name_or_id=workspace.id)

    # ------ #
    # STACKS #
    # ------ #
    @property
    def active_stack_model(self) -> "StackResponseModel":
        """The model of the active stack for this client.

        If no active stack is configured locally for the client, the active
        stack in the global configuration is used instead.

        Returns:
            The model of the active stack for this client.

        Raises:
            RuntimeError: If the active stack is not set.
        """
        stack: Optional["StackResponseModel"] = None

        if ENV_ZENML_ACTIVE_STACK_ID in os.environ:
            stack_id = os.environ[ENV_ZENML_ACTIVE_STACK_ID]
            return self.get_stack(stack_id)

        if self._config:
            stack = self.get_stack(self._config.active_stack_id)

        if not stack:
            stack = self.get_stack(GlobalConfiguration().get_active_stack_id())

        if not stack:
            raise RuntimeError(
                "No active stack is configured. Run "
                "`zenml stack set STACK_NAME` to set the active stack."
            )

        return stack

    @property
    def active_stack(self) -> "Stack":
        """The active stack for this client.

        Returns:
            The active stack for this client.
        """
        from zenml.stack.stack import Stack

        return Stack.from_model(self.active_stack_model)

    def get_stack(
        self,
        name_id_or_prefix: Optional[Union[UUID, str]] = None,
        allow_name_prefix_match: bool = True,
    ) -> "StackResponseModel":
        """Get a stack by name, ID or prefix.

        If no name, ID or prefix is provided, the active stack is returned.

        Args:
            name_id_or_prefix: The name, ID or prefix of the stack.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The stack.
        """
        if name_id_or_prefix is not None:
            return self._get_entity_by_id_or_name_or_prefix(
                get_method=self.zen_store.get_stack,
                list_method=self.list_stacks,
                name_id_or_prefix=name_id_or_prefix,
                allow_name_prefix_match=allow_name_prefix_match,
            )
        else:
            return self.active_stack_model

    def create_stack(
        self,
        name: str,
        components: Mapping[StackComponentType, Union[str, UUID]],
        is_shared: bool = False,
    ) -> "StackResponseModel":
        """Registers a stack and its components.

        Args:
            name: The name of the stack to register.
            components: dictionary which maps component types to component names
            is_shared: boolean to decide whether the stack is shared

        Returns:
            The model of the registered stack.

        Raises:
            ValueError: If the stack contains private components and is
                attempted to be registered as shared.
        """
        stack_components = dict()

        for c_type, c_identifier in components.items():
            # Skip non-existent components.
            if not c_identifier:
                continue

            # Get the component.
            component = self.get_stack_component(
                name_id_or_prefix=c_identifier,
                component_type=c_type,
            )
            stack_components[c_type] = [component.id]

            # Raise an error if private components are used in a shared stack.
            if is_shared and not component.is_shared:
                raise ValueError(
                    f"You attempted to include the private {c_type} "
                    f"'{component.name}' in a shared stack. This is not "
                    f"supported. You can either share the {c_type} with the "
                    f"following command:\n"
                    f"`zenml {c_type.replace('_', '-')} share`{component.id}`\n"
                    f"or create the stack privately and then share it and all "
                    f"of its components using:\n`zenml stack share {name} -r`"
                )

        stack = StackRequestModel(
            name=name,
            components=stack_components,
            is_shared=is_shared,
            workspace=self.active_workspace.id,
            user=self.active_user.id,
        )

        self._validate_stack_configuration(stack=stack)

        return self.zen_store.create_stack(stack=stack)

    def update_stack(
        self,
        name_id_or_prefix: Optional[Union[UUID, str]] = None,
        name: Optional[str] = None,
        is_shared: Optional[bool] = None,
        description: Optional[str] = None,
        component_updates: Optional[
            Dict[StackComponentType, List[Union[UUID, str]]]
        ] = None,
    ) -> "StackResponseModel":
        """Updates a stack and its components.

        Args:
            name_id_or_prefix: The name, id or prefix of the stack to update.
            name: the new name of the stack.
            is_shared: the new shared status of the stack.
            description: the new description of the stack.
            component_updates: dictionary which maps stack component types to
                lists of new stack component names or ids.

        Returns:
            The model of the updated stack.

        Raises:
            ValueError: If the stack contains private components and is
                attempted to be shared.
            EntityExistsError: If the stack name is already taken.
        """
        # First, get the stack
        stack = self.get_stack(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )

        # Create the update model
        update_model = StackUpdateModel(  # type: ignore[call-arg]
            workspace=self.active_workspace.id,
            user=self.active_user.id,
        )

        if name:
            shared_status = is_shared or stack.is_shared

            existing_stacks = self.list_stacks(
                name=name, is_shared=shared_status
            )
            if existing_stacks:
                raise EntityExistsError(
                    "There are already existing stacks with the name "
                    f"'{name}'."
                )

            update_model.name = name

        if is_shared:
            current_name = update_model.name or stack.name
            existing_stacks = self.list_stacks(
                name=current_name, is_shared=True
            )
            if existing_stacks:
                raise EntityExistsError(
                    "There are already existing shared stacks with the name "
                    f"'{current_name}'."
                )

            for component_type, components in stack.components.items():
                for c in components:
                    if not c.is_shared:
                        raise ValueError(
                            f"A Stack can only be shared when all its "
                            f"components are also shared. Component "
                            f"'{component_type}:{c.name}' is not shared. Set "
                            f"the {component_type} to shared like this and "
                            f"then try re-sharing your stack:\n "
                            f"`zenml {component_type.replace('_', '-')} "
                            f"share {c.id}`\nAlternatively, you can rerun "
                            f"your command with `-r` to recursively "
                            f"share all components within the stack."
                        )

            update_model.is_shared = is_shared

        if description:
            update_model.description = description

        # Get the current components
        if component_updates:
            components_dict = {}
            for component_type, component_list in stack.components.items():
                components_dict[component_type] = [
                    c.id for c in component_list
                ]

            for component_type, component_id_list in component_updates.items():
                if component_id_list is not None:
                    components_dict[component_type] = [
                        self.get_stack_component(
                            name_id_or_prefix=c,
                            component_type=component_type,
                        ).id
                        for c in component_id_list
                    ]

            update_model.components = components_dict

        return self.zen_store.update_stack(
            stack_id=stack.id,
            stack_update=update_model,
        )

    def delete_stack(
        self, name_id_or_prefix: Union[str, UUID], recursive: bool = False
    ) -> None:
        """Deregisters a stack.

        Args:
            name_id_or_prefix: The name, id or prefix id of the stack
                to deregister.
            recursive: If `True`, all components of the stack which are not
                associated with any other stack will also be deleted.

        Raises:
            ValueError: If the stack is the currently active stack for this
                client.
        """
        stack = self.get_stack(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )

        if stack.id == self.active_stack_model.id:
            raise ValueError(
                f"Unable to deregister active stack '{stack.name}'. Make "
                f"sure to designate a new active stack before deleting this "
                f"one."
            )

        cfg = GlobalConfiguration()
        if stack.id == cfg.active_stack_id:
            raise ValueError(
                f"Unable to deregister '{stack.name}' as it is the active "
                f"stack within your global configuration. Make "
                f"sure to designate a new active stack before deleting this "
                f"one."
            )

        if recursive:
            stack_components_free_for_deletion = []

            # Get all stack components associated with this stack
            for component_type, component_model in stack.components.items():
                # Get stack associated with the stack component

                stacks = self.list_stacks(
                    component_id=component_model[0].id, size=2, page=1
                )

                # Check if the stack component is part of another stack
                if len(stacks) == 1:
                    if stack.id == stacks[0].id:
                        stack_components_free_for_deletion.append(
                            (component_type, component_model)
                        )

            self.delete_stack(stack.id)

            for (
                stack_component_type,
                stack_component_model,
            ) in stack_components_free_for_deletion:
                self.delete_stack_component(
                    stack_component_model[0].name, stack_component_type
                )

            logger.info("Deregistered stack with name '%s'.", stack.name)
            return

        self.zen_store.delete_stack(stack_id=stack.id)
        logger.info("Deregistered stack with name '%s'.", stack.name)

    def list_stacks(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
        is_shared: Optional[bool] = None,
        name: Optional[str] = None,
        description: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        component_id: Optional[Union[str, UUID]] = None,
    ) -> Page[StackResponseModel]:
        """Lists all stacks.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of stacks to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            description: Use the stack description for filtering
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            component_id: The id of the component to filter by.
            name: The name of the stack to filter by.
            is_shared: The shared status of the stack to filter by.

        Returns:
            A page of stacks.
        """
        stack_filter_model = StackFilterModel(
            page=page,
            size=size,
            sort_by=sort_by,
            logical_operator=logical_operator,
            workspace_id=workspace_id,
            user_id=user_id,
            component_id=component_id,
            name=name,
            is_shared=is_shared,
            description=description,
            id=id,
            created=created,
            updated=updated,
        )
        stack_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_stacks(stack_filter_model)

    @track(event=AnalyticsEvent.SET_STACK)
    def activate_stack(
        self, stack_name_id_or_prefix: Union[str, UUID]
    ) -> None:
        """Sets the stack as active.

        Args:
            stack_name_id_or_prefix: Model of the stack to activate.

        Raises:
            KeyError: If the stack is not registered.
        """
        # Make sure the stack is registered
        try:
            stack = self.get_stack(name_id_or_prefix=stack_name_id_or_prefix)
        except KeyError:
            raise KeyError(
                f"Stack '{stack_name_id_or_prefix}' cannot be activated since "
                f"it is not registered yet. Please register it first."
            )

        if self._config:
            self._config.set_active_stack(stack=stack)

        else:
            # set the active stack globally only if the client doesn't use
            # a local configuration
            GlobalConfiguration().set_active_stack(stack=stack)

    def _validate_stack_configuration(
        self, stack: "StackRequestModel"
    ) -> None:
        """Validates the configuration of a stack.

        Args:
            stack: The stack to validate.

        Raises:
            KeyError: If the stack references missing components.
            ValidationError: If the stack configuration is invalid.
        """
        local_components: List[str] = []
        remote_components: List[str] = []
        assert stack.components is not None
        for component_type, component_ids in stack.components.items():
            for component_id in component_ids:
                try:
                    component = self.get_stack_component(
                        name_id_or_prefix=component_id,
                        component_type=component_type,
                    )
                except KeyError:
                    raise KeyError(
                        f"Cannot register stack '{stack.name}' since it has an "
                        f"unregistered {component_type} with id "
                        f"'{component_id}'."
                    )
            # Get the flavor model
            flavor_model = self.get_flavor_by_name_and_type(
                name=component.flavor, component_type=component.type
            )

            # Create and validate the configuration
            from zenml.stack import Flavor

            flavor = Flavor.from_model(flavor_model)
            configuration = flavor.config_class(**component.configuration)
            if configuration.is_local:
                local_components.append(
                    f"{component.type.value}: {component.name}"
                )
            elif configuration.is_remote:
                remote_components.append(
                    f"{component.type.value}: {component.name}"
                )

        if local_components and remote_components:
            logger.warning(
                f"You are configuring a stack that is composed of components "
                f"that are relying on local resources "
                f"({', '.join(local_components)}) as well as "
                f"components that are running remotely "
                f"({', '.join(remote_components)}). This is not recommended as "
                f"it can lead to unexpected behavior, especially if the remote "
                f"components need to access the local resources. Please make "
                f"sure that your stack is configured correctly, or try to use "
                f"component flavors or configurations that do not require "
                f"local resources."
            )

        if not stack.is_valid:
            raise ValidationError(
                "Stack configuration is invalid. A valid"
                "stack must contain an Artifact Store and "
                "an Orchestrator."
            )

    # .------------.
    # | COMPONENTS |
    # '------------'
    def get_stack_component(
        self,
        component_type: StackComponentType,
        name_id_or_prefix: Optional[Union[str, UUID]] = None,
        allow_name_prefix_match: bool = True,
    ) -> "ComponentResponseModel":
        """Fetches a registered stack component.

        If the name_id_or_prefix is provided, it will try to fetch the component
        with the corresponding identifier. If not, it will try to fetch the
        active component of the given type.

        Args:
            component_type: The type of the component to fetch
            name_id_or_prefix: The id of the component to fetch.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The registered stack component.

        Raises:
            KeyError: If no name_id_or_prefix is provided and no such component
                is part of the active stack.
        """
        # If no `name_id_or_prefix` provided, try to get the active component.
        if not name_id_or_prefix:
            components = self.active_stack_model.components.get(
                component_type, None
            )
            if components:
                return components[0]
            raise KeyError(
                "No name_id_or_prefix provided and there is no active "
                f"{component_type} in the current active stack."
            )

        # Else, try to fetch the component with an explicit type filter
        def type_scoped_list_method(
            **kwargs: Any,
        ) -> Page[ComponentResponseModel]:
            """Call `zen_store.list_stack_components` with type scoping.

            Args:
                **kwargs: Keyword arguments to pass to `ComponentFilterModel`.

            Returns:
                The type-scoped list of components.
            """
            component_filter_model = ComponentFilterModel(**kwargs)
            component_filter_model.set_scope_type(
                component_type=component_type
            )
            component_filter_model.set_scope_workspace(
                self.active_workspace.id
            )
            return self.zen_store.list_stack_components(
                component_filter_model=component_filter_model,
            )

        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_stack_component,
            list_method=type_scoped_list_method,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def list_stack_components(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
        is_shared: Optional[bool] = None,
        name: Optional[str] = None,
        flavor: Optional[str] = None,
        type: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        connector_id: Optional[Union[str, UUID]] = None,
    ) -> Page[ComponentResponseModel]:
        """Lists all registered stack components.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of component to filter by.
            created: Use to component by time of creation
            updated: Use the last updated date for filtering
            flavor: Use the component flavor for filtering
            type: Use the component type for filtering
            workspace_id: The id of the workspace to filter by.
            user_id: The id of the user to filter by.
            connector_id: The id of the connector to filter by.
            name: The name of the component to filter by.
            is_shared: The shared status of the component to filter by.

        Returns:
            A page of stack components.
        """
        component_filter_model = ComponentFilterModel(
            page=page,
            size=size,
            sort_by=sort_by,
            logical_operator=logical_operator,
            workspace_id=workspace_id or self.active_workspace.id,
            user_id=user_id,
            connector_id=connector_id,
            name=name,
            is_shared=is_shared,
            flavor=flavor,
            type=type,
            id=id,
            created=created,
            updated=updated,
        )
        component_filter_model.set_scope_workspace(self.active_workspace.id)

        return self.zen_store.list_stack_components(
            component_filter_model=component_filter_model
        )

    def create_stack_component(
        self,
        name: str,
        flavor: str,
        component_type: StackComponentType,
        configuration: Dict[str, str],
        labels: Optional[Dict[str, Any]] = None,
        is_shared: bool = False,
    ) -> "ComponentResponseModel":
        """Registers a stack component.

        Args:
            name: The name of the stack component.
            flavor: The flavor of the stack component.
            component_type: The type of the stack component.
            configuration: The configuration of the stack component.
            labels: The labels of the stack component.
            is_shared: Whether the stack component is shared or not.

        Returns:
            The model of the registered component.
        """
        # Get the flavor model
        flavor_model = self.get_flavor_by_name_and_type(
            name=flavor,
            component_type=component_type,
        )

        # Create and validate the configuration
        from zenml.stack import Flavor

        flavor_class = Flavor.from_model(flavor_model)
        configuration_obj = flavor_class.config_class(
            warn_about_plain_text_secrets=True, **configuration
        )

        self._validate_stack_component_configuration(
            component_type, configuration=configuration_obj
        )

        create_component_model = ComponentRequestModel(
            name=name,
            type=component_type,
            flavor=flavor,
            configuration=configuration,
            is_shared=is_shared,
            user=self.active_user.id,
            workspace=self.active_workspace.id,
            labels=labels,
        )

        # Register the new model
        return self.zen_store.create_stack_component(
            component=create_component_model
        )

    def update_stack_component(
        self,
        name_id_or_prefix: Optional[Union[UUID, str]],
        component_type: StackComponentType,
        name: Optional[str] = None,
        configuration: Optional[Dict[str, Any]] = None,
        labels: Optional[Dict[str, Any]] = None,
        is_shared: Optional[bool] = None,
        connector_id: Optional[UUID] = None,
        connector_resource_id: Optional[str] = None,
    ) -> "ComponentResponseModel":
        """Updates a stack component.

        Args:
            name_id_or_prefix: The name, id or prefix of the stack component to
                update.
            component_type: The type of the stack component to update.
            name: The new name of the stack component.
            configuration: The new configuration of the stack component.
            labels: The new labels of the stack component.
            is_shared: The new shared status of the stack component.
            connector_id: The new connector id of the stack component.
            connector_resource_id: The new connector resource id of the
                stack component.

        Returns:
            The updated stack component.

        Raises:
            EntityExistsError: If the new name is already taken.
        """
        # Get the existing component model
        component = self.get_stack_component(
            name_id_or_prefix=name_id_or_prefix,
            component_type=component_type,
            allow_name_prefix_match=False,
        )

        update_model = ComponentUpdateModel(  # type: ignore[call-arg]
            workspace=self.active_workspace.id,
            user=self.active_user.id,
        )

        if name is not None:
            shared_status = is_shared or component.is_shared

            existing_components = self.list_stack_components(
                name=name,
                is_shared=shared_status,
                type=component_type,
            )
            if existing_components.total > 0:
                raise EntityExistsError(
                    f"There are already existing "
                    f"{'shared' if shared_status else 'unshared'} components "
                    f"with the name '{name}'."
                )
            update_model.name = name

        if is_shared is not None:
            current_name = update_model.name or component.name
            existing_components = self.list_stack_components(
                name=current_name, is_shared=is_shared, type=component_type
            )
            if any([e.id != component.id for e in existing_components.items]):
                raise EntityExistsError(
                    f"There are already existing shared components with "
                    f"the name '{current_name}'"
                )
            update_model.is_shared = is_shared

        if configuration is not None:
            existing_configuration = component.configuration
            existing_configuration.update(configuration)

            existing_configuration = {
                k: v
                for k, v in existing_configuration.items()
                if v is not None
            }

            flavor_model = self.get_flavor_by_name_and_type(
                name=component.flavor,
                component_type=component.type,
            )

            from zenml.stack import Flavor

            flavor = Flavor.from_model(flavor_model)
            configuration_obj = flavor.config_class(**existing_configuration)

            self._validate_stack_component_configuration(
                component.type, configuration=configuration_obj
            )
            update_model.configuration = existing_configuration

        if labels is not None:
            existing_labels = component.labels or {}
            existing_labels.update(labels)

            existing_labels = {
                k: v for k, v in existing_labels.items() if v is not None
            }
            update_model.labels = existing_labels

        if connector_id is not None:
            update_model.connector = connector_id
        if connector_resource_id is not None:
            update_model.connector_resource_id = connector_resource_id

        # Send the updated component to the ZenStore
        return self.zen_store.update_stack_component(
            component_id=component.id,
            component_update=update_model,
        )

    def delete_stack_component(
        self,
        name_id_or_prefix: Union[str, UUID],
        component_type: StackComponentType,
    ) -> None:
        """Deletes a registered stack component.

        Args:
            name_id_or_prefix: The model of the component to delete.
            component_type: The type of the component to delete.
        """
        component = self.get_stack_component(
            name_id_or_prefix=name_id_or_prefix,
            component_type=component_type,
            allow_name_prefix_match=False,
        )

        self.zen_store.delete_stack_component(component_id=component.id)
        logger.info(
            "Deregistered stack component (type: %s) with name '%s'.",
            component.type,
            component.name,
        )

    def deploy_stack_component(
        self,
        name: str,
        flavor: str,
        cloud: str,
        component_type: StackComponentType,
        configuration: Optional[Dict[str, Any]] = {},
        labels: Optional[Dict[str, Any]] = None,
    ) -> Optional["ComponentResponseModel"]:
        """Deploys a stack component.

        Args:
            name: The name of the deployed stack component.
            flavor: The flavor of the deployed stack component.
            cloud: The cloud of the deployed stack component.
            component_type: The type of the stack component to deploy.
            configuration: The configuration of the deployed stack component.
            labels: The labels of the deployed stack component.

        Returns:
            The deployed stack component.
        """
        STACK_COMPONENT_RECIPE_DIR = "deployed_stack_components"

        if component_type.value not in [
            "artifact_store",
            "container_registry",
            "secrets_manager",
        ]:
            enabled_services = [f"{component_type.value}_{flavor}"]
        else:
            enabled_services = [f"{component_type.value}"]

        # path should be fixed at a constant in the
        # global config directory
        path = Path(
            os.path.join(
                io_utils.get_global_config_directory(),
                STACK_COMPONENT_RECIPE_DIR,
                f"{cloud}-modular",
            )
        )

        with event_handler(
            event=AnalyticsEvent.DEPLOY_STACK_COMPONENT,
            v2=True,
        ) as handler:
            handler.metadata.update({component_type.value: flavor})

            import python_terraform

            from zenml.recipes import (
                StackRecipeService,
                StackRecipeServiceConfig,
            )

            # create the stack recipe service.
            stack_recipe_service_config = StackRecipeServiceConfig(
                directory_path=str(path),
                enabled_services=enabled_services,
                input_variables=configuration,
            )

            stack_recipe_service = StackRecipeService.get_service(str(path))

            if stack_recipe_service:
                logger.info(
                    "An existing deployment of the recipe found. "
                    f"with path {path}. "
                    "Proceeding to update or create resources. "
                )
            else:
                stack_recipe_service = StackRecipeService(
                    config=stack_recipe_service_config,
                    stack_recipe_name=f"{cloud}-modular",
                )

            try:
                # start the service (the init and apply operation)
                stack_recipe_service.start()

            except python_terraform.TerraformCommandError:
                logger.error(
                    "Deployment of the stack component failed or was "
                    "interrupted. "
                )
                return None

            # get the outputs from the deployed recipe
            outputs = stack_recipe_service.get_outputs()
            outputs = {k: v for k, v in outputs.items() if v != ""}

            # get all outputs that start with the component type into a map
            comp_outputs = {
                k: v
                for k, v in outputs.items()
                if k.startswith(component_type.value)
            }

            logger.info(
                "Registering a new stack component of type %s with name '%s'.",
                component_type,
                name or comp_outputs[f"{component_type.value}_name"],
            )

            # call the register stack component function using the values of the outputs
            # truncate the component type from the output
            stack_comp = self.create_stack_component(
                name=name or comp_outputs[f"{component_type.value}_name"],
                flavor=comp_outputs[f"{component_type.value}_flavor"],
                component_type=component_type,
                configuration=eval(
                    comp_outputs[f"{component_type.value}_configuration"]
                ),
                labels=labels,
            )

            # if the component is an experiment tracker of flavor mlflow, then
            # output the name of the mlflow bucket if it exists
            if (
                component_type == StackComponentType.EXPERIMENT_TRACKER
                and flavor == "mlflow"
            ):
                mlflow_bucket = outputs.get("mlflow-bucket")
                if mlflow_bucket:
                    logger.info(
                        "The bucket used for MLflow is: %s "
                        "You can use this bucket as an artifact store to "
                        "avoid having to create a new one.",
                        mlflow_bucket,
                    )

            # if the cloud is k3d, then check the container registry
            # outputs. If they are set, then create one.
            if cloud == "k3d":
                container_registry_outputs = {
                    k: v
                    for k, v in outputs.items()
                    if k.startswith("container_registry")
                }
                if container_registry_outputs:
                    self.create_stack_component(
                        name=container_registry_outputs[
                            "container_registry_name"
                        ],
                        flavor=container_registry_outputs[
                            "container_registry_flavor"
                        ],
                        component_type=StackComponentType.CONTAINER_REGISTRY,
                        configuration=eval(
                            container_registry_outputs[
                                "container_registry_configuration"
                            ]
                        ),
                    )

        return stack_comp

    def destroy_stack_component(
        self,
        component: ComponentResponseModel,
    ) -> None:
        """Destroys a stack component.

        Args:
            component: The stack component to destroy.

        Returns:
            None
        """
        STACK_COMPONENT_RECIPE_DIR = "deployed_stack_components"

        if component.type.value not in [
            "artifact_store",
            "container_registry",
            "secrets_manager",
        ]:
            disabled_services = [f"{component.type.value}_{component.flavor}"]
        else:
            disabled_services = [f"{component.type.value}"]

        # assert that labels is not None
        assert component.labels is not None
        # path should be fixed at a constant in the
        # global config directory
        path = Path(
            os.path.join(
                io_utils.get_global_config_directory(),
                STACK_COMPONENT_RECIPE_DIR,
                f"{component.labels['cloud']}-modular",
            )
        )

        with event_handler(
            event=AnalyticsEvent.DESTROY_STACK_COMPONENT,
            v2=True,
        ) as handler:
            handler.metadata.update({component.type.value: component.flavor})

            import python_terraform

            from zenml.recipes import (
                StackRecipeService,
            )

            stack_recipe_service = StackRecipeService.get_service(str(path))

            if not stack_recipe_service:
                logger.error(
                    f"No deployed {component.type.value} found with "
                    f"flavor {component.flavor} and name {component.name}."
                )
                return None

            stack_recipe_service.config.disabled_services = disabled_services

            try:
                # start the service (the init and apply operation)
                stack_recipe_service.stop()

            except python_terraform.TerraformCommandError:
                logger.error(
                    "Destruction of the stack component failed or was "
                    "interrupted. "
                )
                return None

        logger.info(
            "Deregistering stack component %s...",
            component.name,
        )

        # call the delete stack component function
        self.delete_stack_component(
            name_id_or_prefix=component.name,
            component_type=component.type,
        )

    def _validate_stack_component_configuration(
        self,
        component_type: "StackComponentType",
        configuration: "StackComponentConfig",
    ) -> None:
        """Validates the configuration of a stack component.

        Args:
            component_type: The type of the component.
            configuration: The component configuration to validate.

        Raises:
            StackComponentValidationError: in case the stack component configuration is invalid.
        """
        from zenml.enums import StoreType

        if configuration.is_remote and self.zen_store.is_local_store():
            if self.zen_store.type != StoreType.REST:
                logger.warning(
                    "You are configuring a stack component that is running "
                    "remotely while using a local ZenML server. The component "
                    "may not be able to reach the local ZenML server and will "
                    "therefore not be functional. Please consider deploying "
                    "and/or using a remote ZenML server instead."
                )
        elif configuration.is_local and not self.zen_store.is_local_store():
            logger.warning(
                "You are configuring a stack component that is using "
                "local resources while connected to a remote ZenML server. The "
                "stack component may not be usable from other hosts or by "
                "other users. You should consider using a non-local stack "
                "component alternative instead."
            )
        if not configuration.is_valid:
            raise StackComponentValidationError(
                f"Invalid stack component configuration. please verify "
                f"the configurations set for {component_type}."
            )

    # .---------.
    # | FLAVORS |
    # '---------'

    def create_flavor(
        self,
        source: str,
        component_type: StackComponentType,
    ) -> "FlavorResponseModel":
        """Creates a new flavor.

        Args:
            source: The flavor to create.
            component_type: The type of the flavor.

        Returns:
            The created flavor (in model form).

        Raises:
            ValueError: in case the config_schema of the flavor is too large.
        """
        from zenml.stack.flavor import validate_flavor_source

        flavor = validate_flavor_source(
            source=source, component_type=component_type
        )()

        if len(flavor.config_schema) > TEXT_FIELD_MAX_LENGTH:
            raise ValueError(
                "Json representation of configuration schema"
                "exceeds max length. This could be caused by an"
                "overly long docstring on the flavors "
                "configuration class' docstring."
            )

        create_flavor_request = FlavorRequestModel(
            source=source,
            type=flavor.type,
            name=flavor.name,
            config_schema=flavor.config_schema,
            integration="custom",
            user=self.active_user.id,
            workspace=self.active_workspace.id,
        )

        return self.zen_store.create_flavor(flavor=create_flavor_request)

    def get_flavor(
        self,
        name_id_or_prefix: str,
        allow_name_prefix_match: bool = True,
    ) -> "FlavorResponseModel":
        """Get a stack component flavor.

        Args:
            name_id_or_prefix: The name, ID or prefix to the id of the flavor
                to get.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The stack component flavor.
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_flavor,
            list_method=self.list_flavors,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def delete_flavor(self, name_id_or_prefix: str) -> None:
        """Deletes a flavor.

        Args:
            name_id_or_prefix: The name, id or prefix of the id for the
                flavor to delete.
        """
        flavor = self.get_flavor(
            name_id_or_prefix, allow_name_prefix_match=False
        )
        self.zen_store.delete_flavor(flavor_id=flavor.id)

        logger.info(f"Deleted flavor '{flavor.name}' of type '{flavor.type}'.")

    def list_flavors(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
        name: Optional[str] = None,
        type: Optional[str] = None,
        integration: Optional[str] = None,
        user_id: Optional[Union[str, UUID]] = None,
    ) -> Page[FlavorResponseModel]:
        """Fetches all the flavor models.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of flavors to filter by.
            created: Use to flavors by time of creation
            updated: Use the last updated date for filtering
            user_id: The  id of the user to filter by.
            name: The name of the flavor to filter by.
            type: The type of the flavor to filter by.
            integration: The integration of the flavor to filter by.

        Returns:
            A list of all the flavor models.
        """
        flavor_filter_model = FlavorFilterModel(
            page=page,
            size=size,
            sort_by=sort_by,
            logical_operator=logical_operator,
            user_id=user_id,
            name=name,
            type=type,
            integration=integration,
            id=id,
            created=created,
            updated=updated,
        )
        flavor_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_flavors(
            flavor_filter_model=flavor_filter_model
        )

    def get_flavors_by_type(
        self, component_type: "StackComponentType"
    ) -> Page[FlavorResponseModel]:
        """Fetches the list of flavor for a stack component type.

        Args:
            component_type: The type of the component to fetch.

        Returns:
            The list of flavors.
        """
        logger.debug(f"Fetching the flavors of type {component_type}.")

        return self.list_flavors(
            type=component_type,
        )

    def get_flavor_by_name_and_type(
        self, name: str, component_type: "StackComponentType"
    ) -> "FlavorResponseModel":
        """Fetches a registered flavor.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the flavor to fetch.

        Returns:
            The registered flavor.

        Raises:
            KeyError: If no flavor exists for the given type and name.
        """
        logger.debug(
            f"Fetching the flavor of type {component_type} with name {name}."
        )

        flavors = self.list_flavors(
            type=component_type,
            name=name,
        ).items

        if flavors:
            if len(flavors) > 1:
                raise KeyError(
                    f"More than one flavor with name {name} and type "
                    f"{component_type} exists."
                )

            return flavors[0]
        else:
            raise KeyError(
                f"No flavor with name '{name}' and type '{component_type}' "
                "exists."
            )

    # -------------
    # - PIPELINES -
    # -------------

    def list_pipelines(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
        version_hash: Optional[str] = None,
        docstring: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
    ) -> Page[PipelineResponseModel]:
        """List all pipelines.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of pipeline to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: The name of the pipeline to filter by.
            version: The version of the pipeline to filter by.
            version_hash: The version hash of the pipeline to filter by.
            docstring: The docstring of the pipeline to filter by.
            workspace_id: The id of the workspace to filter by.
            user_id: The id of the user to filter by.

        Returns:
            A page with Pipeline fitting the filter description
        """
        pipeline_filter_model = PipelineFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            name=name,
            version=version,
            version_hash=version_hash,
            docstring=docstring,
            workspace_id=workspace_id,
            user_id=user_id,
        )
        pipeline_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_pipelines(
            pipeline_filter_model=pipeline_filter_model
        )

    def get_pipeline(
        self,
        name_id_or_prefix: Union[str, UUID],
        version: Optional[str] = None,
    ) -> PipelineResponseModel:
        """Get a pipeline by name, id or prefix.

        Args:
            name_id_or_prefix: The name, ID or ID prefix of the pipeline.
            version: The pipeline version. If not specified, the latest
                version is returned.

        Returns:
            The pipeline.

        Raises:
            KeyError: If no pipelines were found for the given ID/name and
                version.
            ZenKeyError: If multiple pipelines match the ID prefix.
        """
        from zenml.utils.uuid_utils import is_valid_uuid

        if is_valid_uuid(name_id_or_prefix):
            if version:
                logger.warning(
                    "You specified both an ID as well as a version of the "
                    "pipeline. Ignoring the version and fetching the "
                    "pipeline by ID."
                )
            if not isinstance(name_id_or_prefix, UUID):
                name_id_or_prefix = UUID(name_id_or_prefix, version=4)

            return self.zen_store.get_pipeline(name_id_or_prefix)

        assert not isinstance(name_id_or_prefix, UUID)
        exact_name_matches = self.list_pipelines(
            size=1,
            sort_by="desc:created",
            name=f"equals:{name_id_or_prefix}",
            version=version,
        )

        if len(exact_name_matches) == 1:
            # If the name matches exactly, use the explicitly specified version
            # or fallback to the latest if not given
            return exact_name_matches.items[0]

        partial_id_matches = self.list_pipelines(
            id=f"startswith:{name_id_or_prefix}"
        )
        if partial_id_matches.total == 1:
            if version:
                logger.warning(
                    "You specified both an ID as well as a version of the "
                    "pipeline. Ignoring the version and fetching the "
                    "pipeline by ID."
                )
            return partial_id_matches[0]
        elif partial_id_matches.total == 0:
            raise KeyError(
                f"No pipelines found for name, ID or prefix "
                f"{name_id_or_prefix}."
            )
        else:
            raise ZenKeyError(
                f"{partial_id_matches.total} pipelines have been found that "
                "have an id prefix that matches the provided string "
                f"'{name_id_or_prefix}':\n"
                f"{partial_id_matches.items}.\n"
                f"Please provide more characters to uniquely identify "
                f"only one of the pipelines."
            )

    def delete_pipeline(
        self,
        name_id_or_prefix: Union[str, UUID],
        version: Optional[str] = None,
    ) -> None:
        """Delete a pipeline.

        Args:
            name_id_or_prefix: The name, ID or ID prefix of the pipeline.
            version: The pipeline version. If left empty, will delete
                the latest version.
        """
        pipeline = self.get_pipeline(
            name_id_or_prefix=name_id_or_prefix, version=version
        )
        self.zen_store.delete_pipeline(pipeline_id=pipeline.id)

    # ----------
    # - BUILDS -
    # ----------

    def list_builds(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        pipeline_id: Optional[Union[str, UUID]] = None,
        stack_id: Optional[Union[str, UUID]] = None,
        is_local: Optional[bool] = None,
        contains_code: Optional[bool] = None,
        zenml_version: Optional[str] = None,
        python_version: Optional[str] = None,
        checksum: Optional[str] = None,
    ) -> Page[PipelineBuildResponseModel]:
        """List all builds.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of build to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            pipeline_id: The id of the pipeline to filter by.
            stack_id: The id of the stack to filter by.
            is_local: Use to filter local builds.
            contains_code: Use to filter builds that contain code.
            zenml_version: The version of ZenML to filter by.
            python_version: The Python version to filter by.
            checksum: The build checksum to filter by.

        Returns:
            A page with builds fitting the filter description
        """
        build_filter_model = PipelineBuildFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            workspace_id=workspace_id,
            user_id=user_id,
            pipeline_id=pipeline_id,
            stack_id=stack_id,
            is_local=is_local,
            contains_code=contains_code,
            zenml_version=zenml_version,
            python_version=python_version,
            checksum=checksum,
        )
        build_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_builds(
            build_filter_model=build_filter_model
        )

    def get_build(self, id_or_prefix: str) -> PipelineBuildResponseModel:
        """Get a build by id or prefix.

        Args:
            id_or_prefix: The id or id prefix of the build.

        Returns:
            The build.

        Raises:
            KeyError: If no build was found for the given id or prefix.
            ZenKeyError: If multiple builds were found that match the given
                id or prefix.
        """
        from zenml.utils.uuid_utils import is_valid_uuid

        # First interpret as full UUID
        if is_valid_uuid(id_or_prefix):
            return self.zen_store.get_build(UUID(id_or_prefix))

        entity = self.list_builds(
            id=f"startswith:{id_or_prefix}",
        )

        # If only a single entity is found, return it.
        if entity.total == 1:
            return entity.items[0]

        # If no entity is found, raise an error.
        if entity.total == 0:
            raise KeyError(
                f"No builds have been found that have either an id or prefix "
                f"that matches the provided string '{id_or_prefix}'."
            )

        raise ZenKeyError(
            f"{entity.total} builds have been found that have "
            f"an ID that matches the provided "
            f"string '{id_or_prefix}':\n"
            f"{[entity.items]}.\n"
            f"Please use the id to uniquely identify "
            f"only one of the builds."
        )

    def delete_build(self, id_or_prefix: str) -> None:
        """Delete a build.

        Args:
            id_or_prefix: The id or id prefix of the build.
        """
        build = self.get_build(id_or_prefix=id_or_prefix)
        self.zen_store.delete_build(build_id=build.id)

    # ---------------
    # - DEPLOYMENTS -
    # ---------------

    def list_deployments(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        pipeline_id: Optional[Union[str, UUID]] = None,
        stack_id: Optional[Union[str, UUID]] = None,
        build_id: Optional[Union[str, UUID]] = None,
    ) -> Page[PipelineDeploymentResponseModel]:
        """List all deployments.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of build to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            pipeline_id: The id of the pipeline to filter by.
            stack_id: The id of the stack to filter by.
            build_id: The id of the build to filter by.

        Returns:
            A page with deployments fitting the filter description
        """
        deployment_filter_model = PipelineDeploymentFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            workspace_id=workspace_id,
            user_id=user_id,
            pipeline_id=pipeline_id,
            stack_id=stack_id,
            build_id=build_id,
        )
        deployment_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_deployments(
            deployment_filter_model=deployment_filter_model
        )

    def get_deployment(
        self, id_or_prefix: str
    ) -> PipelineDeploymentResponseModel:
        """Get a deployment by id or prefix.

        Args:
            id_or_prefix: The id or id prefix of the build.

        Returns:
            The deployment.

        Raises:
            KeyError: If no deployment was found for the given id or prefix.
            ZenKeyError: If multiple deployments were found that match the given
                id or prefix.
        """
        from zenml.utils.uuid_utils import is_valid_uuid

        # First interpret as full UUID
        if is_valid_uuid(id_or_prefix):
            return self.zen_store.get_deployment(UUID(id_or_prefix))

        entity = self.list_deployments(
            id=f"startswith:{id_or_prefix}",
        )

        # If only a single entity is found, return it.
        if entity.total == 1:
            return entity.items[0]

        # If no entity is found, raise an error.
        if entity.total == 0:
            raise KeyError(
                f"No deployment have been found that have either an id or "
                f"prefix that matches the provided string '{id_or_prefix}'."
            )

        raise ZenKeyError(
            f"{entity.total} deployments have been found that have "
            f"an ID that matches the provided "
            f"string '{id_or_prefix}':\n"
            f"{[entity.items]}.\n"
            f"Please use the id to uniquely identify "
            f"only one of the deployments."
        )

    def delete_deployment(self, id_or_prefix: str) -> None:
        """Delete a deployment.

        Args:
            id_or_prefix: The id or id prefix of the deployment.
        """
        deployment = self.get_deployment(id_or_prefix=id_or_prefix)
        self.zen_store.delete_deployment(deployment_id=deployment.id)

    # -------------
    # - SCHEDULES -
    # -------------

    def list_schedules(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        pipeline_id: Optional[Union[str, UUID]] = None,
        orchestrator_id: Optional[Union[str, UUID]] = None,
        active: Optional[Union[str, bool]] = None,
        cron_expression: Optional[str] = None,
        start_time: Optional[Union[datetime, str]] = None,
        end_time: Optional[Union[datetime, str]] = None,
        interval_second: Optional[int] = None,
        catchup: Optional[Union[str, bool]] = None,
    ) -> Page[ScheduleResponseModel]:
        """List schedules.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of stacks to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: The name of the stack to filter by.
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            pipeline_id: The id of the pipeline to filter by.
            orchestrator_id: The id of the orchestrator to filter by.
            active: Use to filter by active status.
            cron_expression: Use to filter by cron expression.
            start_time: Use to filter by start time.
            end_time: Use to filter by end time.
            interval_second: Use to filter by interval second.
            catchup: Use to filter by catchup.

        Returns:
            A list of schedules.
        """
        schedule_filter_model = ScheduleFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            name=name,
            workspace_id=workspace_id,
            user_id=user_id,
            pipeline_id=pipeline_id,
            orchestrator_id=orchestrator_id,
            active=active,
            cron_expression=cron_expression,
            start_time=start_time,
            end_time=end_time,
            interval_second=interval_second,
            catchup=catchup,
        )
        schedule_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_schedules(
            schedule_filter_model=schedule_filter_model
        )

    def get_schedule(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> ScheduleResponseModel:
        """Get a schedule by name, id or prefix.

        Args:
            name_id_or_prefix: The name, id or prefix of the schedule.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The schedule.
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_schedule,
            list_method=self.list_schedules,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def delete_schedule(self, name_id_or_prefix: Union[str, UUID]) -> None:
        """Delete a schedule.

        Args:
            name_id_or_prefix: The name, id or prefix id of the schedule
                to delete.
        """
        schedule = self.get_schedule(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        logger.warning(
            f"Deleting schedule '{name_id_or_prefix}'... This will only delete "
            "the reference of the schedule from ZenML. Please make sure to "
            "manually stop/delete this schedule in your orchestrator as well!"
        )
        self.zen_store.delete_schedule(schedule_id=schedule.id)

    # -----------------
    # - PIPELINE RUNS -
    # -----------------

    def list_pipeline_runs(
        self,
        sort_by: str = "desc:created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        pipeline_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        stack_id: Optional[Union[str, UUID]] = None,
        schedule_id: Optional[Union[str, UUID]] = None,
        build_id: Optional[Union[str, UUID]] = None,
        deployment_id: Optional[Union[str, UUID]] = None,
        code_repository_id: Optional[Union[str, UUID]] = None,
        orchestrator_run_id: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[Union[datetime, str]] = None,
        end_time: Optional[Union[datetime, str]] = None,
        num_steps: Optional[Union[int, str]] = None,
        unlisted: Optional[bool] = None,
    ) -> Page[PipelineRunResponseModel]:
        """List all pipeline runs.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: The id of the runs to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            workspace_id: The id of the workspace to filter by.
            pipeline_id: The id of the pipeline to filter by.
            user_id: The id of the user to filter by.
            stack_id: The id of the stack to filter by.
            schedule_id: The id of the schedule to filter by.
            build_id: The id of the build to filter by.
            deployment_id: The id of the deployment to filter by.
            code_repository_id: The id of the code repository to filter by.
            orchestrator_run_id: The run id of the orchestrator to filter by.
            name: The name of the run to filter by.
            status: The status of the pipeline run
            start_time: The start_time for the pipeline run
            end_time: The end_time for the pipeline run
            num_steps: The number of steps for the pipeline run
            unlisted: If the runs should be unlisted or not.

        Returns:
            A page with Pipeline Runs fitting the filter description
        """
        runs_filter_model = PipelineRunFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            name=name,
            workspace_id=workspace_id,
            pipeline_id=pipeline_id,
            schedule_id=schedule_id,
            build_id=build_id,
            deployment_id=deployment_id,
            code_repository_id=code_repository_id,
            orchestrator_run_id=orchestrator_run_id,
            user_id=user_id,
            stack_id=stack_id,
            status=status,
            start_time=start_time,
            end_time=end_time,
            num_steps=num_steps,
            unlisted=unlisted,
        )
        runs_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_runs(runs_filter_model=runs_filter_model)

    def list_runs(self, **kwargs: Any) -> Page[PipelineRunResponseModel]:
        """(Deprecated) List all pipeline runs.

        Args:
            **kwargs: The filter arguments passed to `list_pipeline_runs`.

        Returns:
            A page with Pipeline Runs fitting the filter description
        """
        logger.warning(
            "`Client.list_runs()` is deprecated and will be removed in a "
            "future release. Please use `Client.list_pipeline_runs()` instead."
        )
        return self.list_pipeline_runs(**kwargs)

    def get_pipeline_run(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> PipelineRunResponseModel:
        """Gets a pipeline run by name, ID, or prefix.

        Args:
            name_id_or_prefix: Name, ID, or prefix of the pipeline run.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The pipeline run.
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_run,
            list_method=self.list_pipeline_runs,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def delete_pipeline_run(
        self,
        name_id_or_prefix: Union[str, UUID],
    ) -> None:
        """Deletes a pipeline run.

        Args:
            name_id_or_prefix: Name, ID, or prefix of the pipeline run.
        """
        run = self.get_pipeline_run(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        self.zen_store.delete_run(run_id=run.id)

    # -------------
    # - STEP RUNS -
    # -------------

    def list_run_steps(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        entrypoint_name: Optional[str] = None,
        code_hash: Optional[str] = None,
        cache_key: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[Union[datetime, str]] = None,
        end_time: Optional[Union[datetime, str]] = None,
        pipeline_run_id: Optional[Union[str, UUID]] = None,
        original_step_run_id: Optional[Union[str, UUID]] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        num_outputs: Optional[Union[int, str]] = None,
    ) -> Page[StepRunResponseModel]:
        """List all pipelines.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of runs to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            start_time: Use to filter by the time when the step started running
            end_time: Use to filter by the time when the step finished running
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            pipeline_run_id: The  id of the pipeline run to filter by.
            original_step_run_id: The  id of the pipeline run to filter by.
            name: The name of the run to filter by.
            entrypoint_name: The entrypoint_name of the run to filter by.
            code_hash: The code_hash of the run to filter by.
            cache_key: The cache_key of the run to filter by.
            status: The name of the run to filter by.
            num_outputs: The number of outputs for the step run

        Returns:
            A page with Pipeline fitting the filter description
        """
        step_run_filter_model = StepRunFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            entrypoint_name=entrypoint_name,
            code_hash=code_hash,
            cache_key=cache_key,
            pipeline_run_id=pipeline_run_id,
            original_step_run_id=original_step_run_id,
            status=status,
            created=created,
            updated=updated,
            start_time=start_time,
            end_time=end_time,
            name=name,
            workspace_id=workspace_id,
            user_id=user_id,
            num_outputs=num_outputs,
        )
        step_run_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_run_steps(
            step_run_filter_model=step_run_filter_model
        )

    def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
        """Get a step run by ID.

        Args:
            step_run_id: The ID of the step run to get.

        Returns:
            The step run.
        """
        return self.zen_store.get_run_step(step_run_id)

    # -------------
    # - Artifacts -
    # -------------

    def list_artifacts(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        artifact_store_id: Optional[Union[str, UUID]] = None,
        type: Optional[ArtifactType] = None,
        data_type: Optional[str] = None,
        uri: Optional[str] = None,
        materializer: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        only_unused: Optional[bool] = False,
    ) -> Page[ArtifactResponseModel]:
        """Get all artifacts.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of runs to filter by.
            created: Use to filter by time of creation
            updated: Use the last updated date for filtering
            name: The name of the run to filter by.
            artifact_store_id: The id of the artifact store to filter by.
            type: The type of the artifact to filter by.
            data_type: The data type of the artifact to filter by.
            uri: The uri of the artifact to filter by.
            materializer: The materializer of the artifact to filter by.
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.
            only_unused: Only return artifacts that are not used in any runs.

        Returns:
            A list of artifacts.
        """
        artifact_filter_model = ArtifactFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            name=name,
            artifact_store_id=artifact_store_id,
            type=type,
            data_type=data_type,
            uri=uri,
            materializer=materializer,
            workspace_id=workspace_id,
            user_id=user_id,
            only_unused=only_unused,
        )
        artifact_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_artifacts(artifact_filter_model)

    def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
        """Get an artifact by ID.

        Args:
            artifact_id: The ID of the artifact to get.

        Returns:
            The artifact.
        """
        return self.zen_store.get_artifact(artifact_id)

    def delete_artifact(
        self,
        artifact_id: UUID,
        delete_metadata: bool = True,
        delete_from_artifact_store: bool = False,
    ) -> None:
        """Delete an artifact.

        By default, this will delete only the metadata of the artifact from the
        database, not the artifact itself.

        Args:
            artifact_id: The ID of the artifact to delete.
            delete_metadata: If True, delete the metadata of the artifact from
                the database.
            delete_from_artifact_store: If True, delete the artifact itself from
                the artifact store.
        """
        artifact = self.get_artifact(artifact_id=artifact_id)
        if delete_from_artifact_store:
            self._delete_artifact_from_artifact_store(artifact=artifact)
        if delete_metadata:
            self._delete_artifact_metadata(artifact=artifact)

    def _delete_artifact_from_artifact_store(
        self, artifact: ArtifactResponseModel
    ) -> None:
        """Delete an artifact from the artifact store.

        Args:
            artifact: The artifact to delete.

        Raises:
            Exception: If the artifact store is inaccessible.
        """
        from zenml.artifact_stores.base_artifact_store import BaseArtifactStore
        from zenml.stack.stack_component import StackComponent

        if not artifact.artifact_store_id:
            logger.warning(
                f"Artifact '{artifact.uri}' does not have an artifact store "
                "associated with it. Skipping deletion from artifact store."
            )
            return
        try:
            artifact_store_model = self.get_stack_component(
                component_type=StackComponentType.ARTIFACT_STORE,
                name_id_or_prefix=artifact.artifact_store_id,
            )
            artifact_store = StackComponent.from_model(artifact_store_model)
            assert isinstance(artifact_store, BaseArtifactStore)
            artifact_store.rmtree(artifact.uri)
        except Exception as e:
            logger.error(
                f"Failed to delete artifact '{artifact.uri}' from the "
                "artifact store. This might happen if your local client "
                "does not have access to the artifact store or does not "
                "have the required integrations installed. Full error: "
                f"{e}"
            )
            raise e
        else:
            logger.info(
                f"Deleted artifact '{artifact.uri}' from the artifact store."
            )

    def _delete_artifact_metadata(
        self, artifact: ArtifactResponseModel
    ) -> None:
        """Delete the metadata of an artifact from the database.

        Args:
            artifact: The artifact to delete.

        Raises:
            ValueError: If the artifact is still used in any runs.
        """
        if artifact not in depaginate(
            partial(self.list_artifacts, only_unused=True)
        ):
            raise ValueError(
                "The metadata of artifacts that are used in runs cannot be "
                "deleted. Please delete all runs that use this artifact "
                "first."
            )
        self.zen_store.delete_artifact(artifact.id)
        logger.info(f"Deleted metadata of artifact '{artifact.uri}'.")

    # ----------------
    # - Run Metadata -
    # ----------------

    def create_run_metadata(
        self,
        metadata: Dict[str, "MetadataType"],
        pipeline_run_id: Optional[UUID] = None,
        step_run_id: Optional[UUID] = None,
        artifact_id: Optional[UUID] = None,
        stack_component_id: Optional[UUID] = None,
    ) -> Dict[str, RunMetadataResponseModel]:
        """Create run metadata.

        Args:
            metadata: The metadata to create as a dictionary of key-value pairs.
            pipeline_run_id: The ID of the pipeline run during which the
                metadata was produced. If provided, `step_run_id` and
                `artifact_id` must be None.
            step_run_id: The ID of the step run during which the metadata was
                produced. If provided, `pipeline_run_id` and `artifact_id` must
                be None.
            artifact_id: The ID of the artifact for which the metadata was
                produced. If provided, `pipeline_run_id` and `step_run_id` must
                be None.
            stack_component_id: The ID of the stack component that produced
                the metadata.

        Returns:
            The created metadata, as string to model dictionary.

        Raises:
            ValueError: If not exactly one of either `pipeline_run_id`,
                `step_run_id`, or `artifact_id` is provided.
        """
        from zenml.metadata.metadata_types import get_metadata_type

        if not (pipeline_run_id or step_run_id or artifact_id):
            raise ValueError(
                "Cannot create run metadata without linking it to any entity. "
                "Please provide either a `pipeline_run_id`, `step_run_id`, or "
                "`artifact_id`."
            )
        if (
            (pipeline_run_id and step_run_id)
            or (pipeline_run_id and artifact_id)
            or (step_run_id and artifact_id)
        ):
            raise ValueError(
                "Cannot create run metadata linked to multiple entities. "
                "Please provide only a `pipeline_run_id` or only a "
                "`step_run_id` or only an `artifact_id`."
            )

        created_metadata: Dict[str, RunMetadataResponseModel] = {}
        for key, value in metadata.items():
            # Skip metadata that is too large to be stored in the database.
            if len(json.dumps(value)) > TEXT_FIELD_MAX_LENGTH:
                logger.warning(
                    f"Metadata value for key '{key}' is too large to be "
                    "stored in the database. Skipping."
                )
                continue

            # Skip metadata that is not of a supported type.
            try:
                metadata_type = get_metadata_type(value)
            except ValueError as e:
                logger.warning(
                    f"Metadata value for key '{key}' is not of a supported "
                    f"type. Skipping. Full error: {e}"
                )
                continue

            run_metadata = RunMetadataRequestModel(
                workspace=self.active_workspace.id,
                user=self.active_user.id,
                pipeline_run_id=pipeline_run_id,
                step_run_id=step_run_id,
                artifact_id=artifact_id,
                stack_component_id=stack_component_id,
                key=key,
                value=value,
                type=metadata_type,
            )
            metadata_model = self.zen_store.create_run_metadata(run_metadata)
            created_metadata[key] = metadata_model
        return created_metadata

    def list_run_metadata(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        workspace_id: Optional[UUID] = None,
        user_id: Optional[UUID] = None,
        pipeline_run_id: Optional[UUID] = None,
        step_run_id: Optional[UUID] = None,
        artifact_id: Optional[UUID] = None,
        stack_component_id: Optional[UUID] = None,
        key: Optional[str] = None,
        value: Optional["MetadataType"] = None,
        type: Optional[str] = None,
    ) -> Page[RunMetadataResponseModel]:
        """List run metadata.

        Args:
            sort_by: The field to sort the results by.
            page: The page number to return.
            size: The number of results to return per page.
            logical_operator: The logical operator to use for filtering.
            id: The ID of the metadata.
            created: The creation time of the metadata.
            updated: The last update time of the metadata.
            workspace_id: The ID of the workspace the metadata belongs to.
            user_id: The ID of the user that created the metadata.
            pipeline_run_id: The ID of the pipeline run the metadata belongs to.
            step_run_id: The ID of the step run the metadata belongs to.
            artifact_id: The ID of the artifact the metadata belongs to.
            stack_component_id: The ID of the stack component that produced
                the metadata.
            key: The key of the metadata.
            value: The value of the metadata.
            type: The type of the metadata.

        Returns:
            The run metadata.
        """
        metadata_filter_model = RunMetadataFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            workspace_id=workspace_id,
            user_id=user_id,
            pipeline_run_id=pipeline_run_id,
            step_run_id=step_run_id,
            artifact_id=artifact_id,
            stack_component_id=stack_component_id,
            key=key,
            value=value,
            type=type,
        )
        metadata_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_run_metadata(metadata_filter_model)

    # .---------.
    # | SECRETS |
    # '---------'

    def create_secret(
        self,
        name: str,
        values: Dict[str, str],
        scope: SecretScope = SecretScope.WORKSPACE,
    ) -> "SecretResponseModel":
        """Creates a new secret.

        Args:
            name: The name of the secret.
            values: The values of the secret.
            scope: The scope of the secret.

        Returns:
            The created secret (in model form).

        Raises:
            NotImplementedError: If centralized secrets management is not
                enabled.
        """
        create_secret_request = SecretRequestModel(
            name=name,
            values=values,
            scope=scope,
            user=self.active_user.id,
            workspace=self.active_workspace.id,
        )
        try:
            return self.zen_store.create_secret(secret=create_secret_request)
        except NotImplementedError:
            raise NotImplementedError(
                "centralized secrets management is not supported or explicitly "
                "disabled in the target ZenML deployment."
            )

    def get_secret(
        self,
        name_id_or_prefix: Union[str, UUID],
        scope: Optional[SecretScope] = None,
        allow_partial_name_match: bool = True,
        allow_partial_id_match: bool = True,
    ) -> "SecretResponseModel":
        """Get a secret.

        Get a secret identified by a name, ID or prefix of the name or ID and
        optionally a scope.

        If a scope is not provided, the secret will be searched for in all
        scopes starting with the innermost scope (user) to the outermost scope
        (workspace). When a name or prefix is used instead of a UUID value, each
        scope is first searched for an exact match, then for a ID prefix or
        name substring match before moving on to the next scope.

        Args:
            name_id_or_prefix: The name, ID or prefix to the id of the secret
                to get.
            scope: The scope of the secret. If not set, all scopes will be
                searched starting with the innermost scope (user) to the
                outermost scope (global) until a secret is found.
            allow_partial_name_match: If True, allow partial name matches.
            allow_partial_id_match: If True, allow partial ID matches.

        Returns:
            The secret.

        Raises:
            KeyError: If no secret is found.
            ZenKeyError: If multiple secrets are found.
            NotImplementedError: If centralized secrets management is not
                enabled.
        """
        from zenml.utils.uuid_utils import is_valid_uuid

        try:
            # First interpret as full UUID
            if is_valid_uuid(name_id_or_prefix):
                # Fetch by ID; filter by scope if provided
                secret = self.zen_store.get_secret(
                    secret_id=UUID(name_id_or_prefix)
                    if isinstance(name_id_or_prefix, str)
                    else name_id_or_prefix
                )
                if scope is not None and secret.scope != scope:
                    raise KeyError(
                        f"No secret found with ID {str(name_id_or_prefix)}"
                    )

                return secret
        except NotImplementedError:
            raise NotImplementedError(
                "centralized secrets management is not supported or explicitly "
                "disabled in the target ZenML deployment."
            )

        # If not a UUID, try to find by name and then by prefix
        assert not isinstance(name_id_or_prefix, UUID)

        # Scopes to search in order of priority
        search_scopes = (
            [SecretScope.USER, SecretScope.WORKSPACE]
            if scope is None
            else [scope]
        )

        secrets = self.list_secrets(
            logical_operator=LogicalOperators.OR,
            name=f"contains:{name_id_or_prefix}"
            if allow_partial_name_match
            else f"equals:{name_id_or_prefix}",
            id=f"startswith:{name_id_or_prefix}"
            if allow_partial_id_match
            else None,
        )

        for search_scope in search_scopes:
            partial_matches: List[SecretResponseModel] = []
            for secret in secrets.items:
                if secret.scope != search_scope:
                    continue
                # Exact match
                if secret.name == name_id_or_prefix:
                    # Need to fetch the secret again to get the secret values
                    return self.zen_store.get_secret(secret_id=secret.id)
                # Partial match
                partial_matches.append(secret)

            if len(partial_matches) > 1:
                match_summary = "\n".join(
                    [
                        f"[{secret.id}]: name = {secret.name}"
                        for secret in partial_matches
                    ]
                )
                raise ZenKeyError(
                    f"{len(partial_matches)} secrets have been found that have "
                    f"a name or ID that matches the provided "
                    f"string '{name_id_or_prefix}':\n"
                    f"{match_summary}.\n"
                    f"Please use the id to uniquely identify "
                    f"only one of the secrets."
                )

            # If only a single secret is found, return it
            if len(partial_matches) == 1:
                # Need to fetch the secret again to get the secret values
                return self.zen_store.get_secret(
                    secret_id=partial_matches[0].id
                )

        msg = (
            f"No secret found with name, ID or prefix "
            f"'{name_id_or_prefix}'"
        )
        if scope is not None:
            msg += f" in scope '{scope}'"

        raise KeyError(msg)

    def get_secret_by_name_and_scope(
        self, name: str, scope: Optional[SecretScope] = None
    ) -> "SecretResponseModel":
        """Fetches a registered secret with a given name and optional scope.

        This is a version of get_secret that restricts the search to a given
        name and an optional scope, without doing any prefix or UUID matching.

        If no scope is provided, the search will be done first in the user
        scope, then in the workspace scope.

        Args:
            name: The name of the secret to get.
            scope: The scope of the secret to get.

        Returns:
            The registered secret.

        Raises:
            KeyError: If no secret exists for the given name in the given scope.
        """
        logger.debug(
            f"Fetching the secret with name '{name}' and scope '{scope}'."
        )

        # Scopes to search in order of priority
        search_scopes = (
            [SecretScope.USER, SecretScope.WORKSPACE]
            if scope is None
            else [scope]
        )

        for search_scope in search_scopes:
            secrets = self.list_secrets(
                logical_operator=LogicalOperators.AND,
                name=f"equals:{name}",
                scope=search_scope,
            )

            if len(secrets.items) >= 1:
                # Need to fetch the secret again to get the secret values
                return self.zen_store.get_secret(secret_id=secrets.items[0].id)

        msg = f"No secret with name '{name}' was found"
        if scope is not None:
            msg += f" in scope '{scope.value}'"

        raise KeyError(msg)

    def list_secrets(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
        name: Optional[str] = None,
        scope: Optional[SecretScope] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
    ) -> Page[SecretResponseModel]:
        """Fetches all the secret models.

        The returned secrets do not contain the secret values. To get the
        secret values, use `get_secret` individually for each secret.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: Use the id of secrets to filter by.
            created: Use to secrets by time of creation
            updated: Use the last updated date for filtering
            name: The name of the secret to filter by.
            scope: The scope of the secret to filter by.
            workspace_id: The id of the workspace to filter by.
            user_id: The  id of the user to filter by.

        Returns:
            A list of all the secret models without the secret values.

        Raises:
            NotImplementedError: If centralized secrets management is not
                enabled.
        """
        secret_filter_model = SecretFilterModel(
            page=page,
            size=size,
            sort_by=sort_by,
            logical_operator=logical_operator,
            user_id=user_id,
            workspace_id=workspace_id,
            name=name,
            scope=scope,
            id=id,
            created=created,
            updated=updated,
        )
        secret_filter_model.set_scope_workspace(self.active_workspace.id)
        try:
            return self.zen_store.list_secrets(
                secret_filter_model=secret_filter_model
            )
        except NotImplementedError:
            raise NotImplementedError(
                "centralized secrets management is not supported or explicitly "
                "disabled in the target ZenML deployment."
            )

    def list_secrets_in_scope(
        self,
        scope: SecretScope,
    ) -> Page[SecretResponseModel]:
        """Fetches the list of secret in a given scope.

        The returned secrets do not contain the secret values. To get the
        secret values, use `get_secret` individually for each secret.

        Args:
            scope: The secrets scope to search for.

        Returns:
            The list of secrets in the given scope without the secret values.
        """
        logger.debug(f"Fetching the secrets in scope {scope.value}.")

        return self.list_secrets(
            scope=scope,
        )

    def update_secret(
        self,
        name_id_or_prefix: Union[str, UUID],
        scope: Optional[SecretScope] = None,
        new_name: Optional[str] = None,
        new_scope: Optional[SecretScope] = None,
        add_or_update_values: Optional[Dict[str, str]] = None,
        remove_values: Optional[List[str]] = None,
    ) -> SecretResponseModel:
        """Updates a secret.

        Args:
            name_id_or_prefix: The name, id or prefix of the id for the
                secret to update.
            scope: The scope of the secret to update.
            new_name: The new name of the secret.
            new_scope: The new scope of the secret.
            add_or_update_values: The values to add or update.
            remove_values: The values to remove.

        Returns:
            The updated secret.

        Raises:
            KeyError: If trying to remove a value that doesn't exist.
            ValueError: If a key is provided in both add_or_update_values and
                remove_values.
        """
        secret = self.get_secret(
            name_id_or_prefix=name_id_or_prefix,
            scope=scope,
            # Don't allow partial name matches, but allow partial ID matches
            allow_partial_name_match=False,
            allow_partial_id_match=True,
        )

        secret_update = SecretUpdateModel(name=new_name or secret.name)  # type: ignore[call-arg]

        if new_scope:
            secret_update.scope = new_scope
        values: Dict[str, Optional[SecretStr]] = {}
        if add_or_update_values:
            values.update(
                {
                    key: SecretStr(value)
                    for key, value in add_or_update_values.items()
                }
            )
        if remove_values:
            for key in remove_values:
                if key not in secret.values:
                    raise KeyError(
                        f"Cannot remove value '{key}' from secret "
                        f"'{secret.name}' because it does not exist."
                    )
                if key in values:
                    raise ValueError(
                        f"Key '{key}' is supplied both in the values to add or "
                        f"update and the values to be removed."
                    )
                values[key] = None
        if values:
            secret_update.values = values

        return Client().zen_store.update_secret(
            secret_id=secret.id, secret_update=secret_update
        )

    def delete_secret(
        self, name_id_or_prefix: str, scope: Optional[SecretScope] = None
    ) -> None:
        """Deletes a secret.

        Args:
            name_id_or_prefix: The name or ID of the secret.
            scope: The scope of the secret to delete.
        """
        secret = self.get_secret(
            name_id_or_prefix=name_id_or_prefix,
            scope=scope,
            # Don't allow partial name matches, but allow partial ID matches
            allow_partial_name_match=False,
            allow_partial_id_match=True,
        )

        self.zen_store.delete_secret(secret_id=secret.id)

    # .-------------------.
    # | CODE REPOSITORIES |
    # '-------------------'

    def create_code_repository(
        self,
        name: str,
        config: Dict[str, Any],
        source: Source,
        description: Optional[str] = None,
        logo_url: Optional[str] = None,
    ) -> CodeRepositoryResponseModel:
        """Create a new code repository.

        Args:
            name: Name of the code repository.
            config: The configuration for the code repository.
            source: The code repository implementation source.
            description: The code repository description.
            logo_url: URL of a logo (png, jpg or svg) for the code repository.

        Returns:
            The created code repository.

        Raises:
            RuntimeError: If the provided config is invalid.
        """
        from zenml.code_repositories import BaseCodeRepository

        code_repo_class: Type[
            BaseCodeRepository
        ] = source_utils.load_and_validate_class(
            source=source, expected_class=BaseCodeRepository
        )
        try:
            # Validate the repo config
            code_repo_class(id=uuid4(), config=config)
        except Exception as e:
            raise RuntimeError(
                "Failed to validate code repository config."
            ) from e

        repo_request = CodeRepositoryRequestModel(
            user=self.active_user.id,
            workspace=self.active_workspace.id,
            name=name,
            config=config,
            source=source,
            description=description,
            logo_url=logo_url,
        )
        return self.zen_store.create_code_repository(
            code_repository=repo_request
        )

    def list_code_repositories(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[Union[datetime, str]] = None,
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
    ) -> Page[CodeRepositoryResponseModel]:
        """List all code repositories.

        Args:
            sort_by: The column to sort by.
            page: The page of items.
            size: The maximum size of all pages.
            logical_operator: Which logical operator to use [and, or].
            id: Use the id of the code repository to filter by.
            created: Use to filter by time of creation.
            updated: Use the last updated date for filtering.
            name: The name of the code repository to filter by.
            workspace_id: The id of the workspace to filter by.
            user_id: The id of the user to filter by.

        Returns:
            A page of code repositories matching the filter description.
        """
        filter_model = CodeRepositoryFilterModel(
            sort_by=sort_by,
            page=page,
            size=size,
            logical_operator=logical_operator,
            id=id,
            created=created,
            updated=updated,
            name=name,
            workspace_id=workspace_id,
            user_id=user_id,
        )
        filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_code_repositories(filter_model=filter_model)

    def get_code_repository(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> CodeRepositoryResponseModel:
        """Get a code repository by name, id or prefix.

        Args:
            name_id_or_prefix: The name, ID or ID prefix of the code repository.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The code repository.
        """
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_code_repository,
            list_method=self.list_code_repositories,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

    def update_code_repository(
        self,
        name_id_or_prefix: Union[UUID, str],
        name: Optional[str] = None,
        description: Optional[str] = None,
        logo_url: Optional[str] = None,
    ) -> CodeRepositoryResponseModel:
        """Update a code repository.

        Args:
            name_id_or_prefix: Name, ID or prefix of the code repository to
                update.
            name: New name of the code repository.
            description: New description of the code repository.
            logo_url: New logo URL of the code repository.

        Returns:
            The updated code repository.
        """
        repo = self.get_code_repository(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        update = CodeRepositoryUpdateModel(  # type: ignore[call-arg]
            name=name, description=description, logo_url=logo_url
        )
        return self.zen_store.update_code_repository(
            code_repository_id=repo.id, update=update
        )

    def delete_code_repository(
        self,
        name_id_or_prefix: Union[str, UUID],
    ) -> None:
        """Delete a code repository.

        Args:
            name_id_or_prefix: The name, ID or prefix of the code repository.
        """
        repo = self.get_code_repository(
            name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
        )
        self.zen_store.delete_code_repository(code_repository_id=repo.id)

    # .--------------------.
    # | SERVICE CONNECTORS |
    # '--------------------'

    def get_service_connector(
        self,
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
        load_secrets: bool = False,
    ) -> "ServiceConnectorResponseModel":
        """Fetches a registered service connector.

        Args:
            name_id_or_prefix: The id of the service connector to fetch.
            allow_name_prefix_match: If True, allow matching by name prefix.
            load_secrets: If True, load the secrets for the service connector.

        Returns:
            The registered service connector.
        """

        def scoped_list_method(
            **kwargs: Any,
        ) -> Page[ServiceConnectorResponseModel]:
            """Call `zen_store.list_service_connectors` with workspace scoping.

            Args:
                **kwargs: Keyword arguments to pass to
                    `ServiceConnectorFilterModel`.

            Returns:
                The list of service connectors.
            """
            filter_model = ServiceConnectorFilterModel(**kwargs)
            filter_model.set_scope_workspace(self.active_workspace.id)
            return self.zen_store.list_service_connectors(
                filter_model=filter_model,
            )

        connector = self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_service_connector,
            list_method=scoped_list_method,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )

        if load_secrets and connector.secret_id:
            client = Client()
            try:
                secret = client.get_secret(
                    name_id_or_prefix=connector.secret_id,
                    allow_partial_id_match=False,
                    allow_partial_name_match=False,
                )
            except KeyError as err:
                logger.error(
                    "Unable to retrieve secret values associated with "
                    f"service connector '{connector.name}': {err}"
                )
            else:
                # Add secret values to connector configuration
                connector.secrets.update(secret.values)

        return connector

    def list_service_connectors(
        self,
        sort_by: str = "created",
        page: int = PAGINATION_STARTING_PAGE,
        size: int = PAGE_SIZE_DEFAULT,
        logical_operator: LogicalOperators = LogicalOperators.AND,
        id: Optional[Union[UUID, str]] = None,
        created: Optional[datetime] = None,
        updated: Optional[datetime] = None,
        is_shared: Optional[bool] = None,
        name: Optional[str] = None,
        connector_type: Optional[str] = None,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        workspace_id: Optional[Union[str, UUID]] = None,
        user_id: Optional[Union[str, UUID]] = None,
        labels: Optional[Dict[str, Optional[str]]] = None,
        secret_id: Optional[Union[str, UUID]] = None,
    ) -> Page[ServiceConnectorResponseModel]:
        """Lists all registered service connectors.

        Args:
            sort_by: The column to sort by
            page: The page of items
            size: The maximum size of all pages
            logical_operator: Which logical operator to use [and, or]
            id: The id of the service connector to filter by.
            created: Filter service connectors by time of creation
            updated: Use the last updated date for filtering
            connector_type: Use the service connector type for filtering
            auth_method: Use the service connector auth method for filtering
            resource_type: Filter service connectors by the resource type that
                they can give access to.
            resource_id: Filter service connectors by the resource id that
                they can give access to.
            workspace_id: The id of the workspace to filter by.
            user_id: The id of the user to filter by.
            name: The name of the service connector to filter by.
            is_shared: The shared status of the service connector to filter by.
            labels: The labels of the service connector to filter by.
            secret_id: Filter by the id of the secret that is referenced by the
                service connector.

        Returns:
            A page of service connectors.
        """
        connector_filter_model = ServiceConnectorFilterModel(
            page=page,
            size=size,
            sort_by=sort_by,
            logical_operator=logical_operator,
            workspace_id=workspace_id or self.active_workspace.id,
            user_id=user_id,
            name=name,
            is_shared=is_shared,
            connector_type=connector_type,
            auth_method=auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
            id=id,
            created=created,
            updated=updated,
            labels=labels,
            secret_id=secret_id,
        )
        connector_filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_service_connectors(
            filter_model=connector_filter_model
        )

    def create_service_connector(
        self,
        name: str,
        connector_type: str,
        resource_type: Optional[str] = None,
        auth_method: Optional[str] = None,
        configuration: Optional[Dict[str, str]] = None,
        resource_id: Optional[str] = None,
        description: str = "",
        expiration_seconds: Optional[int] = None,
        expires_at: Optional[datetime] = None,
        is_shared: bool = False,
        labels: Optional[Dict[str, str]] = None,
        auto_configure: bool = False,
        verify: bool = True,
        list_resources: bool = True,
        register: bool = True,
    ) -> Tuple[
        Optional[
            Union[
                "ServiceConnectorResponseModel",
                "ServiceConnectorRequestModel",
            ]
        ],
        Optional[ServiceConnectorResourcesModel],
    ]:
        """Create, validate and/or register a service connector.

        Args:
            name: The name of the service connector.
            connector_type: The service connector type.
            auth_method: The authentication method of the service connector.
                May be omitted if auto-configuration is used.
            resource_type: The resource type for the service connector.
            configuration: The configuration of the service connector.
            resource_id: The resource id of the service connector.
            description: The description of the service connector.
            expiration_seconds: The expiration time of the service connector.
            expires_at: The expiration time of the service connector
                credentials.
            is_shared: Whether the service connector is shared or not.
            labels: The labels of the service connector.
            auto_configure: Whether to automatically configure the service
                connector from the local environment.
            verify: Whether to verify that the service connector configuration
                and credentials can be used to gain access to the resource.
            list_resources: Whether to also list the resources that the service
                connector can give access to (if verify is True).
            register: Whether to register the service connector or not.

        Returns:
            The model of the registered service connector and the resources
            that the service connector can give access to (if verify is True).

        Raises:
            ValueError: If the arguments are invalid.
            KeyError: If the service connector type is not found.
            NotImplementedError: If auto-configuration is not supported or
                not implemented for the service connector type.
            AuthorizationException: If the connector verification failed due
                to authorization issues.
        """
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        connector_instance: Optional[ServiceConnector] = None
        connector_resources: Optional[ServiceConnectorResourcesModel] = None

        # Get the service connector type class
        try:
            connector = self.zen_store.get_service_connector_type(
                connector_type=connector_type,
            )
        except KeyError:
            raise KeyError(
                f"Service connector type {connector_type} not found."
                "Please check that you have installed all required "
                "Python packages and ZenML integrations and try again."
            )

        if not resource_type:
            if len(connector.resource_types) == 1:
                resource_type = connector.resource_types[0].resource_type

        # If auto_configure is set, we will try to automatically configure the
        # service connector from the local environment
        if auto_configure:
            if not connector.supports_auto_configuration:
                raise NotImplementedError(
                    f"The {connector.name} service connector type "
                    "does not support auto-configuration."
                )
            if not connector.local:
                raise NotImplementedError(
                    f"The {connector.name} service connector type "
                    "implementation is not available locally. Please "
                    "check that you have installed all required Python "
                    "packages and ZenML integrations and try again, or "
                    "skip auto-configuration."
                )

            assert connector.connector_class is not None

            connector_instance = connector.connector_class.auto_configure(
                resource_type=resource_type,
                auth_method=auth_method,
                resource_id=resource_id,
            )
            assert connector_instance is not None
            connector_request = connector_instance.to_model(
                name=name,
                user=self.active_user.id,
                workspace=self.active_workspace.id,
                description=description or "",
                is_shared=is_shared,
                labels=labels,
            )

            if verify:
                # Prefer to verify the connector config server-side if the
                # implementation if available there, because it ensures
                # that the connector can be shared with other users or used
                # from other machines and because some auth methods rely on the
                # server-side authentication environment
                if connector.remote:
                    connector_resources = (
                        self.zen_store.verify_service_connector_config(
                            connector_request,
                            list_resources=list_resources,
                        )
                    )
                else:
                    connector_resources = connector_instance.verify(
                        list_resources=list_resources,
                    )

                if connector_resources.error:
                    # Raise an exception if the connector verification failed
                    raise AuthorizationException(connector_resources.error)

        else:
            if not auth_method:
                if len(connector.auth_methods) == 1:
                    auth_method = connector.auth_methods[0].auth_method
                else:
                    raise ValueError(
                        f"Multiple authentication methods are available for "
                        f"the {connector.name} service connector type. Please "
                        f"specify one of the following: "
                        f"{list(connector.auth_method_dict.keys())}."
                    )

            connector_request = ServiceConnectorRequestModel(
                name=name,
                connector_type=connector_type,
                description=description,
                auth_method=auth_method,
                expiration_seconds=expiration_seconds,
                expires_at=expires_at,
                is_shared=is_shared,
                user=self.active_user.id,
                workspace=self.active_workspace.id,
                labels=labels or {},
            )
            # Validate and configure the resources
            connector_request.validate_and_configure_resources(
                connector_type=connector,
                resource_types=resource_type,
                resource_id=resource_id,
                configuration=configuration,
            )
            if verify:
                # Prefer to verify the connector config server-side if the
                # implementation if available there, because it ensures
                # that the connector can be shared with other users or used
                # from other machines and because some auth methods rely on the
                # server-side authentication environment
                if connector.remote:
                    connector_resources = (
                        self.zen_store.verify_service_connector_config(
                            connector_request,
                            list_resources=list_resources,
                        )
                    )
                else:
                    connector_instance = (
                        service_connector_registry.instantiate_connector(
                            model=connector_request
                        )
                    )
                    connector_resources = connector_instance.verify(
                        list_resources=list_resources,
                    )

                if connector_resources.error:
                    # Raise an exception if the connector verification failed
                    raise AuthorizationException(connector_resources.error)

                # For resource types that don't support multi-instances, it's
                # better to save the default resource ID in the connector, if
                # available. Otherwise, we'll need to instantiate the connector
                # again to get the default resource ID.
                connector_request.resource_id = (
                    connector_request.resource_id
                    or connector_resources.get_default_resource_id()
                )

        if not register:
            return connector_request, connector_resources

        # Register the new model
        connector_response = self.zen_store.create_service_connector(
            service_connector=connector_request
        )

        if connector_resources:
            connector_resources.id = connector_response.id
            connector_resources.name = connector_response.name
            connector_resources.connector_type = (
                connector_response.connector_type
            )

        return connector_response, connector_resources

    def update_service_connector(
        self,
        name_id_or_prefix: Union[UUID, str],
        name: Optional[str] = None,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        configuration: Optional[Dict[str, str]] = None,
        resource_id: Optional[str] = None,
        description: Optional[str] = None,
        expiration_seconds: Optional[int] = None,
        is_shared: Optional[bool] = None,
        labels: Optional[Dict[str, Optional[str]]] = None,
        verify: bool = True,
        list_resources: bool = True,
        update: bool = True,
    ) -> Tuple[
        Optional[
            Union[
                "ServiceConnectorResponseModel",
                "ServiceConnectorUpdateModel",
            ]
        ],
        Optional[ServiceConnectorResourcesModel],
    ]:
        """Validate and/or register an updated service connector.

        If the `resource_type`, `resource_id` and `expiration_seconds`
        parameters are set to their "empty" values (empty string for resource
        type and resource ID, 0 for expiration seconds), the existing values
        will be removed from the service connector. Setting them to None or
        omitting them will not affect the existing values.

        If supplied, the `configuration` parameter is a full replacement of the
        existing configuration rather than a partial update.

        Labels can be updated or removed by setting the label value to None.

        Args:
            name_id_or_prefix: The name, id or prefix of the service connector
                to update.
            name: The new name of the service connector.
            auth_method: The new authentication method of the service connector.
            resource_type: The new resource type for the service connector.
                If set to the empty string, the existing resource type will be
                removed.
            configuration: The new configuration of the service connector. If
                set, this needs to be a full replacement of the existing
                configuration rather than a partial update.
            resource_id: The new resource id of the service connector.
                If set to the empty string, the existing resource ID will be
                removed.
            description: The description of the service connector.
            expiration_seconds: The expiration time of the service connector.
                If set to 0, the existing expiration time will be removed.
            is_shared: Whether the service connector is shared or not.
            labels: The service connector to update or remove. If a label value
                is set to None, the label will be removed.
            verify: Whether to verify that the service connector configuration
                and credentials can be used to gain access to the resource.
            list_resources: Whether to also list the resources that the service
                connector can give access to (if verify is True).
            update: Whether to update the service connector or not.

        Returns:
            The model of the registered service connector and the resources
            that the service connector can give access to (if verify is True).

        Raises:
            AuthorizationException: If the service connector verification
                fails due to invalid credentials or insufficient permissions.
        """
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        connector_model = self.get_service_connector(
            name_id_or_prefix,
            allow_name_prefix_match=False,
            load_secrets=True,
        )

        connector_instance: Optional[ServiceConnector] = None
        connector_resources: Optional[ServiceConnectorResourcesModel] = None

        if isinstance(connector_model.connector_type, str):
            connector = self.get_service_connector_type(
                connector_model.connector_type
            )
        else:
            connector = connector_model.connector_type

        resource_types: Optional[Union[str, List[str]]] = None
        if resource_type == "":
            resource_types = None
        elif resource_type is None:
            resource_types = connector_model.resource_types
        else:
            resource_types = resource_type

        if not resource_type:
            if len(connector.resource_types) == 1:
                resource_types = connector.resource_types[0].resource_type

        if resource_id == "":
            resource_id = None
        elif resource_id is None:
            resource_id = connector_model.resource_id

        if expiration_seconds == 0:
            expiration_seconds = None
        elif expiration_seconds is None:
            expiration_seconds = connector_model.expiration_seconds

        connector_update = ServiceConnectorUpdateModel(
            name=name or connector_model.name,
            connector_type=connector.connector_type,
            description=description or connector_model.description,
            auth_method=auth_method or connector_model.auth_method,
            expiration_seconds=expiration_seconds,
            is_shared=is_shared
            if is_shared is not None
            else connector_model.is_shared,
            user=self.active_user.id,
            workspace=self.active_workspace.id,
        )
        # Validate and configure the resources
        if configuration is not None:
            # The supplied configuration is a drop-in replacement for the
            # existing configuration and secrets
            connector_update.validate_and_configure_resources(
                connector_type=connector,
                resource_types=resource_types,
                resource_id=resource_id,
                configuration=configuration,
            )
        else:
            connector_update.validate_and_configure_resources(
                connector_type=connector,
                resource_types=resource_types,
                resource_id=resource_id,
                configuration=connector_model.configuration,
                secrets=connector_model.secrets,
            )

        # Add the labels
        if labels is not None:
            # Apply the new label values, but don't keep any labels that
            # have been set to None in the update
            connector_update.labels = {
                **{
                    label: value
                    for label, value in connector_model.labels.items()
                    if label not in labels
                },
                **{
                    label: value
                    for label, value in labels.items()
                    if value is not None
                },
            }
        else:
            connector_update.labels = connector_model.labels

        if verify:
            # Prefer to verify the connector config server-side if the
            # implementation if available there, because it ensures
            # that the connector can be shared with other users or used
            # from other machines and because some auth methods rely on the
            # server-side authentication environment
            if connector.remote:
                connector_resources = (
                    self.zen_store.verify_service_connector_config(
                        connector_update,
                        list_resources=list_resources,
                    )
                )
            else:
                connector_instance = (
                    service_connector_registry.instantiate_connector(
                        model=connector_update
                    )
                )
                connector_resources = connector_instance.verify(
                    list_resources=list_resources
                )

            if connector_resources.error:
                raise AuthorizationException(connector_resources.error)

            # For resource types that don't support multi-instances, it's
            # better to save the default resource ID in the connector, if
            # available. Otherwise, we'll need to instantiate the connector
            # again to get the default resource ID.
            connector_update.resource_id = (
                connector_update.resource_id
                or connector_resources.get_default_resource_id()
            )

        if not update:
            return connector_update, connector_resources

        # Update the model
        connector_response = self.zen_store.update_service_connector(
            service_connector_id=connector_model.id,
            update=connector_update,
        )

        if connector_resources:
            connector_resources.id = connector_response.id
            connector_resources.name = connector_response.name
            connector_resources.connector_type = (
                connector_response.connector_type
            )

        return connector_response, connector_resources

    def delete_service_connector(
        self,
        name_id_or_prefix: Union[str, UUID],
    ) -> None:
        """Deletes a registered service connector.

        Args:
            name_id_or_prefix: The ID or name of the service connector to delete.
        """
        service_connector = self.get_service_connector(
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=False,
        )

        self.zen_store.delete_service_connector(
            service_connector_id=service_connector.id
        )
        logger.info(
            "Removed service connector (type: %s) with name '%s'.",
            service_connector.type,
            service_connector.name,
        )

    def verify_service_connector(
        self,
        name_id_or_prefix: Union[UUID, str],
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        list_resources: bool = True,
    ) -> "ServiceConnectorResourcesModel":
        """Verifies if a service connector has access to one or more resources.

        Args:
            name_id_or_prefix: The name, id or prefix of the service connector
                to verify.
            resource_type: The type of the resource for which to verify access.
                If not provided, the resource type from the service connector
                configuration will be used.
            resource_id: The ID of the resource for which to verify access. If
                not provided, the resource ID from the service connector
                configuration will be used.
            list_resources: Whether to list the resources that the service
                connector has access to.

        Returns:
            The list of resources that the service connector has access to,
            scoped to the supplied resource type and ID, if provided.

        Raises:
            AuthorizationException: If the service connector does not have
                access to the resources.
        """
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        # Get the service connector model
        service_connector = self.get_service_connector(
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=False,
        )

        connector_type = self.get_service_connector_type(
            service_connector.type
        )

        # Prefer to verify the connector config server-side if the
        # implementation if available there, because it ensures
        # that the connector can be shared with other users or used
        # from other machines and because some auth methods rely on the
        # server-side authentication environment
        if connector_type.remote:
            connector_resources = self.zen_store.verify_service_connector(
                service_connector_id=service_connector.id,
                resource_type=resource_type,
                resource_id=resource_id,
                list_resources=list_resources,
            )
        else:
            connector_instance = (
                service_connector_registry.instantiate_connector(
                    model=service_connector
                )
            )
            connector_resources = connector_instance.verify(
                resource_type=resource_type,
                resource_id=resource_id,
                list_resources=list_resources,
            )

        if connector_resources.error:
            raise AuthorizationException(connector_resources.error)

        return connector_resources

    def login_service_connector(
        self,
        name_id_or_prefix: Union[UUID, str],
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        **kwargs: Any,
    ) -> "ServiceConnector":
        """Use a service connector to authenticate a local client/SDK.

        Args:
            name_id_or_prefix: The name, id or prefix of the service connector
                to use.
            resource_type: The type of the resource to connect to. If not
                provided, the resource type from the service connector
                configuration will be used.
            resource_id: The ID of a particular resource instance to configure
                the local client to connect to. If the connector instance is
                already configured with a resource ID that is not the same or
                equivalent to the one requested, a `ValueError` exception is
                raised. May be omitted for connectors and resource types that do
                not support multiple resource instances.
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Returns:
            The service connector client instance that was used to configure the
            local client.
        """
        connector_client = self.get_service_connector_client(
            name_id_or_prefix=name_id_or_prefix,
            resource_type=resource_type,
            resource_id=resource_id,
        )

        connector_client.configure_local_client(
            **kwargs,
        )

        return connector_client

    def get_service_connector_client(
        self,
        name_id_or_prefix: Union[UUID, str],
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> "ServiceConnector":
        """Get the client side of a service connector instance to use with a local client.

        Args:
            name_id_or_prefix: The name, id or prefix of the service connector
                to use.
            resource_type: The type of the resource to connect to. If not
                provided, the resource type from the service connector
                configuration will be used.
            resource_id: The ID of a particular resource instance to configure
                the local client to connect to. If the connector instance is
                already configured with a resource ID that is not the same or
                equivalent to the one requested, a `ValueError` exception is
                raised. May be omitted for connectors and resource types that do
                not support multiple resource instances.

        Returns:
            The client side of the indicated service connector instance that can
            be used to connect to the resource locally.
        """
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        # Get the service connector model
        service_connector = self.get_service_connector(
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=False,
        )

        connector_type = self.get_service_connector_type(
            service_connector.type
        )

        # Prefer to fetch the connector client from the server if the
        # implementation if available there, because some auth methods rely on
        # the server-side authentication environment
        if connector_type.remote:
            connector_client_model = (
                self.zen_store.get_service_connector_client(
                    service_connector_id=service_connector.id,
                    resource_type=resource_type,
                    resource_id=resource_id,
                )
            )

            connector_client = (
                service_connector_registry.instantiate_connector(
                    model=connector_client_model
                )
            )

            # Verify the connector client on the local machine, because the
            # server-side implementation may not be able to do so
            connector_client.verify()
        else:
            connector_instance = (
                service_connector_registry.instantiate_connector(
                    model=service_connector
                )
            )

            # Fetch the connector client
            connector_client = connector_instance.get_connector_client(
                resource_type=resource_type,
                resource_id=resource_id,
            )

        return connector_client

    def list_service_connector_resources(
        self,
        connector_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[ServiceConnectorResourcesModel]:
        """List resources that can be accessed by service connectors.

        Args:
            connector_type: The type of service connector to filter by.
            resource_type: The type of resource to filter by.
            resource_id: The ID of a particular resource instance to filter by.

        Returns:
            The matching list of resources that available service
            connectors have access to.
        """
        return self.zen_store.list_service_connector_resources(
            user_name_or_id=self.active_user.id,
            workspace_name_or_id=self.active_workspace.id,
            connector_type=connector_type,
            resource_type=resource_type,
            resource_id=resource_id,
        )

    def list_service_connector_types(
        self,
        connector_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        auth_method: Optional[str] = None,
    ) -> List[ServiceConnectorTypeModel]:
        """Get a list of service connector types.

        Args:
            connector_type: Filter by connector type.
            resource_type: Filter by resource type.
            auth_method: Filter by authentication method.

        Returns:
            List of service connector types.
        """
        return self.zen_store.list_service_connector_types(
            connector_type=connector_type,
            resource_type=resource_type,
            auth_method=auth_method,
        )

    def get_service_connector_type(
        self,
        connector_type: str,
    ) -> ServiceConnectorTypeModel:
        """Returns the requested service connector type.

        Args:
            connector_type: the service connector type identifier.

        Returns:
            The requested service connector type.
        """
        return self.zen_store.get_service_connector_type(
            connector_type=connector_type,
        )

    # ---- utility prefix matching get functions -----

    @staticmethod
    def _get_entity_by_id_or_name_or_prefix(
        get_method: Callable[..., AnyResponseModel],
        list_method: Callable[..., Page[AnyResponseModel]],
        name_id_or_prefix: Union[str, UUID],
        allow_name_prefix_match: bool = True,
    ) -> "AnyResponseModel":
        """Fetches an entity using the id, name, or partial id/name.

        Args:
            get_method: The method to use to fetch the entity by id.
            list_method: The method to use to fetch all entities.
            name_id_or_prefix: The id, name or partial id of the entity to
                fetch.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The entity with the given name, id or partial id.

        Raises:
            ZenKeyError: If there is more than one entity with that name
                or id prefix.
        """
        from zenml.utils.uuid_utils import is_valid_uuid

        # First interpret as full UUID
        if is_valid_uuid(name_id_or_prefix):
            return get_method(name_id_or_prefix)

        # If not a UUID, try to find by name
        assert not isinstance(name_id_or_prefix, UUID)
        entity = list_method(name=f"equals:{name_id_or_prefix}")

        # If only a single entity is found, return it
        if entity.total == 1:
            return entity.items[0]

        # If still no match, try with prefix now
        if entity.total == 0:
            return Client._get_entity_by_prefix(
                get_method=get_method,
                list_method=list_method,
                partial_id_or_name=name_id_or_prefix,
                allow_name_prefix_match=allow_name_prefix_match,
            )

        # If more than one entity with the same name is found, raise an error.
        entity_label = get_method.__name__.replace("get_", "") + "s"
        raise ZenKeyError(
            f"{entity.total} {entity_label} have been found that have "
            f"a name that matches the provided "
            f"string '{name_id_or_prefix}':\n"
            f"{[entity.items]}.\n"
            f"Please use the id to uniquely identify "
            f"only one of the {entity_label}s."
        )

    @staticmethod
    def _get_entity_by_prefix(
        get_method: Callable[..., AnyResponseModel],
        list_method: Callable[..., Page[AnyResponseModel]],
        partial_id_or_name: str,
        allow_name_prefix_match: bool,
    ) -> "AnyResponseModel":
        """Fetches an entity using a partial ID or name.

        Args:
            get_method: The method to use to fetch the entity by id.
            list_method: The method to use to fetch all entities.
            partial_id_or_name: The partial ID or name of the entity to fetch.
            allow_name_prefix_match: If True, allow matching by name prefix.

        Returns:
            The entity with the given partial ID or name.

        Raises:
            KeyError: If no entity with the given partial ID or name is found.
            ZenKeyError: If there is more than one entity with that partial ID
                or name.
        """
        list_method_args: Dict[str, Any] = {
            "logical_operator": LogicalOperators.OR,
            "id": f"startswith:{partial_id_or_name}",
        }
        if allow_name_prefix_match:
            list_method_args["name"] = f"startswith:{partial_id_or_name}"

        entity = list_method(**list_method_args)

        # If only a single entity is found, return it.
        if entity.total == 1:
            return entity.items[0]

        entity_label = get_method.__name__.replace("get_", "") + "s"

        prefix_description = (
            "a name/ID prefix" if allow_name_prefix_match else "an ID prefix"
        )
        # If no entity is found, raise an error.
        if entity.total == 0:
            raise KeyError(
                f"No {entity_label} have been found that have "
                f"{prefix_description} that matches the provided string "
                f"'{partial_id_or_name}'."
            )

        # If more than one entity is found, raise an error.
        ambiguous_entities: List[str] = []
        for model in entity.items:
            model_name = getattr(model, "name", None)
            if model_name:
                ambiguous_entities.append(f"{model_name}: {model.id}")
            else:
                ambiguous_entities.append(str(model.id))
        raise ZenKeyError(
            f"{entity.total} {entity_label} have been found that have "
            f"{prefix_description} that matches the provided "
            f"string '{partial_id_or_name}':\n"
            f"{ambiguous_entities}.\n"
            f"Please provide more characters to uniquely identify "
            f"only one of the {entity_label}s."
        )

active_stack: Stack property readonly

The active stack for this client.

Returns:

Type Description
Stack

The active stack for this client.

active_stack_model: StackResponseModel property readonly

The model of the active stack for this client.

If no active stack is configured locally for the client, the active stack in the global configuration is used instead.

Returns:

Type Description
StackResponseModel

The model of the active stack for this client.

Exceptions:

Type Description
RuntimeError

If the active stack is not set.

active_user: UserResponseModel property readonly

Get the user that is currently in use.

Returns:

Type Description
UserResponseModel

The active user.

active_workspace: WorkspaceResponseModel property readonly

Get the currently active workspace of the local client.

If no active workspace is configured locally for the client, the active workspace in the global configuration is used instead.

Returns:

Type Description
WorkspaceResponseModel

The active workspace.

Exceptions:

Type Description
RuntimeError

If the active workspace is not set.

config_directory: Optional[pathlib.Path] property readonly

The configuration directory of this client.

Returns:

Type Description
Optional[pathlib.Path]

The configuration directory of this client, or None, if the client doesn't have an active root.

root: Optional[pathlib.Path] property readonly

The root directory of this client.

Returns:

Type Description
Optional[pathlib.Path]

The root directory of this client, or None, if the client has not been initialized.

uses_local_configuration: bool property readonly

Check if the client is using a local configuration.

Returns:

Type Description
bool

True if the client is using a local configuration, False otherwise.

zen_store: BaseZenStore property readonly

Shortcut to return the global zen store.

Returns:

Type Description
BaseZenStore

The global zen store.

__init__(self, root=None) special

Initializes the global client instance.

Client is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance (see the exception below).

The root argument is only meant for internal use and testing purposes. User code must never pass them to the constructor. When a custom root value is passed, an anonymous Client instance is created and returned independently of the Client singleton and that will have no effect as far as the rest of the ZenML core code is concerned.

Instead of creating a new Client instance to reflect a different repository root, to change the active root in the global Client, call Client().activate_root(<new-root>).

Parameters:

Name Type Description Default
root Optional[pathlib.Path]

(internal use) custom root directory for the client. If no path is given, the repository root is determined using the environment variable ZENML_REPOSITORY_PATH (if set) and by recursively searching in the parent directories of the current working directory. Only used to initialize new clients internally.

None
Source code in zenml/client.py
def __init__(
    self,
    root: Optional[Path] = None,
) -> None:
    """Initializes the global client instance.

    Client is a singleton class: only one instance can exist. Calling
    this constructor multiple times will always yield the same instance (see
    the exception below).

    The `root` argument is only meant for internal use and testing purposes.
    User code must never pass them to the constructor.
    When a custom `root` value is passed, an anonymous Client instance
    is created and returned independently of the Client singleton and
    that will have no effect as far as the rest of the ZenML core code is
    concerned.

    Instead of creating a new Client instance to reflect a different
    repository root, to change the active root in the global Client,
    call `Client().activate_root(<new-root>)`.

    Args:
        root: (internal use) custom root directory for the client. If
            no path is given, the repository root is determined using the
            environment variable `ZENML_REPOSITORY_PATH` (if set) and by
            recursively searching in the parent directories of the
            current working directory. Only used to initialize new
            clients internally.
    """
    self._root: Optional[Path] = None
    self._config: Optional[ClientConfiguration] = None

    self._set_active_root(root)

activate_root(self, root=None)

Set the active repository root directory.

Parameters:

Name Type Description Default
root Optional[pathlib.Path]

The path to set as the active repository root. If not set, the repository root is determined using the environment variable ZENML_REPOSITORY_PATH (if set) and by recursively searching in the parent directories of the current working directory.

None
Source code in zenml/client.py
def activate_root(self, root: Optional[Path] = None) -> None:
    """Set the active repository root directory.

    Args:
        root: The path to set as the active repository root. If not set,
            the repository root is determined using the environment
            variable `ZENML_REPOSITORY_PATH` (if set) and by recursively
            searching in the parent directories of the current working
            directory.
    """
    self._set_active_root(root)

activate_stack(self, stack_name_id_or_prefix)

Sets the stack as active.

Parameters:

Name Type Description Default
stack_name_id_or_prefix Union[str, uuid.UUID]

Model of the stack to activate.

required

Exceptions:

Type Description
KeyError

If the stack is not registered.

Source code in zenml/client.py
@track(event=AnalyticsEvent.SET_STACK)
def activate_stack(
    self, stack_name_id_or_prefix: Union[str, UUID]
) -> None:
    """Sets the stack as active.

    Args:
        stack_name_id_or_prefix: Model of the stack to activate.

    Raises:
        KeyError: If the stack is not registered.
    """
    # Make sure the stack is registered
    try:
        stack = self.get_stack(name_id_or_prefix=stack_name_id_or_prefix)
    except KeyError:
        raise KeyError(
            f"Stack '{stack_name_id_or_prefix}' cannot be activated since "
            f"it is not registered yet. Please register it first."
        )

    if self._config:
        self._config.set_active_stack(stack=stack)

    else:
        # set the active stack globally only if the client doesn't use
        # a local configuration
        GlobalConfiguration().set_active_stack(stack=stack)

create_code_repository(self, name, config, source, description=None, logo_url=None)

Create a new code repository.

Parameters:

Name Type Description Default
name str

Name of the code repository.

required
config Dict[str, Any]

The configuration for the code repository.

required
source Source

The code repository implementation source.

required
description Optional[str]

The code repository description.

None
logo_url Optional[str]

URL of a logo (png, jpg or svg) for the code repository.

None

Returns:

Type Description
CodeRepositoryResponseModel

The created code repository.

Exceptions:

Type Description
RuntimeError

If the provided config is invalid.

Source code in zenml/client.py
def create_code_repository(
    self,
    name: str,
    config: Dict[str, Any],
    source: Source,
    description: Optional[str] = None,
    logo_url: Optional[str] = None,
) -> CodeRepositoryResponseModel:
    """Create a new code repository.

    Args:
        name: Name of the code repository.
        config: The configuration for the code repository.
        source: The code repository implementation source.
        description: The code repository description.
        logo_url: URL of a logo (png, jpg or svg) for the code repository.

    Returns:
        The created code repository.

    Raises:
        RuntimeError: If the provided config is invalid.
    """
    from zenml.code_repositories import BaseCodeRepository

    code_repo_class: Type[
        BaseCodeRepository
    ] = source_utils.load_and_validate_class(
        source=source, expected_class=BaseCodeRepository
    )
    try:
        # Validate the repo config
        code_repo_class(id=uuid4(), config=config)
    except Exception as e:
        raise RuntimeError(
            "Failed to validate code repository config."
        ) from e

    repo_request = CodeRepositoryRequestModel(
        user=self.active_user.id,
        workspace=self.active_workspace.id,
        name=name,
        config=config,
        source=source,
        description=description,
        logo_url=logo_url,
    )
    return self.zen_store.create_code_repository(
        code_repository=repo_request
    )

create_flavor(self, source, component_type)

Creates a new flavor.

Parameters:

Name Type Description Default
source str

The flavor to create.

required
component_type StackComponentType

The type of the flavor.

required

Returns:

Type Description
FlavorResponseModel

The created flavor (in model form).

Exceptions:

Type Description
ValueError

in case the config_schema of the flavor is too large.

Source code in zenml/client.py
def create_flavor(
    self,
    source: str,
    component_type: StackComponentType,
) -> "FlavorResponseModel":
    """Creates a new flavor.

    Args:
        source: The flavor to create.
        component_type: The type of the flavor.

    Returns:
        The created flavor (in model form).

    Raises:
        ValueError: in case the config_schema of the flavor is too large.
    """
    from zenml.stack.flavor import validate_flavor_source

    flavor = validate_flavor_source(
        source=source, component_type=component_type
    )()

    if len(flavor.config_schema) > TEXT_FIELD_MAX_LENGTH:
        raise ValueError(
            "Json representation of configuration schema"
            "exceeds max length. This could be caused by an"
            "overly long docstring on the flavors "
            "configuration class' docstring."
        )

    create_flavor_request = FlavorRequestModel(
        source=source,
        type=flavor.type,
        name=flavor.name,
        config_schema=flavor.config_schema,
        integration="custom",
        user=self.active_user.id,
        workspace=self.active_workspace.id,
    )

    return self.zen_store.create_flavor(flavor=create_flavor_request)

create_role(self, name, permissions_list)

Creates a role.

Parameters:

Name Type Description Default
name str

The name for the new role.

required
permissions_list List[str]

The permissions to attach to this role.

required

Returns:

Type Description
RoleResponseModel

The newly created role.

Source code in zenml/client.py
def create_role(
    self, name: str, permissions_list: List[str]
) -> RoleResponseModel:
    """Creates a role.

    Args:
        name: The name for the new role.
        permissions_list: The permissions to attach to this role.

    Returns:
        The newly created role.
    """
    permissions: Set[PermissionType] = set()
    for permission in permissions_list:
        if permission in PermissionType.values():
            permissions.add(PermissionType(permission))

    new_role = RoleRequestModel(name=name, permissions=permissions)
    return self.zen_store.create_role(new_role)

create_run_metadata(self, metadata, pipeline_run_id=None, step_run_id=None, artifact_id=None, stack_component_id=None)

Create run metadata.

Parameters:

Name Type Description Default
metadata Dict[str, MetadataType]

The metadata to create as a dictionary of key-value pairs.

required
pipeline_run_id Optional[uuid.UUID]

The ID of the pipeline run during which the metadata was produced. If provided, step_run_id and artifact_id must be None.

None
step_run_id Optional[uuid.UUID]

The ID of the step run during which the metadata was produced. If provided, pipeline_run_id and artifact_id must be None.

None
artifact_id Optional[uuid.UUID]

The ID of the artifact for which the metadata was produced. If provided, pipeline_run_id and step_run_id must be None.

None
stack_component_id Optional[uuid.UUID]

The ID of the stack component that produced the metadata.

None

Returns:

Type Description
Dict[str, zenml.models.run_metadata_models.RunMetadataResponseModel]

The created metadata, as string to model dictionary.

Exceptions:

Type Description
ValueError

If not exactly one of either pipeline_run_id, step_run_id, or artifact_id is provided.

Source code in zenml/client.py
def create_run_metadata(
    self,
    metadata: Dict[str, "MetadataType"],
    pipeline_run_id: Optional[UUID] = None,
    step_run_id: Optional[UUID] = None,
    artifact_id: Optional[UUID] = None,
    stack_component_id: Optional[UUID] = None,
) -> Dict[str, RunMetadataResponseModel]:
    """Create run metadata.

    Args:
        metadata: The metadata to create as a dictionary of key-value pairs.
        pipeline_run_id: The ID of the pipeline run during which the
            metadata was produced. If provided, `step_run_id` and
            `artifact_id` must be None.
        step_run_id: The ID of the step run during which the metadata was
            produced. If provided, `pipeline_run_id` and `artifact_id` must
            be None.
        artifact_id: The ID of the artifact for which the metadata was
            produced. If provided, `pipeline_run_id` and `step_run_id` must
            be None.
        stack_component_id: The ID of the stack component that produced
            the metadata.

    Returns:
        The created metadata, as string to model dictionary.

    Raises:
        ValueError: If not exactly one of either `pipeline_run_id`,
            `step_run_id`, or `artifact_id` is provided.
    """
    from zenml.metadata.metadata_types import get_metadata_type

    if not (pipeline_run_id or step_run_id or artifact_id):
        raise ValueError(
            "Cannot create run metadata without linking it to any entity. "
            "Please provide either a `pipeline_run_id`, `step_run_id`, or "
            "`artifact_id`."
        )
    if (
        (pipeline_run_id and step_run_id)
        or (pipeline_run_id and artifact_id)
        or (step_run_id and artifact_id)
    ):
        raise ValueError(
            "Cannot create run metadata linked to multiple entities. "
            "Please provide only a `pipeline_run_id` or only a "
            "`step_run_id` or only an `artifact_id`."
        )

    created_metadata: Dict[str, RunMetadataResponseModel] = {}
    for key, value in metadata.items():
        # Skip metadata that is too large to be stored in the database.
        if len(json.dumps(value)) > TEXT_FIELD_MAX_LENGTH:
            logger.warning(
                f"Metadata value for key '{key}' is too large to be "
                "stored in the database. Skipping."
            )
            continue

        # Skip metadata that is not of a supported type.
        try:
            metadata_type = get_metadata_type(value)
        except ValueError as e:
            logger.warning(
                f"Metadata value for key '{key}' is not of a supported "
                f"type. Skipping. Full error: {e}"
            )
            continue

        run_metadata = RunMetadataRequestModel(
            workspace=self.active_workspace.id,
            user=self.active_user.id,
            pipeline_run_id=pipeline_run_id,
            step_run_id=step_run_id,
            artifact_id=artifact_id,
            stack_component_id=stack_component_id,
            key=key,
            value=value,
            type=metadata_type,
        )
        metadata_model = self.zen_store.create_run_metadata(run_metadata)
        created_metadata[key] = metadata_model
    return created_metadata

create_secret(self, name, values, scope=<SecretScope.WORKSPACE: 'workspace'>)

Creates a new secret.

Parameters:

Name Type Description Default
name str

The name of the secret.

required
values Dict[str, str]

The values of the secret.

required
scope SecretScope

The scope of the secret.

<SecretScope.WORKSPACE: 'workspace'>

Returns:

Type Description
SecretResponseModel

The created secret (in model form).

Exceptions:

Type Description
NotImplementedError

If centralized secrets management is not enabled.

Source code in zenml/client.py
def create_secret(
    self,
    name: str,
    values: Dict[str, str],
    scope: SecretScope = SecretScope.WORKSPACE,
) -> "SecretResponseModel":
    """Creates a new secret.

    Args:
        name: The name of the secret.
        values: The values of the secret.
        scope: The scope of the secret.

    Returns:
        The created secret (in model form).

    Raises:
        NotImplementedError: If centralized secrets management is not
            enabled.
    """
    create_secret_request = SecretRequestModel(
        name=name,
        values=values,
        scope=scope,
        user=self.active_user.id,
        workspace=self.active_workspace.id,
    )
    try:
        return self.zen_store.create_secret(secret=create_secret_request)
    except NotImplementedError:
        raise NotImplementedError(
            "centralized secrets management is not supported or explicitly "
            "disabled in the target ZenML deployment."
        )

create_service_connector(self, name, connector_type, resource_type=None, auth_method=None, configuration=None, resource_id=None, description='', expiration_seconds=None, expires_at=None, is_shared=False, labels=None, auto_configure=False, verify=True, list_resources=True, register=True)

Create, validate and/or register a service connector.

Parameters:

Name Type Description Default
name str

The name of the service connector.

required
connector_type str

The service connector type.

required
auth_method Optional[str]

The authentication method of the service connector. May be omitted if auto-configuration is used.

None
resource_type Optional[str]

The resource type for the service connector.

None
configuration Optional[Dict[str, str]]

The configuration of the service connector.

None
resource_id Optional[str]

The resource id of the service connector.

None
description str

The description of the service connector.

''
expiration_seconds Optional[int]

The expiration time of the service connector.

None
expires_at Optional[datetime.datetime]

The expiration time of the service connector credentials.

None
is_shared bool

Whether the service connector is shared or not.

False
labels Optional[Dict[str, str]]

The labels of the service connector.

None
auto_configure bool

Whether to automatically configure the service connector from the local environment.

False
verify bool

Whether to verify that the service connector configuration and credentials can be used to gain access to the resource.

True
list_resources bool

Whether to also list the resources that the service connector can give access to (if verify is True).

True
register bool

Whether to register the service connector or not.

True

Returns:

Type Description
Tuple[Union[ServiceConnectorResponseModel, ServiceConnectorRequestModel, NoneType], Union[zenml.models.service_connector_models.ServiceConnectorResourcesModel]]

The model of the registered service connector and the resources that the service connector can give access to (if verify is True).

Exceptions:

Type Description
ValueError

If the arguments are invalid.

KeyError

If the service connector type is not found.

NotImplementedError

If auto-configuration is not supported or not implemented for the service connector type.

AuthorizationException

If the connector verification failed due to authorization issues.

Source code in zenml/client.py
def create_service_connector(
    self,
    name: str,
    connector_type: str,
    resource_type: Optional[str] = None,
    auth_method: Optional[str] = None,
    configuration: Optional[Dict[str, str]] = None,
    resource_id: Optional[str] = None,
    description: str = "",
    expiration_seconds: Optional[int] = None,
    expires_at: Optional[datetime] = None,
    is_shared: bool = False,
    labels: Optional[Dict[str, str]] = None,
    auto_configure: bool = False,
    verify: bool = True,
    list_resources: bool = True,
    register: bool = True,
) -> Tuple[
    Optional[
        Union[
            "ServiceConnectorResponseModel",
            "ServiceConnectorRequestModel",
        ]
    ],
    Optional[ServiceConnectorResourcesModel],
]:
    """Create, validate and/or register a service connector.

    Args:
        name: The name of the service connector.
        connector_type: The service connector type.
        auth_method: The authentication method of the service connector.
            May be omitted if auto-configuration is used.
        resource_type: The resource type for the service connector.
        configuration: The configuration of the service connector.
        resource_id: The resource id of the service connector.
        description: The description of the service connector.
        expiration_seconds: The expiration time of the service connector.
        expires_at: The expiration time of the service connector
            credentials.
        is_shared: Whether the service connector is shared or not.
        labels: The labels of the service connector.
        auto_configure: Whether to automatically configure the service
            connector from the local environment.
        verify: Whether to verify that the service connector configuration
            and credentials can be used to gain access to the resource.
        list_resources: Whether to also list the resources that the service
            connector can give access to (if verify is True).
        register: Whether to register the service connector or not.

    Returns:
        The model of the registered service connector and the resources
        that the service connector can give access to (if verify is True).

    Raises:
        ValueError: If the arguments are invalid.
        KeyError: If the service connector type is not found.
        NotImplementedError: If auto-configuration is not supported or
            not implemented for the service connector type.
        AuthorizationException: If the connector verification failed due
            to authorization issues.
    """
    from zenml.service_connectors.service_connector_registry import (
        service_connector_registry,
    )

    connector_instance: Optional[ServiceConnector] = None
    connector_resources: Optional[ServiceConnectorResourcesModel] = None

    # Get the service connector type class
    try:
        connector = self.zen_store.get_service_connector_type(
            connector_type=connector_type,
        )
    except KeyError:
        raise KeyError(
            f"Service connector type {connector_type} not found."
            "Please check that you have installed all required "
            "Python packages and ZenML integrations and try again."
        )

    if not resource_type:
        if len(connector.resource_types) == 1:
            resource_type = connector.resource_types[0].resource_type

    # If auto_configure is set, we will try to automatically configure the
    # service connector from the local environment
    if auto_configure:
        if not connector.supports_auto_configuration:
            raise NotImplementedError(
                f"The {connector.name} service connector type "
                "does not support auto-configuration."
            )
        if not connector.local:
            raise NotImplementedError(
                f"The {connector.name} service connector type "
                "implementation is not available locally. Please "
                "check that you have installed all required Python "
                "packages and ZenML integrations and try again, or "
                "skip auto-configuration."
            )

        assert connector.connector_class is not None

        connector_instance = connector.connector_class.auto_configure(
            resource_type=resource_type,
            auth_method=auth_method,
            resource_id=resource_id,
        )
        assert connector_instance is not None
        connector_request = connector_instance.to_model(
            name=name,
            user=self.active_user.id,
            workspace=self.active_workspace.id,
            description=description or "",
            is_shared=is_shared,
            labels=labels,
        )

        if verify:
            # Prefer to verify the connector config server-side if the
            # implementation if available there, because it ensures
            # that the connector can be shared with other users or used
            # from other machines and because some auth methods rely on the
            # server-side authentication environment
            if connector.remote:
                connector_resources = (
                    self.zen_store.verify_service_connector_config(
                        connector_request,
                        list_resources=list_resources,
                    )
                )
            else:
                connector_resources = connector_instance.verify(
                    list_resources=list_resources,
                )

            if connector_resources.error:
                # Raise an exception if the connector verification failed
                raise AuthorizationException(connector_resources.error)

    else:
        if not auth_method:
            if len(connector.auth_methods) == 1:
                auth_method = connector.auth_methods[0].auth_method
            else:
                raise ValueError(
                    f"Multiple authentication methods are available for "
                    f"the {connector.name} service connector type. Please "
                    f"specify one of the following: "
                    f"{list(connector.auth_method_dict.keys())}."
                )

        connector_request = ServiceConnectorRequestModel(
            name=name,
            connector_type=connector_type,
            description=description,
            auth_method=auth_method,
            expiration_seconds=expiration_seconds,
            expires_at=expires_at,
            is_shared=is_shared,
            user=self.active_user.id,
            workspace=self.active_workspace.id,
            labels=labels or {},
        )
        # Validate and configure the resources
        connector_request.validate_and_configure_resources(
            connector_type=connector,
            resource_types=resource_type,
            resource_id=resource_id,
            configuration=configuration,
        )
        if verify:
            # Prefer to verify the connector config server-side if the
            # implementation if available there, because it ensures
            # that the connector can be shared with other users or used
            # from other machines and because some auth methods rely on the
            # server-side authentication environment
            if connector.remote:
                connector_resources = (
                    self.zen_store.verify_service_connector_config(
                        connector_request,
                        list_resources=list_resources,
                    )
                )
            else:
                connector_instance = (
                    service_connector_registry.instantiate_connector(
                        model=connector_request
                    )
                )
                connector_resources = connector_instance.verify(
                    list_resources=list_resources,
                )

            if connector_resources.error:
                # Raise an exception if the connector verification failed
                raise AuthorizationException(connector_resources.error)

            # For resource types that don't support multi-instances, it's
            # better to save the default resource ID in the connector, if
            # available. Otherwise, we'll need to instantiate the connector
            # again to get the default resource ID.
            connector_request.resource_id = (
                connector_request.resource_id
                or connector_resources.get_default_resource_id()
            )

    if not register:
        return connector_request, connector_resources

    # Register the new model
    connector_response = self.zen_store.create_service_connector(
        service_connector=connector_request
    )

    if connector_resources:
        connector_resources.id = connector_response.id
        connector_resources.name = connector_response.name
        connector_resources.connector_type = (
            connector_response.connector_type
        )

    return connector_response, connector_resources

create_stack(self, name, components, is_shared=False)

Registers a stack and its components.

Parameters:

Name Type Description Default
name str

The name of the stack to register.

required
components Mapping[zenml.enums.StackComponentType, Union[str, uuid.UUID]]

dictionary which maps component types to component names

required
is_shared bool

boolean to decide whether the stack is shared

False

Returns:

Type Description
StackResponseModel

The model of the registered stack.

Exceptions:

Type Description
ValueError

If the stack contains private components and is attempted to be registered as shared.

Source code in zenml/client.py
def create_stack(
    self,
    name: str,
    components: Mapping[StackComponentType, Union[str, UUID]],
    is_shared: bool = False,
) -> "StackResponseModel":
    """Registers a stack and its components.

    Args:
        name: The name of the stack to register.
        components: dictionary which maps component types to component names
        is_shared: boolean to decide whether the stack is shared

    Returns:
        The model of the registered stack.

    Raises:
        ValueError: If the stack contains private components and is
            attempted to be registered as shared.
    """
    stack_components = dict()

    for c_type, c_identifier in components.items():
        # Skip non-existent components.
        if not c_identifier:
            continue

        # Get the component.
        component = self.get_stack_component(
            name_id_or_prefix=c_identifier,
            component_type=c_type,
        )
        stack_components[c_type] = [component.id]

        # Raise an error if private components are used in a shared stack.
        if is_shared and not component.is_shared:
            raise ValueError(
                f"You attempted to include the private {c_type} "
                f"'{component.name}' in a shared stack. This is not "
                f"supported. You can either share the {c_type} with the "
                f"following command:\n"
                f"`zenml {c_type.replace('_', '-')} share`{component.id}`\n"
                f"or create the stack privately and then share it and all "
                f"of its components using:\n`zenml stack share {name} -r`"
            )

    stack = StackRequestModel(
        name=name,
        components=stack_components,
        is_shared=is_shared,
        workspace=self.active_workspace.id,
        user=self.active_user.id,
    )

    self._validate_stack_configuration(stack=stack)

    return self.zen_store.create_stack(stack=stack)

create_stack_component(self, name, flavor, component_type, configuration, labels=None, is_shared=False)

Registers a stack component.

Parameters:

Name Type Description Default
name str

The name of the stack component.

required
flavor str

The flavor of the stack component.

required
component_type StackComponentType

The type of the stack component.

required
configuration Dict[str, str]

The configuration of the stack component.

required
labels Optional[Dict[str, Any]]

The labels of the stack component.

None
is_shared bool

Whether the stack component is shared or not.

False

Returns:

Type Description
ComponentResponseModel

The model of the registered component.

Source code in zenml/client.py
def create_stack_component(
    self,
    name: str,
    flavor: str,
    component_type: StackComponentType,
    configuration: Dict[str, str],
    labels: Optional[Dict[str, Any]] = None,
    is_shared: bool = False,
) -> "ComponentResponseModel":
    """Registers a stack component.

    Args:
        name: The name of the stack component.
        flavor: The flavor of the stack component.
        component_type: The type of the stack component.
        configuration: The configuration of the stack component.
        labels: The labels of the stack component.
        is_shared: Whether the stack component is shared or not.

    Returns:
        The model of the registered component.
    """
    # Get the flavor model
    flavor_model = self.get_flavor_by_name_and_type(
        name=flavor,
        component_type=component_type,
    )

    # Create and validate the configuration
    from zenml.stack import Flavor

    flavor_class = Flavor.from_model(flavor_model)
    configuration_obj = flavor_class.config_class(
        warn_about_plain_text_secrets=True, **configuration
    )

    self._validate_stack_component_configuration(
        component_type, configuration=configuration_obj
    )

    create_component_model = ComponentRequestModel(
        name=name,
        type=component_type,
        flavor=flavor,
        configuration=configuration,
        is_shared=is_shared,
        user=self.active_user.id,
        workspace=self.active_workspace.id,
        labels=labels,
    )

    # Register the new model
    return self.zen_store.create_stack_component(
        component=create_component_model
    )

create_team(self, name, users=None)

Create a team.

Parameters:

Name Type Description Default
name str

Name of the team.

required
users Optional[List[str]]

Users to add to the team.

None

Returns:

Type Description
TeamResponseModel

The created team.

Source code in zenml/client.py
def create_team(
    self, name: str, users: Optional[List[str]] = None
) -> TeamResponseModel:
    """Create a team.

    Args:
        name: Name of the team.
        users: Users to add to the team.

    Returns:
        The created team.
    """
    user_list = []
    if users:
        for user_name_or_id in users:
            user_list.append(
                self.get_user(name_id_or_prefix=user_name_or_id).id
            )

    team = TeamRequestModel(name=name, users=user_list)

    return self.zen_store.create_team(team=team)

create_team_role_assignment(self, role_name_or_id, team_name_or_id, workspace_name_or_id=None)

Create a role assignment.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role to assign.

required
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team to assign the role to.

required
workspace_name_or_id Union[uuid.UUID, str]

workspace scope within which to assign the role.

None

Returns:

Type Description
TeamRoleAssignmentResponseModel

The newly created role assignment.

Source code in zenml/client.py
def create_team_role_assignment(
    self,
    role_name_or_id: Union[str, UUID],
    team_name_or_id: Union[str, UUID],
    workspace_name_or_id: Optional[Union[str, UUID]] = None,
) -> TeamRoleAssignmentResponseModel:
    """Create a role assignment.

    Args:
        role_name_or_id: Name or ID of the role to assign.
        team_name_or_id: Name or ID of the team to assign
            the role to.
        workspace_name_or_id: workspace scope within which to assign the role.

    Returns:
        The newly created role assignment.
    """
    role = self.get_role(name_id_or_prefix=role_name_or_id)
    workspace = None
    if workspace_name_or_id:
        workspace = self.get_workspace(
            name_id_or_prefix=workspace_name_or_id
        )
    team = self.get_team(name_id_or_prefix=team_name_or_id)
    role_assignment = TeamRoleAssignmentRequestModel(
        role=role.id,
        team=team.id,
        workspace=workspace,
    )
    return self.zen_store.create_team_role_assignment(
        team_role_assignment=role_assignment
    )

create_user(self, name, initial_role=None, password=None)

Create a new user.

Parameters:

Name Type Description Default
name str

The name of the user.

required
initial_role Optional[str]

Optionally, an initial role to assign to the user.

None
password Optional[str]

The password of the user. If not provided, the user will be created with empty password.

None

Returns:

Type Description
UserResponseModel

The model of the created user.

Source code in zenml/client.py
def create_user(
    self,
    name: str,
    initial_role: Optional[str] = None,
    password: Optional[str] = None,
) -> UserResponseModel:
    """Create a new user.

    Args:
        name: The name of the user.
        initial_role: Optionally, an initial role to assign to the user.
        password: The password of the user. If not provided, the user will
            be created with empty password.

    Returns:
        The model of the created user.
    """
    user = UserRequestModel(name=name, password=password or None)
    if self.zen_store.type != StoreType.REST:
        user.active = password != ""
    else:
        user.active = True

    created_user = self.zen_store.create_user(user=user)

    if initial_role:
        self.create_user_role_assignment(
            role_name_or_id=initial_role,
            user_name_or_id=created_user.id,
            workspace_name_or_id=None,
        )

    return created_user

create_user_role_assignment(self, role_name_or_id, user_name_or_id, workspace_name_or_id=None)

Create a role assignment.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role to assign.

required
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user or team to assign the role to.

required
workspace_name_or_id Union[uuid.UUID, str]

workspace scope within which to assign the role.

None

Returns:

Type Description
UserRoleAssignmentResponseModel

The newly created role assignment.

Source code in zenml/client.py
def create_user_role_assignment(
    self,
    role_name_or_id: Union[str, UUID],
    user_name_or_id: Union[str, UUID],
    workspace_name_or_id: Optional[Union[str, UUID]] = None,
) -> UserRoleAssignmentResponseModel:
    """Create a role assignment.

    Args:
        role_name_or_id: Name or ID of the role to assign.
        user_name_or_id: Name or ID of the user or team to assign
            the role to.
        workspace_name_or_id: workspace scope within which to assign the role.

    Returns:
        The newly created role assignment.
    """
    role = self.get_role(name_id_or_prefix=role_name_or_id)
    workspace = None
    if workspace_name_or_id:
        workspace = self.get_workspace(
            name_id_or_prefix=workspace_name_or_id
        )
    user = self.get_user(name_id_or_prefix=user_name_or_id)
    role_assignment = UserRoleAssignmentRequestModel(
        role=role.id,
        user=user.id,
        workspace=workspace,
    )
    return self.zen_store.create_user_role_assignment(
        user_role_assignment=role_assignment
    )

create_workspace(self, name, description)

Create a new workspace.

Parameters:

Name Type Description Default
name str

Name of the workspace.

required
description str

Description of the workspace.

required

Returns:

Type Description
WorkspaceResponseModel

The created workspace.

Source code in zenml/client.py
def create_workspace(
    self, name: str, description: str
) -> "WorkspaceResponseModel":
    """Create a new workspace.

    Args:
        name: Name of the workspace.
        description: Description of the workspace.

    Returns:
        The created workspace.
    """
    return self.zen_store.create_workspace(
        WorkspaceRequestModel(name=name, description=description)
    )

delete_artifact(self, artifact_id, delete_metadata=True, delete_from_artifact_store=False)

Delete an artifact.

By default, this will delete only the metadata of the artifact from the database, not the artifact itself.

Parameters:

Name Type Description Default
artifact_id UUID

The ID of the artifact to delete.

required
delete_metadata bool

If True, delete the metadata of the artifact from the database.

True
delete_from_artifact_store bool

If True, delete the artifact itself from the artifact store.

False
Source code in zenml/client.py
def delete_artifact(
    self,
    artifact_id: UUID,
    delete_metadata: bool = True,
    delete_from_artifact_store: bool = False,
) -> None:
    """Delete an artifact.

    By default, this will delete only the metadata of the artifact from the
    database, not the artifact itself.

    Args:
        artifact_id: The ID of the artifact to delete.
        delete_metadata: If True, delete the metadata of the artifact from
            the database.
        delete_from_artifact_store: If True, delete the artifact itself from
            the artifact store.
    """
    artifact = self.get_artifact(artifact_id=artifact_id)
    if delete_from_artifact_store:
        self._delete_artifact_from_artifact_store(artifact=artifact)
    if delete_metadata:
        self._delete_artifact_metadata(artifact=artifact)

delete_build(self, id_or_prefix)

Delete a build.

Parameters:

Name Type Description Default
id_or_prefix str

The id or id prefix of the build.

required
Source code in zenml/client.py
def delete_build(self, id_or_prefix: str) -> None:
    """Delete a build.

    Args:
        id_or_prefix: The id or id prefix of the build.
    """
    build = self.get_build(id_or_prefix=id_or_prefix)
    self.zen_store.delete_build(build_id=build.id)

delete_code_repository(self, name_id_or_prefix)

Delete a code repository.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, ID or prefix of the code repository.

required
Source code in zenml/client.py
def delete_code_repository(
    self,
    name_id_or_prefix: Union[str, UUID],
) -> None:
    """Delete a code repository.

    Args:
        name_id_or_prefix: The name, ID or prefix of the code repository.
    """
    repo = self.get_code_repository(
        name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
    )
    self.zen_store.delete_code_repository(code_repository_id=repo.id)

delete_deployment(self, id_or_prefix)

Delete a deployment.

Parameters:

Name Type Description Default
id_or_prefix str

The id or id prefix of the deployment.

required
Source code in zenml/client.py
def delete_deployment(self, id_or_prefix: str) -> None:
    """Delete a deployment.

    Args:
        id_or_prefix: The id or id prefix of the deployment.
    """
    deployment = self.get_deployment(id_or_prefix=id_or_prefix)
    self.zen_store.delete_deployment(deployment_id=deployment.id)

delete_flavor(self, name_id_or_prefix)

Deletes a flavor.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name, id or prefix of the id for the flavor to delete.

required
Source code in zenml/client.py
def delete_flavor(self, name_id_or_prefix: str) -> None:
    """Deletes a flavor.

    Args:
        name_id_or_prefix: The name, id or prefix of the id for the
            flavor to delete.
    """
    flavor = self.get_flavor(
        name_id_or_prefix, allow_name_prefix_match=False
    )
    self.zen_store.delete_flavor(flavor_id=flavor.id)

    logger.info(f"Deleted flavor '{flavor.name}' of type '{flavor.type}'.")

delete_pipeline(self, name_id_or_prefix, version=None)

Delete a pipeline.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, ID or ID prefix of the pipeline.

required
version Optional[str]

The pipeline version. If left empty, will delete the latest version.

None
Source code in zenml/client.py
def delete_pipeline(
    self,
    name_id_or_prefix: Union[str, UUID],
    version: Optional[str] = None,
) -> None:
    """Delete a pipeline.

    Args:
        name_id_or_prefix: The name, ID or ID prefix of the pipeline.
        version: The pipeline version. If left empty, will delete
            the latest version.
    """
    pipeline = self.get_pipeline(
        name_id_or_prefix=name_id_or_prefix, version=version
    )
    self.zen_store.delete_pipeline(pipeline_id=pipeline.id)

delete_pipeline_run(self, name_id_or_prefix)

Deletes a pipeline run.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

Name, ID, or prefix of the pipeline run.

required
Source code in zenml/client.py
def delete_pipeline_run(
    self,
    name_id_or_prefix: Union[str, UUID],
) -> None:
    """Deletes a pipeline run.

    Args:
        name_id_or_prefix: Name, ID, or prefix of the pipeline run.
    """
    run = self.get_pipeline_run(
        name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
    )
    self.zen_store.delete_run(run_id=run.id)

delete_role(self, name_id_or_prefix)

Deletes a role.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name or ID of the role.

required
Source code in zenml/client.py
def delete_role(self, name_id_or_prefix: str) -> None:
    """Deletes a role.

    Args:
        name_id_or_prefix: The name or ID of the role.
    """
    role = self.get_role(
        name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
    )
    self.zen_store.delete_role(role_name_or_id=role.id)

delete_schedule(self, name_id_or_prefix)

Delete a schedule.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, id or prefix id of the schedule to delete.

required
Source code in zenml/client.py
def delete_schedule(self, name_id_or_prefix: Union[str, UUID]) -> None:
    """Delete a schedule.

    Args:
        name_id_or_prefix: The name, id or prefix id of the schedule
            to delete.
    """
    schedule = self.get_schedule(
        name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
    )
    logger.warning(
        f"Deleting schedule '{name_id_or_prefix}'... This will only delete "
        "the reference of the schedule from ZenML. Please make sure to "
        "manually stop/delete this schedule in your orchestrator as well!"
    )
    self.zen_store.delete_schedule(schedule_id=schedule.id)

delete_secret(self, name_id_or_prefix, scope=None)

Deletes a secret.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name or ID of the secret.

required
scope Optional[zenml.enums.SecretScope]

The scope of the secret to delete.

None
Source code in zenml/client.py
def delete_secret(
    self, name_id_or_prefix: str, scope: Optional[SecretScope] = None
) -> None:
    """Deletes a secret.

    Args:
        name_id_or_prefix: The name or ID of the secret.
        scope: The scope of the secret to delete.
    """
    secret = self.get_secret(
        name_id_or_prefix=name_id_or_prefix,
        scope=scope,
        # Don't allow partial name matches, but allow partial ID matches
        allow_partial_name_match=False,
        allow_partial_id_match=True,
    )

    self.zen_store.delete_secret(secret_id=secret.id)

delete_service_connector(self, name_id_or_prefix)

Deletes a registered service connector.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The ID or name of the service connector to delete.

required
Source code in zenml/client.py
def delete_service_connector(
    self,
    name_id_or_prefix: Union[str, UUID],
) -> None:
    """Deletes a registered service connector.

    Args:
        name_id_or_prefix: The ID or name of the service connector to delete.
    """
    service_connector = self.get_service_connector(
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=False,
    )

    self.zen_store.delete_service_connector(
        service_connector_id=service_connector.id
    )
    logger.info(
        "Removed service connector (type: %s) with name '%s'.",
        service_connector.type,
        service_connector.name,
    )

delete_stack(self, name_id_or_prefix, recursive=False)

Deregisters a stack.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, id or prefix id of the stack to deregister.

required
recursive bool

If True, all components of the stack which are not associated with any other stack will also be deleted.

False

Exceptions:

Type Description
ValueError

If the stack is the currently active stack for this client.

Source code in zenml/client.py
def delete_stack(
    self, name_id_or_prefix: Union[str, UUID], recursive: bool = False
) -> None:
    """Deregisters a stack.

    Args:
        name_id_or_prefix: The name, id or prefix id of the stack
            to deregister.
        recursive: If `True`, all components of the stack which are not
            associated with any other stack will also be deleted.

    Raises:
        ValueError: If the stack is the currently active stack for this
            client.
    """
    stack = self.get_stack(
        name_id_or_prefix=name_id_or_prefix, allow_name_prefix_match=False
    )

    if stack.id == self.active_stack_model.id:
        raise ValueError(
            f"Unable to deregister active stack '{stack.name}'. Make "
            f"sure to designate a new active stack before deleting this "
            f"one."
        )

    cfg = GlobalConfiguration()
    if stack.id == cfg.active_stack_id:
        raise ValueError(
            f"Unable to deregister '{stack.name}' as it is the active "
            f"stack within your global configuration. Make "
            f"sure to designate a new active stack before deleting this "
            f"one."
        )

    if recursive:
        stack_components_free_for_deletion = []

        # Get all stack components associated with this stack
        for component_type, component_model in stack.components.items():
            # Get stack associated with the stack component

            stacks = self.list_stacks(
                component_id=component_model[0].id, size=2, page=1
            )

            # Check if the stack component is part of another stack
            if len(stacks) == 1:
                if stack.id == stacks[0].id:
                    stack_components_free_for_deletion.append(
                        (component_type, component_model)
                    )

        self.delete_stack(stack.id)

        for (
            stack_component_type,
            stack_component_model,
        ) in stack_components_free_for_deletion:
            self.delete_stack_component(
                stack_component_model[0].name, stack_component_type
            )

        logger.info("Deregistered stack with name '%s'.", stack.name)
        return

    self.zen_store.delete_stack(stack_id=stack.id)
    logger.info("Deregistered stack with name '%s'.", stack.name)

delete_stack_component(self, name_id_or_prefix, component_type)

Deletes a registered stack component.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The model of the component to delete.

required
component_type StackComponentType

The type of the component to delete.

required
Source code in zenml/client.py
def delete_stack_component(
    self,
    name_id_or_prefix: Union[str, UUID],
    component_type: StackComponentType,
) -> None:
    """Deletes a registered stack component.

    Args:
        name_id_or_prefix: The model of the component to delete.
        component_type: The type of the component to delete.
    """
    component = self.get_stack_component(
        name_id_or_prefix=name_id_or_prefix,
        component_type=component_type,
        allow_name_prefix_match=False,
    )

    self.zen_store.delete_stack_component(component_id=component.id)
    logger.info(
        "Deregistered stack component (type: %s) with name '%s'.",
        component.type,
        component.name,
    )

delete_team(self, name_id_or_prefix)

Delete a team.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name or ID of the team to delete.

required
Source code in zenml/client.py
def delete_team(self, name_id_or_prefix: str) -> None:
    """Delete a team.

    Args:
        name_id_or_prefix: The name or ID of the team to delete.
    """
    team = self.get_team(name_id_or_prefix, allow_name_prefix_match=False)
    self.zen_store.delete_team(team_name_or_id=team.id)

delete_team_role_assignment(self, role_assignment_id)

Delete a role assignment.

Parameters:

Name Type Description Default
role_assignment_id UUID

The id of the role assignments

required
Source code in zenml/client.py
def delete_team_role_assignment(self, role_assignment_id: UUID) -> None:
    """Delete a role assignment.

    Args:
        role_assignment_id: The id of the role assignments

    """
    self.zen_store.delete_team_role_assignment(role_assignment_id)

delete_user(self, name_id_or_prefix)

Delete a user.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name or ID of the user to delete.

required
Source code in zenml/client.py
def delete_user(self, name_id_or_prefix: str) -> None:
    """Delete a user.

    Args:
        name_id_or_prefix: The name or ID of the user to delete.
    """
    user = self.get_user(name_id_or_prefix, allow_name_prefix_match=False)
    self.zen_store.delete_user(user_name_or_id=user.name)

delete_user_role_assignment(self, role_assignment_id)

Delete a role assignment.

Parameters:

Name Type Description Default
role_assignment_id UUID

The id of the role assignments

required
Source code in zenml/client.py
def delete_user_role_assignment(self, role_assignment_id: UUID) -> None:
    """Delete a role assignment.

    Args:
        role_assignment_id: The id of the role assignments

    """
    self.zen_store.delete_user_role_assignment(role_assignment_id)

delete_workspace(self, name_id_or_prefix)

Delete a workspace.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name or ID of the workspace to delete.

required

Exceptions:

Type Description
IllegalOperationError

If the workspace to delete is the active workspace.

Source code in zenml/client.py
def delete_workspace(self, name_id_or_prefix: str) -> None:
    """Delete a workspace.

    Args:
        name_id_or_prefix: The name or ID of the workspace to delete.

    Raises:
        IllegalOperationError: If the workspace to delete is the active
            workspace.
    """
    workspace = self.get_workspace(
        name_id_or_prefix, allow_name_prefix_match=False
    )
    if self.active_workspace.id == workspace.id:
        raise IllegalOperationError(
            f"Workspace '{name_id_or_prefix}' cannot be deleted since "
            "it is currently active. Please set another workspace as "
            "active first."
        )
    self.zen_store.delete_workspace(workspace_name_or_id=workspace.id)

deploy_stack_component(self, name, flavor, cloud, component_type, configuration={}, labels=None)

Deploys a stack component.

Parameters:

Name Type Description Default
name str

The name of the deployed stack component.

required
flavor str

The flavor of the deployed stack component.

required
cloud str

The cloud of the deployed stack component.

required
component_type StackComponentType

The type of the stack component to deploy.

required
configuration Optional[Dict[str, Any]]

The configuration of the deployed stack component.

{}
labels Optional[Dict[str, Any]]

The labels of the deployed stack component.

None

Returns:

Type Description
Optional[ComponentResponseModel]

The deployed stack component.

Source code in zenml/client.py
def deploy_stack_component(
    self,
    name: str,
    flavor: str,
    cloud: str,
    component_type: StackComponentType,
    configuration: Optional[Dict[str, Any]] = {},
    labels: Optional[Dict[str, Any]] = None,
) -> Optional["ComponentResponseModel"]:
    """Deploys a stack component.

    Args:
        name: The name of the deployed stack component.
        flavor: The flavor of the deployed stack component.
        cloud: The cloud of the deployed stack component.
        component_type: The type of the stack component to deploy.
        configuration: The configuration of the deployed stack component.
        labels: The labels of the deployed stack component.

    Returns:
        The deployed stack component.
    """
    STACK_COMPONENT_RECIPE_DIR = "deployed_stack_components"

    if component_type.value not in [
        "artifact_store",
        "container_registry",
        "secrets_manager",
    ]:
        enabled_services = [f"{component_type.value}_{flavor}"]
    else:
        enabled_services = [f"{component_type.value}"]

    # path should be fixed at a constant in the
    # global config directory
    path = Path(
        os.path.join(
            io_utils.get_global_config_directory(),
            STACK_COMPONENT_RECIPE_DIR,
            f"{cloud}-modular",
        )
    )

    with event_handler(
        event=AnalyticsEvent.DEPLOY_STACK_COMPONENT,
        v2=True,
    ) as handler:
        handler.metadata.update({component_type.value: flavor})

        import python_terraform

        from zenml.recipes import (
            StackRecipeService,
            StackRecipeServiceConfig,
        )

        # create the stack recipe service.
        stack_recipe_service_config = StackRecipeServiceConfig(
            directory_path=str(path),
            enabled_services=enabled_services,
            input_variables=configuration,
        )

        stack_recipe_service = StackRecipeService.get_service(str(path))

        if stack_recipe_service:
            logger.info(
                "An existing deployment of the recipe found. "
                f"with path {path}. "
                "Proceeding to update or create resources. "
            )
        else:
            stack_recipe_service = StackRecipeService(
                config=stack_recipe_service_config,
                stack_recipe_name=f"{cloud}-modular",
            )

        try:
            # start the service (the init and apply operation)
            stack_recipe_service.start()

        except python_terraform.TerraformCommandError:
            logger.error(
                "Deployment of the stack component failed or was "
                "interrupted. "
            )
            return None

        # get the outputs from the deployed recipe
        outputs = stack_recipe_service.get_outputs()
        outputs = {k: v for k, v in outputs.items() if v != ""}

        # get all outputs that start with the component type into a map
        comp_outputs = {
            k: v
            for k, v in outputs.items()
            if k.startswith(component_type.value)
        }

        logger.info(
            "Registering a new stack component of type %s with name '%s'.",
            component_type,
            name or comp_outputs[f"{component_type.value}_name"],
        )

        # call the register stack component function using the values of the outputs
        # truncate the component type from the output
        stack_comp = self.create_stack_component(
            name=name or comp_outputs[f"{component_type.value}_name"],
            flavor=comp_outputs[f"{component_type.value}_flavor"],
            component_type=component_type,
            configuration=eval(
                comp_outputs[f"{component_type.value}_configuration"]
            ),
            labels=labels,
        )

        # if the component is an experiment tracker of flavor mlflow, then
        # output the name of the mlflow bucket if it exists
        if (
            component_type == StackComponentType.EXPERIMENT_TRACKER
            and flavor == "mlflow"
        ):
            mlflow_bucket = outputs.get("mlflow-bucket")
            if mlflow_bucket:
                logger.info(
                    "The bucket used for MLflow is: %s "
                    "You can use this bucket as an artifact store to "
                    "avoid having to create a new one.",
                    mlflow_bucket,
                )

        # if the cloud is k3d, then check the container registry
        # outputs. If they are set, then create one.
        if cloud == "k3d":
            container_registry_outputs = {
                k: v
                for k, v in outputs.items()
                if k.startswith("container_registry")
            }
            if container_registry_outputs:
                self.create_stack_component(
                    name=container_registry_outputs[
                        "container_registry_name"
                    ],
                    flavor=container_registry_outputs[
                        "container_registry_flavor"
                    ],
                    component_type=StackComponentType.CONTAINER_REGISTRY,
                    configuration=eval(
                        container_registry_outputs[
                            "container_registry_configuration"
                        ]
                    ),
                )

    return stack_comp

destroy_stack_component(self, component)

Destroys a stack component.

Parameters:

Name Type Description Default
component ComponentResponseModel

The stack component to destroy.

required

Returns:

Type Description
None

None

Source code in zenml/client.py
def destroy_stack_component(
    self,
    component: ComponentResponseModel,
) -> None:
    """Destroys a stack component.

    Args:
        component: The stack component to destroy.

    Returns:
        None
    """
    STACK_COMPONENT_RECIPE_DIR = "deployed_stack_components"

    if component.type.value not in [
        "artifact_store",
        "container_registry",
        "secrets_manager",
    ]:
        disabled_services = [f"{component.type.value}_{component.flavor}"]
    else:
        disabled_services = [f"{component.type.value}"]

    # assert that labels is not None
    assert component.labels is not None
    # path should be fixed at a constant in the
    # global config directory
    path = Path(
        os.path.join(
            io_utils.get_global_config_directory(),
            STACK_COMPONENT_RECIPE_DIR,
            f"{component.labels['cloud']}-modular",
        )
    )

    with event_handler(
        event=AnalyticsEvent.DESTROY_STACK_COMPONENT,
        v2=True,
    ) as handler:
        handler.metadata.update({component.type.value: component.flavor})

        import python_terraform

        from zenml.recipes import (
            StackRecipeService,
        )

        stack_recipe_service = StackRecipeService.get_service(str(path))

        if not stack_recipe_service:
            logger.error(
                f"No deployed {component.type.value} found with "
                f"flavor {component.flavor} and name {component.name}."
            )
            return None

        stack_recipe_service.config.disabled_services = disabled_services

        try:
            # start the service (the init and apply operation)
            stack_recipe_service.stop()

        except python_terraform.TerraformCommandError:
            logger.error(
                "Destruction of the stack component failed or was "
                "interrupted. "
            )
            return None

    logger.info(
        "Deregistering stack component %s...",
        component.name,
    )

    # call the delete stack component function
    self.delete_stack_component(
        name_id_or_prefix=component.name,
        component_type=component.type,
    )

find_repository(path=None, enable_warnings=False) staticmethod

Search for a ZenML repository directory.

Parameters:

Name Type Description Default
path Optional[pathlib.Path]

Optional path to look for the repository. If no path is given, this function tries to find the repository using the environment variable ZENML_REPOSITORY_PATH (if set) and recursively searching in the parent directories of the current working directory.

None
enable_warnings bool

If True, warnings are printed if the repository root cannot be found.

False

Returns:

Type Description
Optional[pathlib.Path]

Absolute path to a ZenML repository directory or None if no repository directory was found.

Source code in zenml/client.py
@staticmethod
def find_repository(
    path: Optional[Path] = None, enable_warnings: bool = False
) -> Optional[Path]:
    """Search for a ZenML repository directory.

    Args:
        path: Optional path to look for the repository. If no path is
            given, this function tries to find the repository using the
            environment variable `ZENML_REPOSITORY_PATH` (if set) and
            recursively searching in the parent directories of the current
            working directory.
        enable_warnings: If `True`, warnings are printed if the repository
            root cannot be found.

    Returns:
        Absolute path to a ZenML repository directory or None if no
        repository directory was found.
    """
    if not path:
        # try to get path from the environment variable
        env_var_path = os.getenv(ENV_ZENML_REPOSITORY_PATH)
        if env_var_path:
            path = Path(env_var_path)

    if path:
        # explicit path via parameter or environment variable, don't search
        # parent directories
        search_parent_directories = False
        warning_message = (
            f"Unable to find ZenML repository at path '{path}'. Make sure "
            f"to create a ZenML repository by calling `zenml init` when "
            f"specifying an explicit repository path in code or via the "
            f"environment variable '{ENV_ZENML_REPOSITORY_PATH}'."
        )
    else:
        # try to find the repository in the parent directories of the
        # current working directory
        path = Path.cwd()
        search_parent_directories = True
        warning_message = (
            f"Unable to find ZenML repository in your current working "
            f"directory ({path}) or any parent directories. If you "
            f"want to use an existing repository which is in a different "
            f"location, set the environment variable "
            f"'{ENV_ZENML_REPOSITORY_PATH}'. If you want to create a new "
            f"repository, run `zenml init`."
        )

    def _find_repository_helper(path_: Path) -> Optional[Path]:
        """Recursively search parent directories for a ZenML repository.

        Args:
            path_: The path to search.

        Returns:
            Absolute path to a ZenML repository directory or None if no
            repository directory was found.
        """
        if Client.is_repository_directory(path_):
            return path_

        if not search_parent_directories or io_utils.is_root(str(path_)):
            return None

        return _find_repository_helper(path_.parent)

    repository_path = _find_repository_helper(path)

    if repository_path:
        return repository_path.resolve()
    if enable_warnings:
        logger.warning(warning_message)
    return None

get_artifact(self, artifact_id)

Get an artifact by ID.

Parameters:

Name Type Description Default
artifact_id UUID

The ID of the artifact to get.

required

Returns:

Type Description
ArtifactResponseModel

The artifact.

Source code in zenml/client.py
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
    """Get an artifact by ID.

    Args:
        artifact_id: The ID of the artifact to get.

    Returns:
        The artifact.
    """
    return self.zen_store.get_artifact(artifact_id)

get_build(self, id_or_prefix)

Get a build by id or prefix.

Parameters:

Name Type Description Default
id_or_prefix str

The id or id prefix of the build.

required

Returns:

Type Description
PipelineBuildResponseModel

The build.

Exceptions:

Type Description
KeyError

If no build was found for the given id or prefix.

ZenKeyError

If multiple builds were found that match the given id or prefix.

Source code in zenml/client.py
def get_build(self, id_or_prefix: str) -> PipelineBuildResponseModel:
    """Get a build by id or prefix.

    Args:
        id_or_prefix: The id or id prefix of the build.

    Returns:
        The build.

    Raises:
        KeyError: If no build was found for the given id or prefix.
        ZenKeyError: If multiple builds were found that match the given
            id or prefix.
    """
    from zenml.utils.uuid_utils import is_valid_uuid

    # First interpret as full UUID
    if is_valid_uuid(id_or_prefix):
        return self.zen_store.get_build(UUID(id_or_prefix))

    entity = self.list_builds(
        id=f"startswith:{id_or_prefix}",
    )

    # If only a single entity is found, return it.
    if entity.total == 1:
        return entity.items[0]

    # If no entity is found, raise an error.
    if entity.total == 0:
        raise KeyError(
            f"No builds have been found that have either an id or prefix "
            f"that matches the provided string '{id_or_prefix}'."
        )

    raise ZenKeyError(
        f"{entity.total} builds have been found that have "
        f"an ID that matches the provided "
        f"string '{id_or_prefix}':\n"
        f"{[entity.items]}.\n"
        f"Please use the id to uniquely identify "
        f"only one of the builds."
    )

get_code_repository(self, name_id_or_prefix, allow_name_prefix_match=True)

Get a code repository by name, id or prefix.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, ID or ID prefix of the code repository.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
CodeRepositoryResponseModel

The code repository.

Source code in zenml/client.py
def get_code_repository(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> CodeRepositoryResponseModel:
    """Get a code repository by name, id or prefix.

    Args:
        name_id_or_prefix: The name, ID or ID prefix of the code repository.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The code repository.
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_code_repository,
        list_method=self.list_code_repositories,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_deployment(self, id_or_prefix)

Get a deployment by id or prefix.

Parameters:

Name Type Description Default
id_or_prefix str

The id or id prefix of the build.

required

Returns:

Type Description
PipelineDeploymentResponseModel

The deployment.

Exceptions:

Type Description
KeyError

If no deployment was found for the given id or prefix.

ZenKeyError

If multiple deployments were found that match the given id or prefix.

Source code in zenml/client.py
def get_deployment(
    self, id_or_prefix: str
) -> PipelineDeploymentResponseModel:
    """Get a deployment by id or prefix.

    Args:
        id_or_prefix: The id or id prefix of the build.

    Returns:
        The deployment.

    Raises:
        KeyError: If no deployment was found for the given id or prefix.
        ZenKeyError: If multiple deployments were found that match the given
            id or prefix.
    """
    from zenml.utils.uuid_utils import is_valid_uuid

    # First interpret as full UUID
    if is_valid_uuid(id_or_prefix):
        return self.zen_store.get_deployment(UUID(id_or_prefix))

    entity = self.list_deployments(
        id=f"startswith:{id_or_prefix}",
    )

    # If only a single entity is found, return it.
    if entity.total == 1:
        return entity.items[0]

    # If no entity is found, raise an error.
    if entity.total == 0:
        raise KeyError(
            f"No deployment have been found that have either an id or "
            f"prefix that matches the provided string '{id_or_prefix}'."
        )

    raise ZenKeyError(
        f"{entity.total} deployments have been found that have "
        f"an ID that matches the provided "
        f"string '{id_or_prefix}':\n"
        f"{[entity.items]}.\n"
        f"Please use the id to uniquely identify "
        f"only one of the deployments."
    )

get_flavor(self, name_id_or_prefix, allow_name_prefix_match=True)

Get a stack component flavor.

Parameters:

Name Type Description Default
name_id_or_prefix str

The name, ID or prefix to the id of the flavor to get.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
FlavorResponseModel

The stack component flavor.

Source code in zenml/client.py
def get_flavor(
    self,
    name_id_or_prefix: str,
    allow_name_prefix_match: bool = True,
) -> "FlavorResponseModel":
    """Get a stack component flavor.

    Args:
        name_id_or_prefix: The name, ID or prefix to the id of the flavor
            to get.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The stack component flavor.
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_flavor,
        list_method=self.list_flavors,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_flavor_by_name_and_type(self, name, component_type)

Fetches a registered flavor.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component to fetch.

required
name str

The name of the flavor to fetch.

required

Returns:

Type Description
FlavorResponseModel

The registered flavor.

Exceptions:

Type Description
KeyError

If no flavor exists for the given type and name.

Source code in zenml/client.py
def get_flavor_by_name_and_type(
    self, name: str, component_type: "StackComponentType"
) -> "FlavorResponseModel":
    """Fetches a registered flavor.

    Args:
        component_type: The type of the component to fetch.
        name: The name of the flavor to fetch.

    Returns:
        The registered flavor.

    Raises:
        KeyError: If no flavor exists for the given type and name.
    """
    logger.debug(
        f"Fetching the flavor of type {component_type} with name {name}."
    )

    flavors = self.list_flavors(
        type=component_type,
        name=name,
    ).items

    if flavors:
        if len(flavors) > 1:
            raise KeyError(
                f"More than one flavor with name {name} and type "
                f"{component_type} exists."
            )

        return flavors[0]
    else:
        raise KeyError(
            f"No flavor with name '{name}' and type '{component_type}' "
            "exists."
        )

get_flavors_by_type(self, component_type)

Fetches the list of flavor for a stack component type.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component to fetch.

required

Returns:

Type Description
Page[FlavorResponseModel]

The list of flavors.

Source code in zenml/client.py
def get_flavors_by_type(
    self, component_type: "StackComponentType"
) -> Page[FlavorResponseModel]:
    """Fetches the list of flavor for a stack component type.

    Args:
        component_type: The type of the component to fetch.

    Returns:
        The list of flavors.
    """
    logger.debug(f"Fetching the flavors of type {component_type}.")

    return self.list_flavors(
        type=component_type,
    )

get_instance() classmethod

Return the Client singleton instance.

Returns:

Type Description
Optional[Client]

The Client singleton instance or None, if the Client hasn't been initialized yet.

Source code in zenml/client.py
@classmethod
def get_instance(cls) -> Optional["Client"]:
    """Return the Client singleton instance.

    Returns:
        The Client singleton instance or None, if the Client hasn't
        been initialized yet.
    """
    return cls._global_client

get_pipeline(self, name_id_or_prefix, version=None)

Get a pipeline by name, id or prefix.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, ID or ID prefix of the pipeline.

required
version Optional[str]

The pipeline version. If not specified, the latest version is returned.

None

Returns:

Type Description
PipelineResponseModel

The pipeline.

Exceptions:

Type Description
KeyError

If no pipelines were found for the given ID/name and version.

ZenKeyError

If multiple pipelines match the ID prefix.

Source code in zenml/client.py
def get_pipeline(
    self,
    name_id_or_prefix: Union[str, UUID],
    version: Optional[str] = None,
) -> PipelineResponseModel:
    """Get a pipeline by name, id or prefix.

    Args:
        name_id_or_prefix: The name, ID or ID prefix of the pipeline.
        version: The pipeline version. If not specified, the latest
            version is returned.

    Returns:
        The pipeline.

    Raises:
        KeyError: If no pipelines were found for the given ID/name and
            version.
        ZenKeyError: If multiple pipelines match the ID prefix.
    """
    from zenml.utils.uuid_utils import is_valid_uuid

    if is_valid_uuid(name_id_or_prefix):
        if version:
            logger.warning(
                "You specified both an ID as well as a version of the "
                "pipeline. Ignoring the version and fetching the "
                "pipeline by ID."
            )
        if not isinstance(name_id_or_prefix, UUID):
            name_id_or_prefix = UUID(name_id_or_prefix, version=4)

        return self.zen_store.get_pipeline(name_id_or_prefix)

    assert not isinstance(name_id_or_prefix, UUID)
    exact_name_matches = self.list_pipelines(
        size=1,
        sort_by="desc:created",
        name=f"equals:{name_id_or_prefix}",
        version=version,
    )

    if len(exact_name_matches) == 1:
        # If the name matches exactly, use the explicitly specified version
        # or fallback to the latest if not given
        return exact_name_matches.items[0]

    partial_id_matches = self.list_pipelines(
        id=f"startswith:{name_id_or_prefix}"
    )
    if partial_id_matches.total == 1:
        if version:
            logger.warning(
                "You specified both an ID as well as a version of the "
                "pipeline. Ignoring the version and fetching the "
                "pipeline by ID."
            )
        return partial_id_matches[0]
    elif partial_id_matches.total == 0:
        raise KeyError(
            f"No pipelines found for name, ID or prefix "
            f"{name_id_or_prefix}."
        )
    else:
        raise ZenKeyError(
            f"{partial_id_matches.total} pipelines have been found that "
            "have an id prefix that matches the provided string "
            f"'{name_id_or_prefix}':\n"
            f"{partial_id_matches.items}.\n"
            f"Please provide more characters to uniquely identify "
            f"only one of the pipelines."
        )

get_pipeline_run(self, name_id_or_prefix, allow_name_prefix_match=True)

Gets a pipeline run by name, ID, or prefix.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

Name, ID, or prefix of the pipeline run.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
PipelineRunResponseModel

The pipeline run.

Source code in zenml/client.py
def get_pipeline_run(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> PipelineRunResponseModel:
    """Gets a pipeline run by name, ID, or prefix.

    Args:
        name_id_or_prefix: Name, ID, or prefix of the pipeline run.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The pipeline run.
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_run,
        list_method=self.list_pipeline_runs,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_role(self, name_id_or_prefix, allow_name_prefix_match=True)

Gets a role.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name or ID of the role.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
RoleResponseModel

The fetched role.

Source code in zenml/client.py
def get_role(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> RoleResponseModel:
    """Gets a role.

    Args:
        name_id_or_prefix: The name or ID of the role.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The fetched role.
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_role,
        list_method=self.list_roles,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_run_step(self, step_run_id)

Get a step run by ID.

Parameters:

Name Type Description Default
step_run_id UUID

The ID of the step run to get.

required

Returns:

Type Description
StepRunResponseModel

The step run.

Source code in zenml/client.py
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
    """Get a step run by ID.

    Args:
        step_run_id: The ID of the step run to get.

    Returns:
        The step run.
    """
    return self.zen_store.get_run_step(step_run_id)

get_schedule(self, name_id_or_prefix, allow_name_prefix_match=True)

Get a schedule by name, id or prefix.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, id or prefix of the schedule.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
ScheduleResponseModel

The schedule.

Source code in zenml/client.py
def get_schedule(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> ScheduleResponseModel:
    """Get a schedule by name, id or prefix.

    Args:
        name_id_or_prefix: The name, id or prefix of the schedule.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The schedule.
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_schedule,
        list_method=self.list_schedules,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_secret(self, name_id_or_prefix, scope=None, allow_partial_name_match=True, allow_partial_id_match=True)

Get a secret.

Get a secret identified by a name, ID or prefix of the name or ID and optionally a scope.

If a scope is not provided, the secret will be searched for in all scopes starting with the innermost scope (user) to the outermost scope (workspace). When a name or prefix is used instead of a UUID value, each scope is first searched for an exact match, then for a ID prefix or name substring match before moving on to the next scope.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name, ID or prefix to the id of the secret to get.

required
scope Optional[zenml.enums.SecretScope]

The scope of the secret. If not set, all scopes will be searched starting with the innermost scope (user) to the outermost scope (global) until a secret is found.

None
allow_partial_name_match bool

If True, allow partial name matches.

True
allow_partial_id_match bool

If True, allow partial ID matches.

True

Returns:

Type Description
SecretResponseModel

The secret.

Exceptions:

Type Description
KeyError

If no secret is found.

ZenKeyError

If multiple secrets are found.

NotImplementedError

If centralized secrets management is not enabled.

Source code in zenml/client.py
def get_secret(
    self,
    name_id_or_prefix: Union[str, UUID],
    scope: Optional[SecretScope] = None,
    allow_partial_name_match: bool = True,
    allow_partial_id_match: bool = True,
) -> "SecretResponseModel":
    """Get a secret.

    Get a secret identified by a name, ID or prefix of the name or ID and
    optionally a scope.

    If a scope is not provided, the secret will be searched for in all
    scopes starting with the innermost scope (user) to the outermost scope
    (workspace). When a name or prefix is used instead of a UUID value, each
    scope is first searched for an exact match, then for a ID prefix or
    name substring match before moving on to the next scope.

    Args:
        name_id_or_prefix: The name, ID or prefix to the id of the secret
            to get.
        scope: The scope of the secret. If not set, all scopes will be
            searched starting with the innermost scope (user) to the
            outermost scope (global) until a secret is found.
        allow_partial_name_match: If True, allow partial name matches.
        allow_partial_id_match: If True, allow partial ID matches.

    Returns:
        The secret.

    Raises:
        KeyError: If no secret is found.
        ZenKeyError: If multiple secrets are found.
        NotImplementedError: If centralized secrets management is not
            enabled.
    """
    from zenml.utils.uuid_utils import is_valid_uuid

    try:
        # First interpret as full UUID
        if is_valid_uuid(name_id_or_prefix):
            # Fetch by ID; filter by scope if provided
            secret = self.zen_store.get_secret(
                secret_id=UUID(name_id_or_prefix)
                if isinstance(name_id_or_prefix, str)
                else name_id_or_prefix
            )
            if scope is not None and secret.scope != scope:
                raise KeyError(
                    f"No secret found with ID {str(name_id_or_prefix)}"
                )

            return secret
    except NotImplementedError:
        raise NotImplementedError(
            "centralized secrets management is not supported or explicitly "
            "disabled in the target ZenML deployment."
        )

    # If not a UUID, try to find by name and then by prefix
    assert not isinstance(name_id_or_prefix, UUID)

    # Scopes to search in order of priority
    search_scopes = (
        [SecretScope.USER, SecretScope.WORKSPACE]
        if scope is None
        else [scope]
    )

    secrets = self.list_secrets(
        logical_operator=LogicalOperators.OR,
        name=f"contains:{name_id_or_prefix}"
        if allow_partial_name_match
        else f"equals:{name_id_or_prefix}",
        id=f"startswith:{name_id_or_prefix}"
        if allow_partial_id_match
        else None,
    )

    for search_scope in search_scopes:
        partial_matches: List[SecretResponseModel] = []
        for secret in secrets.items:
            if secret.scope != search_scope:
                continue
            # Exact match
            if secret.name == name_id_or_prefix:
                # Need to fetch the secret again to get the secret values
                return self.zen_store.get_secret(secret_id=secret.id)
            # Partial match
            partial_matches.append(secret)

        if len(partial_matches) > 1:
            match_summary = "\n".join(
                [
                    f"[{secret.id}]: name = {secret.name}"
                    for secret in partial_matches
                ]
            )
            raise ZenKeyError(
                f"{len(partial_matches)} secrets have been found that have "
                f"a name or ID that matches the provided "
                f"string '{name_id_or_prefix}':\n"
                f"{match_summary}.\n"
                f"Please use the id to uniquely identify "
                f"only one of the secrets."
            )

        # If only a single secret is found, return it
        if len(partial_matches) == 1:
            # Need to fetch the secret again to get the secret values
            return self.zen_store.get_secret(
                secret_id=partial_matches[0].id
            )

    msg = (
        f"No secret found with name, ID or prefix "
        f"'{name_id_or_prefix}'"
    )
    if scope is not None:
        msg += f" in scope '{scope}'"

    raise KeyError(msg)

get_secret_by_name_and_scope(self, name, scope=None)

Fetches a registered secret with a given name and optional scope.

This is a version of get_secret that restricts the search to a given name and an optional scope, without doing any prefix or UUID matching.

If no scope is provided, the search will be done first in the user scope, then in the workspace scope.

Parameters:

Name Type Description Default
name str

The name of the secret to get.

required
scope Optional[zenml.enums.SecretScope]

The scope of the secret to get.

None

Returns:

Type Description
SecretResponseModel

The registered secret.

Exceptions:

Type Description
KeyError

If no secret exists for the given name in the given scope.

Source code in zenml/client.py
def get_secret_by_name_and_scope(
    self, name: str, scope: Optional[SecretScope] = None
) -> "SecretResponseModel":
    """Fetches a registered secret with a given name and optional scope.

    This is a version of get_secret that restricts the search to a given
    name and an optional scope, without doing any prefix or UUID matching.

    If no scope is provided, the search will be done first in the user
    scope, then in the workspace scope.

    Args:
        name: The name of the secret to get.
        scope: The scope of the secret to get.

    Returns:
        The registered secret.

    Raises:
        KeyError: If no secret exists for the given name in the given scope.
    """
    logger.debug(
        f"Fetching the secret with name '{name}' and scope '{scope}'."
    )

    # Scopes to search in order of priority
    search_scopes = (
        [SecretScope.USER, SecretScope.WORKSPACE]
        if scope is None
        else [scope]
    )

    for search_scope in search_scopes:
        secrets = self.list_secrets(
            logical_operator=LogicalOperators.AND,
            name=f"equals:{name}",
            scope=search_scope,
        )

        if len(secrets.items) >= 1:
            # Need to fetch the secret again to get the secret values
            return self.zen_store.get_secret(secret_id=secrets.items[0].id)

    msg = f"No secret with name '{name}' was found"
    if scope is not None:
        msg += f" in scope '{scope.value}'"

    raise KeyError(msg)

get_service_connector(self, name_id_or_prefix, allow_name_prefix_match=True, load_secrets=False)

Fetches a registered service connector.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The id of the service connector to fetch.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True
load_secrets bool

If True, load the secrets for the service connector.

False

Returns:

Type Description
ServiceConnectorResponseModel

The registered service connector.

Source code in zenml/client.py
def get_service_connector(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
    load_secrets: bool = False,
) -> "ServiceConnectorResponseModel":
    """Fetches a registered service connector.

    Args:
        name_id_or_prefix: The id of the service connector to fetch.
        allow_name_prefix_match: If True, allow matching by name prefix.
        load_secrets: If True, load the secrets for the service connector.

    Returns:
        The registered service connector.
    """

    def scoped_list_method(
        **kwargs: Any,
    ) -> Page[ServiceConnectorResponseModel]:
        """Call `zen_store.list_service_connectors` with workspace scoping.

        Args:
            **kwargs: Keyword arguments to pass to
                `ServiceConnectorFilterModel`.

        Returns:
            The list of service connectors.
        """
        filter_model = ServiceConnectorFilterModel(**kwargs)
        filter_model.set_scope_workspace(self.active_workspace.id)
        return self.zen_store.list_service_connectors(
            filter_model=filter_model,
        )

    connector = self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_service_connector,
        list_method=scoped_list_method,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

    if load_secrets and connector.secret_id:
        client = Client()
        try:
            secret = client.get_secret(
                name_id_or_prefix=connector.secret_id,
                allow_partial_id_match=False,
                allow_partial_name_match=False,
            )
        except KeyError as err:
            logger.error(
                "Unable to retrieve secret values associated with "
                f"service connector '{connector.name}': {err}"
            )
        else:
            # Add secret values to connector configuration
            connector.secrets.update(secret.values)

    return connector

get_service_connector_client(self, name_id_or_prefix, resource_type=None, resource_id=None)

Get the client side of a service connector instance to use with a local client.

Parameters:

Name Type Description Default
name_id_or_prefix Union[uuid.UUID, str]

The name, id or prefix of the service connector to use.

required
resource_type Optional[str]

The type of the resource to connect to. If not provided, the resource type from the service connector configuration will be used.

None
resource_id Optional[str]

The ID of a particular resource instance to configure the local client to connect to. If the connector instance is already configured with a resource ID that is not the same or equivalent to the one requested, a ValueError exception is raised. May be omitted for connectors and resource types that do not support multiple resource instances.

None

Returns:

Type Description
ServiceConnector

The client side of the indicated service connector instance that can be used to connect to the resource locally.

Source code in zenml/client.py
def get_service_connector_client(
    self,
    name_id_or_prefix: Union[UUID, str],
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> "ServiceConnector":
    """Get the client side of a service connector instance to use with a local client.

    Args:
        name_id_or_prefix: The name, id or prefix of the service connector
            to use.
        resource_type: The type of the resource to connect to. If not
            provided, the resource type from the service connector
            configuration will be used.
        resource_id: The ID of a particular resource instance to configure
            the local client to connect to. If the connector instance is
            already configured with a resource ID that is not the same or
            equivalent to the one requested, a `ValueError` exception is
            raised. May be omitted for connectors and resource types that do
            not support multiple resource instances.

    Returns:
        The client side of the indicated service connector instance that can
        be used to connect to the resource locally.
    """
    from zenml.service_connectors.service_connector_registry import (
        service_connector_registry,
    )

    # Get the service connector model
    service_connector = self.get_service_connector(
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=False,
    )

    connector_type = self.get_service_connector_type(
        service_connector.type
    )

    # Prefer to fetch the connector client from the server if the
    # implementation if available there, because some auth methods rely on
    # the server-side authentication environment
    if connector_type.remote:
        connector_client_model = (
            self.zen_store.get_service_connector_client(
                service_connector_id=service_connector.id,
                resource_type=resource_type,
                resource_id=resource_id,
            )
        )

        connector_client = (
            service_connector_registry.instantiate_connector(
                model=connector_client_model
            )
        )

        # Verify the connector client on the local machine, because the
        # server-side implementation may not be able to do so
        connector_client.verify()
    else:
        connector_instance = (
            service_connector_registry.instantiate_connector(
                model=service_connector
            )
        )

        # Fetch the connector client
        connector_client = connector_instance.get_connector_client(
            resource_type=resource_type,
            resource_id=resource_id,
        )

    return connector_client

get_service_connector_type(self, connector_type)

Returns the requested service connector type.

Parameters:

Name Type Description Default
connector_type str

the service connector type identifier.

required

Returns:

Type Description
ServiceConnectorTypeModel

The requested service connector type.

Source code in zenml/client.py
def get_service_connector_type(
    self,
    connector_type: str,
) -> ServiceConnectorTypeModel:
    """Returns the requested service connector type.

    Args:
        connector_type: the service connector type identifier.

    Returns:
        The requested service connector type.
    """
    return self.zen_store.get_service_connector_type(
        connector_type=connector_type,
    )

get_stack(self, name_id_or_prefix=None, allow_name_prefix_match=True)

Get a stack by name, ID or prefix.

If no name, ID or prefix is provided, the active stack is returned.

Parameters:

Name Type Description Default
name_id_or_prefix Union[uuid.UUID, str]

The name, ID or prefix of the stack.

None
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
StackResponseModel

The stack.

Source code in zenml/client.py
def get_stack(
    self,
    name_id_or_prefix: Optional[Union[UUID, str]] = None,
    allow_name_prefix_match: bool = True,
) -> "StackResponseModel":
    """Get a stack by name, ID or prefix.

    If no name, ID or prefix is provided, the active stack is returned.

    Args:
        name_id_or_prefix: The name, ID or prefix of the stack.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The stack.
    """
    if name_id_or_prefix is not None:
        return self._get_entity_by_id_or_name_or_prefix(
            get_method=self.zen_store.get_stack,
            list_method=self.list_stacks,
            name_id_or_prefix=name_id_or_prefix,
            allow_name_prefix_match=allow_name_prefix_match,
        )
    else:
        return self.active_stack_model

get_stack_component(self, component_type, name_id_or_prefix=None, allow_name_prefix_match=True)

Fetches a registered stack component.

If the name_id_or_prefix is provided, it will try to fetch the component with the corresponding identifier. If not, it will try to fetch the active component of the given type.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component to fetch

required
name_id_or_prefix Union[uuid.UUID, str]

The id of the component to fetch.

None
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
ComponentResponseModel

The registered stack component.

Exceptions:

Type Description
KeyError

If no name_id_or_prefix is provided and no such component is part of the active stack.

Source code in zenml/client.py
def get_stack_component(
    self,
    component_type: StackComponentType,
    name_id_or_prefix: Optional[Union[str, UUID]] = None,
    allow_name_prefix_match: bool = True,
) -> "ComponentResponseModel":
    """Fetches a registered stack component.

    If the name_id_or_prefix is provided, it will try to fetch the component
    with the corresponding identifier. If not, it will try to fetch the
    active component of the given type.

    Args:
        component_type: The type of the component to fetch
        name_id_or_prefix: The id of the component to fetch.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The registered stack component.

    Raises:
        KeyError: If no name_id_or_prefix is provided and no such component
            is part of the active stack.
    """
    # If no `name_id_or_prefix` provided, try to get the active component.
    if not name_id_or_prefix:
        components = self.active_stack_model.components.get(
            component_type, None
        )
        if components:
            return components[0]
        raise KeyError(
            "No name_id_or_prefix provided and there is no active "
            f"{component_type} in the current active stack."
        )

    # Else, try to fetch the component with an explicit type filter
    def type_scoped_list_method(
        **kwargs: Any,
    ) -> Page[ComponentResponseModel]:
        """Call `zen_store.list_stack_components` with type scoping.

        Args:
            **kwargs: Keyword arguments to pass to `ComponentFilterModel`.

        Returns:
            The type-scoped list of components.
        """
        component_filter_model = ComponentFilterModel(**kwargs)
        component_filter_model.set_scope_type(
            component_type=component_type
        )
        component_filter_model.set_scope_workspace(
            self.active_workspace.id
        )
        return self.zen_store.list_stack_components(
            component_filter_model=component_filter_model,
        )

    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_stack_component,
        list_method=type_scoped_list_method,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_team(self, name_id_or_prefix, allow_name_prefix_match=True)

Gets a team.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name or ID of the team.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
TeamResponseModel

The Team

Source code in zenml/client.py
def get_team(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> TeamResponseModel:
    """Gets a team.

    Args:
        name_id_or_prefix: The name or ID of the team.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The Team
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_team,
        list_method=self.list_teams,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_team_role_assignment(self, team_role_assignment_id)

Get a role assignment.

Parameters:

Name Type Description Default
team_role_assignment_id UUID

The id of the role assignments

required

Returns:

Type Description
TeamRoleAssignmentResponseModel

The role assignment.

Source code in zenml/client.py
def get_team_role_assignment(
    self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
    """Get a role assignment.

    Args:
        team_role_assignment_id: The id of the role assignments

    Returns:
        The role assignment.
    """
    return self.zen_store.get_team_role_assignment(
        team_role_assignment_id=team_role_assignment_id
    )

get_user(self, name_id_or_prefix, allow_name_prefix_match=True)

Gets a user.

Parameters:

Name Type Description Default
name_id_or_prefix Union[str, uuid.UUID]

The name or ID of the user.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
UserResponseModel

The User

Source code in zenml/client.py
def get_user(
    self,
    name_id_or_prefix: Union[str, UUID],
    allow_name_prefix_match: bool = True,
) -> UserResponseModel:
    """Gets a user.

    Args:
        name_id_or_prefix: The name or ID of the user.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The User
    """
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_user,
        list_method=self.list_users,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

get_user_role_assignment(self, role_assignment_id)

Get a role assignment.

Parameters:

Name Type Description Default
role_assignment_id UUID

The id of the role assignments

required

Returns:

Type Description
UserRoleAssignmentResponseModel

The role assignment.

Source code in zenml/client.py
def get_user_role_assignment(
    self, role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
    """Get a role assignment.

    Args:
        role_assignment_id: The id of the role assignments

    Returns:
        The role assignment.
    """
    return self.zen_store.get_user_role_assignment(
        user_role_assignment_id=role_assignment_id
    )

get_workspace(self, name_id_or_prefix, allow_name_prefix_match=True)

Gets a workspace.

Parameters:

Name Type Description Default
name_id_or_prefix Union[uuid.UUID, str]

The name or ID of the workspace.

required
allow_name_prefix_match bool

If True, allow matching by name prefix.

True

Returns:

Type Description
WorkspaceResponseModel

The workspace

Source code in zenml/client.py
def get_workspace(
    self,
    name_id_or_prefix: Optional[Union[UUID, str]],
    allow_name_prefix_match: bool = True,
) -> WorkspaceResponseModel:
    """Gets a workspace.

    Args:
        name_id_or_prefix: The name or ID of the workspace.
        allow_name_prefix_match: If True, allow matching by name prefix.

    Returns:
        The workspace
    """
    if not name_id_or_prefix:
        return self.active_workspace
    return self._get_entity_by_id_or_name_or_prefix(
        get_method=self.zen_store.get_workspace,
        list_method=self.list_workspaces,
        name_id_or_prefix=name_id_or_prefix,
        allow_name_prefix_match=allow_name_prefix_match,
    )

initialize(root=None) staticmethod

Initializes a new ZenML repository at the given path.

Parameters:

Name Type Description Default
root Optional[pathlib.Path]

The root directory where the repository should be created. If None, the current working directory is used.

None

Exceptions:

Type Description
InitializationException

If the root directory already contains a ZenML repository.

Source code in zenml/client.py
@staticmethod
def initialize(
    root: Optional[Path] = None,
) -> None:
    """Initializes a new ZenML repository at the given path.

    Args:
        root: The root directory where the repository should be created.
            If None, the current working directory is used.

    Raises:
        InitializationException: If the root directory already contains a
            ZenML repository.
    """
    with event_handler(AnalyticsEvent.INITIALIZE_REPO):
        root = root or Path.cwd()
        logger.debug("Initializing new repository at path %s.", root)
        if Client.is_repository_directory(root):
            raise InitializationException(
                f"Found existing ZenML repository at path '{root}'."
            )

        config_directory = str(root / REPOSITORY_DIRECTORY_NAME)
        io_utils.create_dir_recursive_if_not_exists(config_directory)
        # Initialize the repository configuration at the custom path
        Client(root=root)

is_inside_repository(file_path) staticmethod

Returns whether a file is inside the active ZenML repository.

Parameters:

Name Type Description Default
file_path str

A file path.

required

Returns:

Type Description
bool

True if the file is inside the active ZenML repository, False otherwise.

Source code in zenml/client.py
@staticmethod
def is_inside_repository(file_path: str) -> bool:
    """Returns whether a file is inside the active ZenML repository.

    Args:
        file_path: A file path.

    Returns:
        True if the file is inside the active ZenML repository, False
        otherwise.
    """
    repo_path = Client.find_repository()
    if not repo_path:
        return False

    return repo_path in Path(file_path).resolve().parents

is_repository_directory(path) staticmethod

Checks whether a ZenML client exists at the given path.

Parameters:

Name Type Description Default
path Path

The path to check.

required

Returns:

Type Description
bool

True if a ZenML client exists at the given path, False otherwise.

Source code in zenml/client.py
@staticmethod
def is_repository_directory(path: Path) -> bool:
    """Checks whether a ZenML client exists at the given path.

    Args:
        path: The path to check.

    Returns:
        True if a ZenML client exists at the given path,
        False otherwise.
    """
    config_dir = path / REPOSITORY_DIRECTORY_NAME
    return fileio.isdir(str(config_dir))

list_artifacts(self, sort_by='created', page=1, size=50, logical_operator=<LogicalOperators.AND: 'and'>, id=None, created=None, updated=None, name=None, artifact_store_id=None, type=None, data_type=None, uri=None, materializer=None, workspace_id=None, user_id=None, only_unused=False)

Get all artifacts.

Parameters:

Name Type Description Default
sort_by str

The column to sort by

'created'
page int

The page of items

1
size int

The maximum size of all pages

50
logical_operator LogicalOperators

Which logical operator to use [and, or]

<LogicalOperators.AND: 'and'>
id Union[uuid.UUID, str]

Use the id of runs to filter by.

None
created Union[datetime.datetime, str]

Use to filter by time of creation

None
updated Union[datetime.datetime, str]

Use the last updated date for filtering

None
name Optional[str]

The name of the run to filter by.

None
artifact_store_id Union[uuid.UUID, str]

The id of the artifact store to filter by.

None
type Optional[zenml.enums.ArtifactType]

The type of the artifact to filter by.

None
data_type Optional[str]

The data type of the artifact to filter by.

None
uri Optional[str]

The uri of the artifact to filter by.

None
materializer Optional[str]

The materializer of the artifact to filter by.

None
workspace_id Union[uuid.UUID, str]

The id of the workspace to filter by.

None
user_id Union[uuid.UUID, str]

The id of the user to filter by.

None
only_unused Optional[bool]

Only return artifacts that are not used in any runs.

False

Returns:

Type Description
Page[ArtifactResponseModel]

A list of artifacts.

Source code in zenml/client.py
def list_artifacts(
    self,
    sort_by: str = "created",
    page: int = PAGINATION_STARTING_PAGE,
    size: int = PAGE_SIZE_DEFAULT,
    logical_operator: LogicalOperators = LogicalOperators.AND,
    id: Optional[Union[UUID, str]] = None,
    created: Optional[Union[datetime, str]] = None,
    updated: Optional[Union[datetime, str]] = None,
    name: Optional[str] = None,
    artifact_store_id: Optional[Union[str, UUID]] = None,
    type: Optional[ArtifactType] = None,
    data_type: Optional[str] = None,
    uri: Optional[str] = None,
    materializer: Optional[str] = None,
    workspace_id: Optional[Union[str, UUID]] = None,
    user_id: Optional[Union[str, UUID]] = None,
    only_unused: Optional[bool] = False,
) -> Page[ArtifactResponseModel]:
    """Get all artifacts.

    Args:
        sort_by: The column to sort by
        page: The page of items
        size: The maximum size of all pages
        logical_operator: Which logical operator to use [and, or]
        id: Use the id of runs to filter by.
        created: Use to filter by time of creation
        updated: Use the last updated date for filtering
        name: The name of the run to filter by.
        artifact_store_id: The id of the artifact store to filter by.
        type: The type of the artifact to filter by.
        data_type: The data type of the artifact to filter by.
        uri: The uri of the artifact to filter by.
        materializer: The materializer of the artifact to filter by.
        workspace_id: The id of the workspace to filter by.
        user_id: The  id of the user to filter by.
        only_unused: Only return artifacts that are not used in any runs.

    Returns:
        A list of artifacts.
    """
    artifact_filter_model = ArtifactFilterModel(
        sort_by=sort_by,
        page=page,
        size=size,
        logical_operator=logical_operator,
        id=id,
        created=created,
        updated=updated,
        name=name,
        artifact_store_id=artifact_store_id,
        type=type,
        data_type=data_type,
        uri=uri,
        materializer=materializer,
        workspace_id=workspace_id,
        user_id=user_id,
        only_unused=only_unused,
    )
    artifact_filter_model.set_scope_workspace(self.active_workspace.id)
    return self.zen_store.list_artifacts(artifact_filter_model)

list_builds(self, sort_by='created', page=1, size=50, logical_operator