Skip to content

Zen Stores

zenml.zen_stores special

ZenStores define ways to store ZenML relevant data locally or remotely.

base_zen_store

Base Zen Store implementation.

BaseZenStore (BaseModel, ZenStoreInterface, SecretsStoreInterface, AnalyticsTrackerMixin, ABC) pydantic-model

Base class for accessing and persisting ZenML core objects.

Attributes:

Name Type Description
config StoreConfiguration

The configuration of the store.

track_analytics bool

Only send analytics if set to True.

secrets_store

The secrets store to use for storing sensitive data.

Source code in zenml/zen_stores/base_zen_store.py
class BaseZenStore(
    BaseModel,
    ZenStoreInterface,
    SecretsStoreInterface,
    AnalyticsTrackerMixin,
    ABC,
):
    """Base class for accessing and persisting ZenML core objects.

    Attributes:
        config: The configuration of the store.
        track_analytics: Only send analytics if set to `True`.
        secrets_store: The secrets store to use for storing sensitive data.
    """

    config: StoreConfiguration
    track_analytics: bool = True
    _secrets_store: Optional[BaseSecretsStore] = None
    _event_handlers: Dict[StoreEvent, List[Callable[..., Any]]] = {}

    TYPE: ClassVar[StoreType]
    CONFIG_TYPE: ClassVar[Type[StoreConfiguration]]

    # ---------------------------------
    # Initialization and configuration
    # ---------------------------------

    def __init__(
        self,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Create and initialize a store.

        Args:
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the Pydantic
                constructor.

        Raises:
            RuntimeError: If the store cannot be initialized.
        """
        super().__init__(**kwargs)

        try:
            self._initialize()
        except Exception as e:
            raise RuntimeError(
                f"Error initializing {self.type.value} store with URL "
                f"'{self.url}': {str(e)}"
            ) from e

        if not skip_default_registrations:
            logger.debug("Initializing database")
            self._initialize_database()
        else:
            logger.debug("Skipping database initialization")

    @staticmethod
    def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
        """Returns the class of the given store type.

        Args:
            store_type: The type of the store to get the class for.

        Returns:
            The class of the given store type or None if the type is unknown.

        Raises:
            TypeError: If the store type is unsupported.
        """
        if store_type == StoreType.SQL:
            from zenml.zen_stores.sql_zen_store import SqlZenStore

            return SqlZenStore
        elif store_type == StoreType.REST:
            from zenml.zen_stores.rest_zen_store import RestZenStore

            return RestZenStore
        else:
            raise TypeError(
                f"No store implementation found for store type "
                f"`{store_type.value}`."
            )

    @staticmethod
    def get_store_config_class(
        store_type: StoreType,
    ) -> Type["StoreConfiguration"]:
        """Returns the store config class of the given store type.

        Args:
            store_type: The type of the store to get the class for.

        Returns:
            The config class of the given store type.
        """
        store_class = BaseZenStore.get_store_class(store_type)
        return store_class.CONFIG_TYPE

    @staticmethod
    def get_store_type(url: str) -> StoreType:
        """Returns the store type associated with a URL schema.

        Args:
            url: The store URL.

        Returns:
            The store type associated with the supplied URL schema.

        Raises:
            TypeError: If no store type was found to support the supplied URL.
        """
        from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
        from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

        if SqlZenStoreConfiguration.supports_url_scheme(url):
            return StoreType.SQL
        elif RestZenStoreConfiguration.supports_url_scheme(url):
            return StoreType.REST
        else:
            raise TypeError(f"No store implementation found for URL: {url}.")

    @staticmethod
    def create_store(
        config: StoreConfiguration,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> "BaseZenStore":
        """Create and initialize a store from a store configuration.

        Args:
            config: The store configuration to use.
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the store class

        Returns:
            The initialized store.
        """
        logger.debug(f"Creating store with config '{config}'...")
        store_class = BaseZenStore.get_store_class(config.type)
        store = store_class(
            config=config,
            skip_default_registrations=skip_default_registrations,
            **kwargs,
        )

        secrets_store_config = store.config.secrets_store

        # Initialize the secrets store
        if (
            secrets_store_config
            and secrets_store_config.type != SecretsStoreType.NONE
        ):
            secrets_store_class = BaseSecretsStore.get_store_class(
                secrets_store_config
            )
            store._secrets_store = secrets_store_class(
                zen_store=store,
                config=secrets_store_config,
            )
            # Update the config with the actual secrets store config
            # to reflect the default values in the saved configuration
            store.config.secrets_store = store._secrets_store.config
        return store

    @staticmethod
    def get_default_store_config(path: str) -> StoreConfiguration:
        """Get the default store configuration.

        The default store is a SQLite store that saves the DB contents on the
        local filesystem.

        Args:
            path: The local path where the store DB will be stored.

        Returns:
            The default store configuration.
        """
        from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

        config = SqlZenStoreConfiguration(
            type=StoreType.SQL,
            url=SqlZenStoreConfiguration.get_local_url(path),
            secrets_store=SqlSecretsStoreConfiguration(
                type=SecretsStoreType.SQL,
            ),
        )
        return config

    def _initialize_database(self) -> None:
        """Initialize the database on first use."""
        try:
            default_workspace = self._default_workspace
        except KeyError:
            default_workspace = self._create_default_workspace()
        try:
            assert self._admin_role
        except KeyError:
            self._create_admin_role()
        try:
            assert self._guest_role
        except KeyError:
            self._create_guest_role()
        try:
            default_user = self._default_user
        except KeyError:
            default_user = self._create_default_user()
        try:
            self._get_default_stack(
                workspace_name_or_id=default_workspace.id,
                user_name_or_id=default_user.id,
            )
        except KeyError:
            self._create_default_stack(
                workspace_name_or_id=default_workspace.id,
                user_name_or_id=default_user.id,
            )

    @property
    def url(self) -> str:
        """The URL of the store.

        Returns:
            The URL of the store.
        """
        return self.config.url

    @property
    def type(self) -> StoreType:
        """The type of the store.

        Returns:
            The type of the store.
        """
        return self.TYPE

    @property
    def secrets_store(self) -> Optional["BaseSecretsStore"]:
        """The secrets store associated with this store.

        Returns:
            The secrets store associated with this store.
        """
        return self._secrets_store

    def validate_active_config(
        self,
        active_workspace_name_or_id: Optional[Union[str, UUID]] = None,
        active_stack_id: Optional[UUID] = None,
        config_name: str = "",
    ) -> Tuple[WorkspaceResponseModel, StackResponseModel]:
        """Validate the active configuration.

        Call this method to validate the supplied active workspace and active
        stack values.

        This method is guaranteed to return valid workspace ID and stack ID
        values. If the supplied workspace and stack are not set or are not valid
        (e.g. they do not exist or are not accessible), the default workspace and
        default workspace stack will be returned in their stead.

        Args:
            active_workspace_name_or_id: The name or ID of the active workspace.
            active_stack_id: The ID of the active stack.
            config_name: The name of the configuration to validate (used in the
                displayed logs/messages).

        Returns:
            A tuple containing the active workspace and active stack.
        """
        active_workspace: WorkspaceResponseModel

        if active_workspace_name_or_id:
            try:
                active_workspace = self.get_workspace(
                    active_workspace_name_or_id
                )
            except KeyError:
                active_workspace = self._get_or_create_default_workspace()

                logger.warning(
                    f"The current {config_name} active workspace is no longer "
                    f"available. Resetting the active workspace to "
                    f"'{active_workspace.name}'."
                )
        else:
            active_workspace = self._get_or_create_default_workspace()

            logger.info(
                f"Setting the {config_name} active workspace "
                f"to '{active_workspace.name}'."
            )

        active_stack: StackResponseModel

        # Sanitize the active stack
        if active_stack_id:
            # Ensure that the active stack is still valid
            try:
                active_stack = self.get_stack(stack_id=active_stack_id)
            except KeyError:
                logger.warning(
                    "The current %s active stack is no longer available. "
                    "Resetting the active stack to default.",
                    config_name,
                )
                active_stack = self._get_or_create_default_stack(
                    active_workspace
                )
            else:
                if active_stack.workspace.id != active_workspace.id:
                    logger.warning(
                        "The current %s active stack is not part of the active "
                        "workspace. Resetting the active stack to default.",
                        config_name,
                    )
                    active_stack = self._get_or_create_default_stack(
                        active_workspace
                    )
                elif not active_stack.is_shared and (
                    not active_stack.user
                    or (active_stack.user.id != self.get_user().id)
                ):
                    logger.warning(
                        "The current %s active stack is not shared and not "
                        "owned by the active user. "
                        "Resetting the active stack to default.",
                        config_name,
                    )
                    active_stack = self._get_or_create_default_stack(
                        active_workspace
                    )
        else:
            logger.warning(
                "Setting the %s active stack to default.",
                config_name,
            )
            active_stack = self._get_or_create_default_stack(active_workspace)

        return active_workspace, active_stack

    def get_store_info(self) -> ServerModel:
        """Get information about the store.

        Returns:
            Information about the store.
        """
        return ServerModel(
            id=GlobalConfiguration().user_id,
            version=zenml.__version__,
            deployment_type=os.environ.get(
                ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
            ),
            database_type=ServerDatabaseType.OTHER,
            debug=IS_DEBUG_ENV,
            secrets_store_type=self.secrets_store.type
            if self.secrets_store
            else SecretsStoreType.NONE,
        )

    def is_local_store(self) -> bool:
        """Check if the store is local or connected to a local ZenML server.

        Returns:
            True if the store is local, False otherwise.
        """
        return self.get_store_info().is_local()

    def _get_or_create_default_stack(
        self, workspace: "WorkspaceResponseModel"
    ) -> "StackResponseModel":
        try:
            return self._get_default_stack(
                workspace_name_or_id=workspace.id,
                user_name_or_id=self.get_user().id,
            )
        except KeyError:
            return self._create_default_stack(
                workspace_name_or_id=workspace.id,
                user_name_or_id=self.get_user().id,
            )

    def _get_or_create_default_workspace(self) -> "WorkspaceResponseModel":
        try:
            return self._default_workspace
        except KeyError:
            return self._create_default_workspace()

    # --------------
    # Event Handlers
    # --------------

    def register_event_handler(
        self,
        event: StoreEvent,
        handler: Callable[..., Any],
    ) -> None:
        """Register an external event handler.

        The handler will be called when the store event is triggered.

        Args:
            event: The event to register the handler for.
            handler: The handler function to register.
        """
        self._event_handlers.setdefault(event, []).append(handler)

    def _trigger_event(self, event: StoreEvent, **kwargs: Any) -> None:
        """Trigger an event and call all registered handlers.

        Args:
            event: The event to trigger.
            **kwargs: The event arguments.
        """
        for handler in self._event_handlers.get(event, []):
            try:
                handler(event, **kwargs)
            except Exception as e:
                logger.error(
                    f"Silently ignoring error caught while triggering event "
                    f"store handler for event {event.value}: {e}",
                    exc_info=True,
                )

    # ------
    # Stacks
    # ------

    @track(AnalyticsEvent.REGISTERED_DEFAULT_STACK)
    def _create_default_stack(
        self,
        workspace_name_or_id: Union[str, UUID],
        user_name_or_id: Union[str, UUID],
    ) -> StackResponseModel:
        """Create the default stack components and stack.

        The default stack contains a local orchestrator and a local artifact
        store.

        Args:
            workspace_name_or_id: Name or ID of the workspace to which the stack
                belongs.
            user_name_or_id: The name or ID of the user that owns the stack.

        Returns:
            The model of the created default stack.
        """
        workspace = self.get_workspace(
            workspace_name_or_id=workspace_name_or_id
        )
        user = self.get_user(user_name_or_id=user_name_or_id)

        logger.info(
            f"Creating default stack for user '{user.name}' in workspace "
            f"{workspace.name}..."
        )

        # Register the default orchestrator
        orchestrator = self.create_stack_component(
            component=ComponentRequestModel(
                user=user.id,
                workspace=workspace.id,
                name=DEFAULT_STACK_COMPONENT_NAME,
                type=StackComponentType.ORCHESTRATOR,
                flavor="local",
                configuration={},
            ),
        )

        # Register the default artifact store
        artifact_store = self.create_stack_component(
            component=ComponentRequestModel(
                user=user.id,
                workspace=workspace.id,
                name=DEFAULT_STACK_COMPONENT_NAME,
                type=StackComponentType.ARTIFACT_STORE,
                flavor="local",
                configuration={},
            ),
        )

        components = {c.type: [c.id] for c in [orchestrator, artifact_store]}
        # Register the default stack
        stack = StackRequestModel(
            name=DEFAULT_STACK_NAME,
            components=components,
            is_shared=False,
            workspace=workspace.id,
            user=user.id,
        )
        return self.create_stack(stack=stack)

    def _get_default_stack(
        self,
        workspace_name_or_id: Union[str, UUID],
        user_name_or_id: Union[str, UUID],
    ) -> StackResponseModel:
        """Get the default stack for a user in a workspace.

        Args:
            workspace_name_or_id: Name or ID of the workspace.
            user_name_or_id: Name or ID of the user.

        Returns:
            The default stack in the workspace owned by the supplied user.

        Raises:
            KeyError: if the workspace or default stack doesn't exist.
        """
        default_stacks = self.list_stacks(
            StackFilterModel(
                workspace_id=workspace_name_or_id,
                user_id=user_name_or_id,
                name=DEFAULT_STACK_NAME,
            )
        )
        if default_stacks.total == 0:
            raise KeyError(
                f"No default stack found for user {str(user_name_or_id)} in "
                f"workspace {str(workspace_name_or_id)}"
            )
        return default_stacks.items[0]

    # -----
    # Roles
    # -----
    @property
    def _admin_role(self) -> RoleResponseModel:
        """Get the admin role.

        Returns:
            The default admin role.
        """
        return self.get_role(DEFAULT_ADMIN_ROLE)

    @track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
    def _create_admin_role(self) -> RoleResponseModel:
        """Creates the admin role.

        Returns:
            The admin role
        """
        logger.info(f"Creating '{DEFAULT_ADMIN_ROLE}' role ...")
        return self.create_role(
            RoleRequestModel(
                name=DEFAULT_ADMIN_ROLE,
                permissions={
                    PermissionType.READ,
                    PermissionType.WRITE,
                    PermissionType.ME,
                },
            )
        )

    @property
    def _guest_role(self) -> RoleResponseModel:
        """Get the guest role.

        Returns:
            The guest role.
        """
        return self.get_role(DEFAULT_GUEST_ROLE)

    @track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
    def _create_guest_role(self) -> RoleResponseModel:
        """Creates the guest role.

        Returns:
            The guest role
        """
        logger.info(f"Creating '{DEFAULT_GUEST_ROLE}' role ...")
        return self.create_role(
            RoleRequestModel(
                name=DEFAULT_GUEST_ROLE,
                permissions={
                    PermissionType.READ,
                    PermissionType.ME,
                },
            )
        )

    # -----
    # Users
    # -----

    @property
    def _default_user_name(self) -> str:
        """Get the default user name.

        Returns:
            The default user name.
        """
        return os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)

    @property
    def _default_user(self) -> UserResponseModel:
        """Get the default user.

        Returns:
            The default user.

        Raises:
            KeyError: If the default user doesn't exist.
        """
        user_name = self._default_user_name
        try:
            return self.get_user(user_name)
        except KeyError:
            raise KeyError(f"The default user '{user_name}' is not configured")

    @track(AnalyticsEvent.CREATED_DEFAULT_USER)
    def _create_default_user(self) -> UserResponseModel:
        """Creates a default user with the admin role.

        Returns:
            The default user.
        """
        user_name = os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
        user_password = os.getenv(
            ENV_ZENML_DEFAULT_USER_PASSWORD, DEFAULT_PASSWORD
        )

        logger.info(f"Creating default user '{user_name}' ...")
        new_user = self.create_user(
            UserRequestModel(
                name=user_name,
                active=True,
                password=user_password,
            )
        )
        self.create_user_role_assignment(
            UserRoleAssignmentRequestModel(
                role=self._admin_role.id,
                user=new_user.id,
                workspace=None,
            )
        )
        return new_user

    # -----
    # Roles
    # -----

    @property
    def roles(self) -> Page[RoleResponseModel]:
        """All existing roles.

        Returns:
            A list of all existing roles.
        """
        return self.list_roles(RoleFilterModel())

    # --------
    # Workspaces
    # --------

    @property
    def _default_workspace_name(self) -> str:
        """Get the default workspace name.

        Returns:
            The default workspace name.
        """
        return os.getenv(
            ENV_ZENML_DEFAULT_WORKSPACE_NAME, DEFAULT_WORKSPACE_NAME
        )

    @property
    def _default_workspace(self) -> WorkspaceResponseModel:
        """Get the default workspace.

        Returns:
            The default workspace.

        Raises:
            KeyError: if the default workspace doesn't exist.
        """
        workspace_name = self._default_workspace_name
        try:
            return self.get_workspace(workspace_name)
        except KeyError:
            raise KeyError(
                f"The default workspace '{workspace_name}' is not configured"
            )

    @track(AnalyticsEvent.CREATED_DEFAULT_WORKSPACE)
    def _create_default_workspace(self) -> WorkspaceResponseModel:
        """Creates a default workspace.

        Returns:
            The default workspace.
        """
        workspace_name = self._default_workspace_name
        logger.info(f"Creating default workspace '{workspace_name}' ...")
        return self.create_workspace(
            WorkspaceRequestModel(name=workspace_name)
        )

    # ---------
    # Analytics
    # ---------

    def track_event(
        self,
        event: AnalyticsEvent,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Track an analytics event.

        Args:
            event: The event to track.
            metadata: Additional metadata to track with the event.
        """
        if self.track_analytics:
            # Server information is always tracked, if available.
            track_event(event, metadata)

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Ignore extra attributes from configs of previous ZenML versions
        extra = "ignore"
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
roles: Page[RoleResponseModel] property readonly

All existing roles.

Returns:

Type Description
Page[RoleResponseModel]

A list of all existing roles.

secrets_store: Optional[BaseSecretsStore] property readonly

The secrets store associated with this store.

Returns:

Type Description
Optional[BaseSecretsStore]

The secrets store associated with this store.

type: StoreType property readonly

The type of the store.

Returns:

Type Description
StoreType

The type of the store.

url: str property readonly

The URL of the store.

Returns:

Type Description
str

The URL of the store.

Config

Pydantic configuration class.

Source code in zenml/zen_stores/base_zen_store.py
class Config:
    """Pydantic configuration class."""

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes
    validate_assignment = True
    # Ignore extra attributes from configs of previous ZenML versions
    extra = "ignore"
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
__init__(self, skip_default_registrations=False, **kwargs) special

Create and initialize a store.

Parameters:

Name Type Description Default
skip_default_registrations bool

If True, the creation of the default stack and user in the store will be skipped.

False
**kwargs Any

Additional keyword arguments to pass to the Pydantic constructor.

{}

Exceptions:

Type Description
RuntimeError

If the store cannot be initialized.

Source code in zenml/zen_stores/base_zen_store.py
def __init__(
    self,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Create and initialize a store.

    Args:
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the Pydantic
            constructor.

    Raises:
        RuntimeError: If the store cannot be initialized.
    """
    super().__init__(**kwargs)

    try:
        self._initialize()
    except Exception as e:
        raise RuntimeError(
            f"Error initializing {self.type.value} store with URL "
            f"'{self.url}': {str(e)}"
        ) from e

    if not skip_default_registrations:
        logger.debug("Initializing database")
        self._initialize_database()
    else:
        logger.debug("Skipping database initialization")
create_secret(self, secret)

Creates a new secret.

The new secret is also validated against the scoping rules enforced in the secrets store:

  • only one workspace-scoped secret with the given name can exist in the target workspace.
  • only one user-scoped secret with the given name can exist in the target workspace for the target user.

Parameters:

Name Type Description Default
secret SecretRequestModel

The secret to create.

required

Returns:

Type Description
SecretResponseModel

The newly created secret.

Exceptions:

Type Description
KeyError

if the user or workspace does not exist.

EntityExistsError

If a secret with the same name already exists in the same scope.

ValueError

if the secret is invalid.

Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def create_secret(
    self,
    secret: SecretRequestModel,
) -> SecretResponseModel:
    """Creates a new secret.

    The new secret is also validated against the scoping rules enforced in
    the secrets store:

      - only one workspace-scoped secret with the given name can exist
        in the target workspace.
      - only one user-scoped secret with the given name can exist in the
        target workspace for the target user.

    Args:
        secret: The secret to create.

    Returns:
        The newly created secret.

    Raises:
        KeyError: if the user or workspace does not exist.
        EntityExistsError: If a secret with the same name already exists in
            the same scope.
        ValueError: if the secret is invalid.
    """
create_store(config, skip_default_registrations=False, **kwargs) staticmethod

Create and initialize a store from a store configuration.

Parameters:

Name Type Description Default
config StoreConfiguration

The store configuration to use.

required
skip_default_registrations bool

If True, the creation of the default stack and user in the store will be skipped.

False
**kwargs Any

Additional keyword arguments to pass to the store class

{}

Returns:

Type Description
BaseZenStore

The initialized store.

Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def create_store(
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> "BaseZenStore":
    """Create and initialize a store from a store configuration.

    Args:
        config: The store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the store class

    Returns:
        The initialized store.
    """
    logger.debug(f"Creating store with config '{config}'...")
    store_class = BaseZenStore.get_store_class(config.type)
    store = store_class(
        config=config,
        skip_default_registrations=skip_default_registrations,
        **kwargs,
    )

    secrets_store_config = store.config.secrets_store

    # Initialize the secrets store
    if (
        secrets_store_config
        and secrets_store_config.type != SecretsStoreType.NONE
    ):
        secrets_store_class = BaseSecretsStore.get_store_class(
            secrets_store_config
        )
        store._secrets_store = secrets_store_class(
            zen_store=store,
            config=secrets_store_config,
        )
        # Update the config with the actual secrets store config
        # to reflect the default values in the saved configuration
        store.config.secrets_store = store._secrets_store.config
    return store
delete_secret(self, secret_id)

Deletes a secret.

Parameters:

Name Type Description Default
secret_id UUID

The ID of the secret to delete.

required

Exceptions:

Type Description
KeyError

if the secret doesn't exist.

Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def delete_secret(self, secret_id: UUID) -> None:
    """Deletes a secret.

    Args:
        secret_id: The ID of the secret to delete.

    Raises:
        KeyError: if the secret doesn't exist.
    """
get_default_store_config(path) staticmethod

Get the default store configuration.

The default store is a SQLite store that saves the DB contents on the local filesystem.

Parameters:

Name Type Description Default
path str

The local path where the store DB will be stored.

required

Returns:

Type Description
StoreConfiguration

The default store configuration.

Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
    """Get the default store configuration.

    The default store is a SQLite store that saves the DB contents on the
    local filesystem.

    Args:
        path: The local path where the store DB will be stored.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    config = SqlZenStoreConfiguration(
        type=StoreType.SQL,
        url=SqlZenStoreConfiguration.get_local_url(path),
        secrets_store=SqlSecretsStoreConfiguration(
            type=SecretsStoreType.SQL,
        ),
    )
    return config
get_secret(self, secret_id)

Get a secret with a given name.

Parameters:

Name Type Description Default
secret_id UUID

ID of the secret.

required

Returns:

Type Description
SecretResponseModel

The secret.

Exceptions:

Type Description
KeyError

if the secret does not exist.

Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def get_secret(self, secret_id: UUID) -> SecretResponseModel:
    """Get a secret with a given name.

    Args:
        secret_id: ID of the secret.

    Returns:
        The secret.

    Raises:
        KeyError: if the secret does not exist.
    """
get_store_class(store_type) staticmethod

Returns the class of the given store type.

Parameters:

Name Type Description Default
store_type StoreType

The type of the store to get the class for.

required

Returns:

Type Description
Type[BaseZenStore]

The class of the given store type or None if the type is unknown.

Exceptions:

Type Description
TypeError

If the store type is unsupported.

Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
    """Returns the class of the given store type.

    Args:
        store_type: The type of the store to get the class for.

    Returns:
        The class of the given store type or None if the type is unknown.

    Raises:
        TypeError: If the store type is unsupported.
    """
    if store_type == StoreType.SQL:
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        return SqlZenStore
    elif store_type == StoreType.REST:
        from zenml.zen_stores.rest_zen_store import RestZenStore

        return RestZenStore
    else:
        raise TypeError(
            f"No store implementation found for store type "
            f"`{store_type.value}`."
        )
get_store_config_class(store_type) staticmethod

Returns the store config class of the given store type.

Parameters:

Name Type Description Default
store_type StoreType

The type of the store to get the class for.

required

Returns:

Type Description
Type[StoreConfiguration]

The config class of the given store type.

Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_config_class(
    store_type: StoreType,
) -> Type["StoreConfiguration"]:
    """Returns the store config class of the given store type.

    Args:
        store_type: The type of the store to get the class for.

    Returns:
        The config class of the given store type.
    """
    store_class = BaseZenStore.get_store_class(store_type)
    return store_class.CONFIG_TYPE
get_store_info(self)

Get information about the store.

Returns:

Type Description
ServerModel

Information about the store.

Source code in zenml/zen_stores/base_zen_store.py
def get_store_info(self) -> ServerModel:
    """Get information about the store.

    Returns:
        Information about the store.
    """
    return ServerModel(
        id=GlobalConfiguration().user_id,
        version=zenml.__version__,
        deployment_type=os.environ.get(
            ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
        ),
        database_type=ServerDatabaseType.OTHER,
        debug=IS_DEBUG_ENV,
        secrets_store_type=self.secrets_store.type
        if self.secrets_store
        else SecretsStoreType.NONE,
    )
get_store_type(url) staticmethod

Returns the store type associated with a URL schema.

Parameters:

Name Type Description Default
url str

The store URL.

required

Returns:

Type Description
StoreType

The store type associated with the supplied URL schema.

Exceptions:

Type Description
TypeError

If no store type was found to support the supplied URL.

Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_type(url: str) -> StoreType:
    """Returns the store type associated with a URL schema.

    Args:
        url: The store URL.

    Returns:
        The store type associated with the supplied URL schema.

    Raises:
        TypeError: If no store type was found to support the supplied URL.
    """
    from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    if SqlZenStoreConfiguration.supports_url_scheme(url):
        return StoreType.SQL
    elif RestZenStoreConfiguration.supports_url_scheme(url):
        return StoreType.REST
    else:
        raise TypeError(f"No store implementation found for URL: {url}.")
is_local_store(self)

Check if the store is local or connected to a local ZenML server.

Returns:

Type Description
bool

True if the store is local, False otherwise.

Source code in zenml/zen_stores/base_zen_store.py
def is_local_store(self) -> bool:
    """Check if the store is local or connected to a local ZenML server.

    Returns:
        True if the store is local, False otherwise.
    """
    return self.get_store_info().is_local()
list_secrets(self, secret_filter_model)

List all secrets matching the given filter criteria.

Note that returned secrets do not include any secret values. To fetch the secret values, use get_secret.

Parameters:

Name Type Description Default
secret_filter_model SecretFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[SecretResponseModel]

A list of all secrets matching the filter criteria, with pagination information and sorted according to the filter criteria. The returned secrets do not include any secret values, only metadata. To fetch the secret values, use get_secret individually with each secret.

Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def list_secrets(
    self, secret_filter_model: SecretFilterModel
) -> Page[SecretResponseModel]:
    """List all secrets matching the given filter criteria.

    Note that returned secrets do not include any secret values. To fetch
    the secret values, use `get_secret`.

    Args:
        secret_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all secrets matching the filter criteria, with pagination
        information and sorted according to the filter criteria. The
        returned secrets do not include any secret values, only metadata. To
        fetch the secret values, use `get_secret` individually with each
        secret.
    """
register_event_handler(self, event, handler)

Register an external event handler.

The handler will be called when the store event is triggered.

Parameters:

Name Type Description Default
event StoreEvent

The event to register the handler for.

required
handler Callable[..., Any]

The handler function to register.

required
Source code in zenml/zen_stores/base_zen_store.py
def register_event_handler(
    self,
    event: StoreEvent,
    handler: Callable[..., Any],
) -> None:
    """Register an external event handler.

    The handler will be called when the store event is triggered.

    Args:
        event: The event to register the handler for.
        handler: The handler function to register.
    """
    self._event_handlers.setdefault(event, []).append(handler)
track_event(self, event, metadata=None)

Track an analytics event.

Parameters:

Name Type Description Default
event AnalyticsEvent

The event to track.

required
metadata Optional[Dict[str, Any]]

Additional metadata to track with the event.

None
Source code in zenml/zen_stores/base_zen_store.py
def track_event(
    self,
    event: AnalyticsEvent,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Track an analytics event.

    Args:
        event: The event to track.
        metadata: Additional metadata to track with the event.
    """
    if self.track_analytics:
        # Server information is always tracked, if available.
        track_event(event, metadata)
update_secret(self, secret_id, secret_update)

Updates a secret.

Secret values that are specified as None in the update that are present in the existing secret are removed from the existing secret. Values that are present in both secrets are overwritten. All other values in both the existing secret and the update are kept (merged).

If the update includes a change of name or scope, the scoping rules enforced in the secrets store are used to validate the update:

  • only one workspace-scoped secret with the given name can exist in the target workspace.
  • only one user-scoped secret with the given name can exist in the target workspace for the target user.

Parameters:

Name Type Description Default
secret_id UUID

The ID of the secret to be updated.

required
secret_update SecretUpdateModel

The update to be applied.

required

Returns:

Type Description
SecretResponseModel

The updated secret.

Exceptions:

Type Description
KeyError

if the secret doesn't exist.

EntityExistsError

If a secret with the same name already exists in the same scope.

ValueError

if the secret is invalid.

Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def update_secret(
    self,
    secret_id: UUID,
    secret_update: SecretUpdateModel,
) -> SecretResponseModel:
    """Updates a secret.

    Secret values that are specified as `None` in the update that are
    present in the existing secret are removed from the existing secret.
    Values that are present in both secrets are overwritten. All other
    values in both the existing secret and the update are kept (merged).

    If the update includes a change of name or scope, the scoping rules
    enforced in the secrets store are used to validate the update:

      - only one workspace-scoped secret with the given name can exist
        in the target workspace.
      - only one user-scoped secret with the given name can exist in the
        target workspace for the target user.

    Args:
        secret_id: The ID of the secret to be updated.
        secret_update: The update to be applied.

    Returns:
        The updated secret.

    Raises:
        KeyError: if the secret doesn't exist.
        EntityExistsError: If a secret with the same name already exists in
            the same scope.
        ValueError: if the secret is invalid.
    """
validate_active_config(self, active_workspace_name_or_id=None, active_stack_id=None, config_name='')

Validate the active configuration.

Call this method to validate the supplied active workspace and active stack values.

This method is guaranteed to return valid workspace ID and stack ID values. If the supplied workspace and stack are not set or are not valid (e.g. they do not exist or are not accessible), the default workspace and default workspace stack will be returned in their stead.

Parameters:

Name Type Description Default
active_workspace_name_or_id Union[str, uuid.UUID]

The name or ID of the active workspace.

None
active_stack_id Optional[uuid.UUID]

The ID of the active stack.

None
config_name str

The name of the configuration to validate (used in the displayed logs/messages).

''

Returns:

Type Description
Tuple[zenml.models.workspace_models.WorkspaceResponseModel, zenml.models.stack_models.StackResponseModel]

A tuple containing the active workspace and active stack.

Source code in zenml/zen_stores/base_zen_store.py
def validate_active_config(
    self,
    active_workspace_name_or_id: Optional[Union[str, UUID]] = None,
    active_stack_id: Optional[UUID] = None,
    config_name: str = "",
) -> Tuple[WorkspaceResponseModel, StackResponseModel]:
    """Validate the active configuration.

    Call this method to validate the supplied active workspace and active
    stack values.

    This method is guaranteed to return valid workspace ID and stack ID
    values. If the supplied workspace and stack are not set or are not valid
    (e.g. they do not exist or are not accessible), the default workspace and
    default workspace stack will be returned in their stead.

    Args:
        active_workspace_name_or_id: The name or ID of the active workspace.
        active_stack_id: The ID of the active stack.
        config_name: The name of the configuration to validate (used in the
            displayed logs/messages).

    Returns:
        A tuple containing the active workspace and active stack.
    """
    active_workspace: WorkspaceResponseModel

    if active_workspace_name_or_id:
        try:
            active_workspace = self.get_workspace(
                active_workspace_name_or_id
            )
        except KeyError:
            active_workspace = self._get_or_create_default_workspace()

            logger.warning(
                f"The current {config_name} active workspace is no longer "
                f"available. Resetting the active workspace to "
                f"'{active_workspace.name}'."
            )
    else:
        active_workspace = self._get_or_create_default_workspace()

        logger.info(
            f"Setting the {config_name} active workspace "
            f"to '{active_workspace.name}'."
        )

    active_stack: StackResponseModel

    # Sanitize the active stack
    if active_stack_id:
        # Ensure that the active stack is still valid
        try:
            active_stack = self.get_stack(stack_id=active_stack_id)
        except KeyError:
            logger.warning(
                "The current %s active stack is no longer available. "
                "Resetting the active stack to default.",
                config_name,
            )
            active_stack = self._get_or_create_default_stack(
                active_workspace
            )
        else:
            if active_stack.workspace.id != active_workspace.id:
                logger.warning(
                    "The current %s active stack is not part of the active "
                    "workspace. Resetting the active stack to default.",
                    config_name,
                )
                active_stack = self._get_or_create_default_stack(
                    active_workspace
                )
            elif not active_stack.is_shared and (
                not active_stack.user
                or (active_stack.user.id != self.get_user().id)
            ):
                logger.warning(
                    "The current %s active stack is not shared and not "
                    "owned by the active user. "
                    "Resetting the active stack to default.",
                    config_name,
                )
                active_stack = self._get_or_create_default_stack(
                    active_workspace
                )
    else:
        logger.warning(
            "Setting the %s active stack to default.",
            config_name,
        )
        active_stack = self._get_or_create_default_stack(active_workspace)

    return active_workspace, active_stack

enums

Zen Store enums.

StoreEvent (StrEnum)

Events that can be triggered by the store.

Source code in zenml/zen_stores/enums.py
class StoreEvent(StrEnum):
    """Events that can be triggered by the store."""

    # Triggered just before deleting a workspace. The workspace ID is passed as
    # a `workspace_id` UUID argument.
    WORKSPACE_DELETED = "workspace_deleted"
    # Triggered just before deleting a user. The user ID is passed as
    # a `user_id` UUID argument.
    USER_DELETED = "user_deleted"

migrations special

Alembic database migration utilities.

alembic

Alembic utilities wrapper.

The Alembic class defined here acts as a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.

Alembic

Alembic environment and migration API.

This class provides a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.

Source code in zenml/zen_stores/migrations/alembic.py
class Alembic:
    """Alembic environment and migration API.

    This class provides a wrapper around the Alembic library that automatically
    configures Alembic to use the ZenML SQL store database connection.
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData = SQLModel.metadata,
        context: Optional[EnvironmentContext] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the Alembic wrapper.

        Args:
            engine: The SQLAlchemy engine to use.
            metadata: The SQLAlchemy metadata to use.
            context: The Alembic environment context to use. If not set, a new
                context is created pointing to the ZenML migrations directory.
            **kwargs: Additional keyword arguments to pass to the Alembic
                environment context.
        """
        self.engine = engine
        self.metadata = metadata
        self.context_kwargs = kwargs

        self.config = Config()
        self.config.set_main_option(
            "script_location", str(Path(__file__).parent)
        )
        self.config.set_main_option(
            "version_locations", str(Path(__file__).parent / "versions")
        )

        self.script_directory = ScriptDirectory.from_config(self.config)
        if context is None:
            self.environment_context = EnvironmentContext(
                self.config, self.script_directory
            )
        else:
            self.environment_context = context

    def db_is_empty(self) -> bool:
        """Check if the database is empty.

        Returns:
            True if the database is empty, False otherwise.
        """
        # Check the existence of any of the SQLModel tables
        return not self.engine.dialect.has_table(
            self.engine.connect(), schemas.StackSchema.__tablename__
        )

    def run_migrations(
        self,
        fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
    ) -> None:
        """Run an online migration function in the current migration context.

        Args:
            fn: Migration function to run. If not set, the function configured
                externally by the Alembic CLI command is used.
        """
        fn_context_args: Dict[Any, Any] = {}
        if fn is not None:
            fn_context_args["fn"] = fn

        with self.engine.connect() as connection:
            self.environment_context.configure(
                connection=connection,
                target_metadata=self.metadata,
                include_object=include_object,
                compare_type=True,
                render_as_batch=True,
                **fn_context_args,
                **self.context_kwargs,
            )

            with self.environment_context.begin_transaction():
                self.environment_context.run_migrations()

    def current_revisions(self) -> List[str]:
        """Get the current database revisions.

        Returns:
            List of head revisions.
        """
        current_revisions: List[str] = []

        def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
            nonlocal current_revisions

            for r in self.script_directory.get_all_current(
                rev  # type:ignore [arg-type]
            ):
                if r is None:
                    continue
                current_revisions.append(r.revision)
            return []

        self.run_migrations(do_get_current_rev)

        return current_revisions

    def stamp(self, revision: str) -> None:
        """Stamp the revision table with the given revision without running any migrations.

        Args:
            revision: String revision target.
        """

        def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
            return self.script_directory._stamp_revs(revision, rev)

        self.run_migrations(do_stamp)

    def upgrade(self, revision: str = "heads") -> None:
        """Upgrade the database to a later version.

        Args:
            revision: String revision target.
        """

        def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
            return self.script_directory._upgrade_revs(
                revision, rev  # type:ignore [arg-type]
            )

        self.run_migrations(do_upgrade)

    def downgrade(self, revision: str) -> None:
        """Revert the database to a previous version.

        Args:
            revision: String revision target.
        """

        def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
            return self.script_directory._downgrade_revs(
                revision, rev  # type:ignore [arg-type]
            )

        self.run_migrations(do_downgrade)
__init__(self, engine, metadata=MetaData(), context=None, **kwargs) special

Initialize the Alembic wrapper.

Parameters:

Name Type Description Default
engine Engine

The SQLAlchemy engine to use.

required
metadata MetaData

The SQLAlchemy metadata to use.

MetaData()
context Optional[alembic.runtime.environment.EnvironmentContext]

The Alembic environment context to use. If not set, a new context is created pointing to the ZenML migrations directory.

None
**kwargs Any

Additional keyword arguments to pass to the Alembic environment context.

{}
Source code in zenml/zen_stores/migrations/alembic.py
def __init__(
    self,
    engine: Engine,
    metadata: MetaData = SQLModel.metadata,
    context: Optional[EnvironmentContext] = None,
    **kwargs: Any,
) -> None:
    """Initialize the Alembic wrapper.

    Args:
        engine: The SQLAlchemy engine to use.
        metadata: The SQLAlchemy metadata to use.
        context: The Alembic environment context to use. If not set, a new
            context is created pointing to the ZenML migrations directory.
        **kwargs: Additional keyword arguments to pass to the Alembic
            environment context.
    """
    self.engine = engine
    self.metadata = metadata
    self.context_kwargs = kwargs

    self.config = Config()
    self.config.set_main_option(
        "script_location", str(Path(__file__).parent)
    )
    self.config.set_main_option(
        "version_locations", str(Path(__file__).parent / "versions")
    )

    self.script_directory = ScriptDirectory.from_config(self.config)
    if context is None:
        self.environment_context = EnvironmentContext(
            self.config, self.script_directory
        )
    else:
        self.environment_context = context
current_revisions(self)

Get the current database revisions.

Returns:

Type Description
List[str]

List of head revisions.

Source code in zenml/zen_stores/migrations/alembic.py
def current_revisions(self) -> List[str]:
    """Get the current database revisions.

    Returns:
        List of head revisions.
    """
    current_revisions: List[str] = []

    def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
        nonlocal current_revisions

        for r in self.script_directory.get_all_current(
            rev  # type:ignore [arg-type]
        ):
            if r is None:
                continue
            current_revisions.append(r.revision)
        return []

    self.run_migrations(do_get_current_rev)

    return current_revisions
db_is_empty(self)

Check if the database is empty.

Returns:

Type Description
bool

True if the database is empty, False otherwise.

Source code in zenml/zen_stores/migrations/alembic.py
def db_is_empty(self) -> bool:
    """Check if the database is empty.

    Returns:
        True if the database is empty, False otherwise.
    """
    # Check the existence of any of the SQLModel tables
    return not self.engine.dialect.has_table(
        self.engine.connect(), schemas.StackSchema.__tablename__
    )
downgrade(self, revision)

Revert the database to a previous version.

Parameters:

Name Type Description Default
revision str

String revision target.

required
Source code in zenml/zen_stores/migrations/alembic.py
def downgrade(self, revision: str) -> None:
    """Revert the database to a previous version.

    Args:
        revision: String revision target.
    """

    def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
        return self.script_directory._downgrade_revs(
            revision, rev  # type:ignore [arg-type]
        )

    self.run_migrations(do_downgrade)
run_migrations(self, fn)

Run an online migration function in the current migration context.

Parameters:

Name Type Description Default
fn Optional[Callable[[Union[str, Sequence[str]], alembic.runtime.migration.MigrationContext], List[Any]]]

Migration function to run. If not set, the function configured externally by the Alembic CLI command is used.

required
Source code in zenml/zen_stores/migrations/alembic.py
def run_migrations(
    self,
    fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
    """Run an online migration function in the current migration context.

    Args:
        fn: Migration function to run. If not set, the function configured
            externally by the Alembic CLI command is used.
    """
    fn_context_args: Dict[Any, Any] = {}
    if fn is not None:
        fn_context_args["fn"] = fn

    with self.engine.connect() as connection:
        self.environment_context.configure(
            connection=connection,
            target_metadata=self.metadata,
            include_object=include_object,
            compare_type=True,
            render_as_batch=True,
            **fn_context_args,
            **self.context_kwargs,
        )

        with self.environment_context.begin_transaction():
            self.environment_context.run_migrations()
stamp(self, revision)

Stamp the revision table with the given revision without running any migrations.

Parameters:

Name Type Description Default
revision str

String revision target.

required
Source code in zenml/zen_stores/migrations/alembic.py
def stamp(self, revision: str) -> None:
    """Stamp the revision table with the given revision without running any migrations.

    Args:
        revision: String revision target.
    """

    def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
        return self.script_directory._stamp_revs(revision, rev)

    self.run_migrations(do_stamp)
upgrade(self, revision='heads')

Upgrade the database to a later version.

Parameters:

Name Type Description Default
revision str

String revision target.

'heads'
Source code in zenml/zen_stores/migrations/alembic.py
def upgrade(self, revision: str = "heads") -> None:
    """Upgrade the database to a later version.

    Args:
        revision: String revision target.
    """

    def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
        return self.script_directory._upgrade_revs(
            revision, rev  # type:ignore [arg-type]
        )

    self.run_migrations(do_upgrade)
AlembicVersion (Base)

Alembic version table.

Source code in zenml/zen_stores/migrations/alembic.py
class AlembicVersion(Base):  # type: ignore[valid-type,misc]
    """Alembic version table."""

    __tablename__ = "alembic_version"
    version_num = Column(String, nullable=False, primary_key=True)
include_object(object, name, type_, *args, **kwargs)

Function used to exclude tables from the migration scripts.

Parameters:

Name Type Description Default
object Any

The schema item object to check.

required
name str

The name of the object to check.

required
type_ str

The type of the object to check.

required
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
bool

True if the object should be included, False otherwise.

Source code in zenml/zen_stores/migrations/alembic.py
def include_object(
    object: Any, name: str, type_: str, *args: Any, **kwargs: Any
) -> bool:
    """Function used to exclude tables from the migration scripts.

    Args:
        object: The schema item object to check.
        name: The name of the object to check.
        type_: The type of the object to check.
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        True if the object should be included, False otherwise.
    """
    return not (type_ == "table" and name in exclude_tables)

rest_zen_store

REST Zen Store implementation.

RestZenStore (BaseZenStore) pydantic-model

Store implementation for accessing data from a REST API.

Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStore(BaseZenStore):
    """Store implementation for accessing data from a REST API."""

    config: RestZenStoreConfiguration
    TYPE: ClassVar[StoreType] = StoreType.REST
    CONFIG_TYPE: ClassVar[Type[StoreConfiguration]] = RestZenStoreConfiguration
    _api_token: Optional[str] = None
    _session: Optional[requests.Session] = None

    def _initialize_database(self) -> None:
        """Initialize the database."""
        # don't do anything for a REST store

    # ====================================
    # ZenML Store interface implementation
    # ====================================

    # --------------------------------
    # Initialization and configuration
    # --------------------------------

    def _initialize(self) -> None:
        """Initialize the REST store."""
        client_version = zenml.__version__
        server_version = self.get_store_info().version

        if not DISABLE_CLIENT_SERVER_MISMATCH_WARNING and (
            server_version != client_version
        ):
            logger.warning(
                "Your ZenML client version (%s) does not match the server "
                "version (%s). This version mismatch might lead to errors or "
                "unexpected behavior. \nTo disable this warning message, set "
                "the environment variable `%s=True`",
                client_version,
                server_version,
                ENV_ZENML_DISABLE_CLIENT_SERVER_MISMATCH_WARNING,
            )

    def get_store_info(self) -> ServerModel:
        """Get information about the server.

        Returns:
            Information about the server.
        """
        body = self.get(INFO)
        return ServerModel.parse_obj(body)

    # ------
    # Stacks
    # ------

    @track(AnalyticsEvent.REGISTERED_STACK)
    def create_stack(self, stack: StackRequestModel) -> StackResponseModel:
        """Register a new stack.

        Args:
            stack: The stack to register.

        Returns:
            The registered stack.
        """
        return self._create_workspace_scoped_resource(
            resource=stack,
            route=STACKS,
            response_model=StackResponseModel,
        )

    def get_stack(self, stack_id: UUID) -> StackResponseModel:
        """Get a stack by its unique ID.

        Args:
            stack_id: The ID of the stack to get.

        Returns:
            The stack with the given ID.
        """
        return self._get_resource(
            resource_id=stack_id,
            route=STACKS,
            response_model=StackResponseModel,
        )

    def list_stacks(
        self, stack_filter_model: StackFilterModel
    ) -> Page[StackResponseModel]:
        """List all stacks matching the given filter criteria.

        Args:
            stack_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all stacks matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=STACKS,
            response_model=StackResponseModel,
            filter_model=stack_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_STACK)
    def update_stack(
        self, stack_id: UUID, stack_update: StackUpdateModel
    ) -> StackResponseModel:
        """Update a stack.

        Args:
            stack_id: The ID of the stack update.
            stack_update: The update request on the stack.

        Returns:
            The updated stack.
        """
        return self._update_resource(
            resource_id=stack_id,
            resource_update=stack_update,
            route=STACKS,
            response_model=StackResponseModel,
        )

    @track(AnalyticsEvent.DELETED_STACK)
    def delete_stack(self, stack_id: UUID) -> None:
        """Delete a stack.

        Args:
            stack_id: The ID of the stack to delete.
        """
        self._delete_resource(
            resource_id=stack_id,
            route=STACKS,
        )

    # ----------------
    # Stack components
    # ----------------

    @track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
    def create_stack_component(
        self,
        component: ComponentRequestModel,
    ) -> ComponentResponseModel:
        """Create a stack component.

        Args:
            component: The stack component to create.

        Returns:
            The created stack component.
        """
        return self._create_workspace_scoped_resource(
            resource=component,
            route=STACK_COMPONENTS,
            response_model=ComponentResponseModel,
        )

    def get_stack_component(
        self, component_id: UUID
    ) -> ComponentResponseModel:
        """Get a stack component by ID.

        Args:
            component_id: The ID of the stack component to get.

        Returns:
            The stack component.
        """
        return self._get_resource(
            resource_id=component_id,
            route=STACK_COMPONENTS,
            response_model=ComponentResponseModel,
        )

    def list_stack_components(
        self, component_filter_model: ComponentFilterModel
    ) -> Page[ComponentResponseModel]:
        """List all stack components matching the given filter criteria.

        Args:
            component_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all stack components matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=STACK_COMPONENTS,
            response_model=ComponentResponseModel,
            filter_model=component_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
    def update_stack_component(
        self,
        component_id: UUID,
        component_update: ComponentUpdateModel,
    ) -> ComponentResponseModel:
        """Update an existing stack component.

        Args:
            component_id: The ID of the stack component to update.
            component_update: The update to be applied to the stack component.

        Returns:
            The updated stack component.
        """
        return self._update_resource(
            resource_id=component_id,
            resource_update=component_update,
            route=STACK_COMPONENTS,
            response_model=ComponentResponseModel,
        )

    @track(AnalyticsEvent.DELETED_STACK_COMPONENT)
    def delete_stack_component(self, component_id: UUID) -> None:
        """Delete a stack component.

        Args:
            component_id: The ID of the stack component to delete.
        """
        self._delete_resource(
            resource_id=component_id,
            route=STACK_COMPONENTS,
        )

    # -----------------------
    # Stack component flavors
    # -----------------------

    @track(AnalyticsEvent.CREATED_FLAVOR)
    def create_flavor(self, flavor: FlavorRequestModel) -> FlavorResponseModel:
        """Creates a new stack component flavor.

        Args:
            flavor: The stack component flavor to create.

        Returns:
            The newly created flavor.
        """
        return self._create_resource(
            resource=flavor,
            route=FLAVORS,
            response_model=FlavorResponseModel,
        )

    def update_flavor(
        self, flavor_id: UUID, flavor_update: FlavorUpdateModel
    ) -> FlavorResponseModel:
        """Updates an existing user.

        Args:
            flavor_id: The id of the flavor to update.
            flavor_update: The update to be applied to the flavor.

        Returns:
            The updated flavor.
        """
        return self._update_resource(
            resource_id=flavor_id,
            resource_update=flavor_update,
            route=FLAVORS,
            response_model=FlavorResponseModel,
        )

    def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
        """Get a stack component flavor by ID.

        Args:
            flavor_id: The ID of the stack component flavor to get.

        Returns:
            The stack component flavor.
        """
        return self._get_resource(
            resource_id=flavor_id,
            route=FLAVORS,
            response_model=FlavorResponseModel,
        )

    def list_flavors(
        self, flavor_filter_model: FlavorFilterModel
    ) -> Page[FlavorResponseModel]:
        """List all stack component flavors matching the given filter criteria.

        Args:
            flavor_filter_model: All filter parameters including pagination
                params

        Returns:
            List of all the stack component flavors matching the given criteria.
        """
        return self._list_paginated_resources(
            route=FLAVORS,
            response_model=FlavorResponseModel,
            filter_model=flavor_filter_model,
        )

    @track(AnalyticsEvent.DELETED_FLAVOR)
    def delete_flavor(self, flavor_id: UUID) -> None:
        """Delete a stack component flavor.

        Args:
            flavor_id: The ID of the stack component flavor to delete.
        """
        self._delete_resource(
            resource_id=flavor_id,
            route=FLAVORS,
        )

    # -----
    # Users
    # -----

    @track(AnalyticsEvent.CREATED_USER)
    def create_user(self, user: UserRequestModel) -> UserResponseModel:
        """Creates a new user.

        Args:
            user: User to be created.

        Returns:
            The newly created user.
        """
        return self._create_resource(
            resource=user,
            route=USERS + "?assign_default_role=False",
            response_model=UserResponseModel,
        )

    def get_user(
        self,
        user_name_or_id: Optional[Union[str, UUID]] = None,
        include_private: bool = False,
    ) -> UserResponseModel:
        """Gets a specific user, when no id is specified the active user is returned.

        The `include_private` parameter is ignored here as it is handled
        implicitly by the /current-user endpoint that is queried when no
        user_name_or_id is set. Raises a KeyError in case a user with that id
        does not exist.

        Args:
            user_name_or_id: The name or ID of the user to get.
            include_private: Whether to include private user information

        Returns:
            The requested user, if it was found.
        """
        if user_name_or_id:
            return self._get_resource(
                resource_id=user_name_or_id,
                route=USERS,
                response_model=UserResponseModel,
            )
        else:
            body = self.get(CURRENT_USER)
            return UserResponseModel.parse_obj(body)

    def list_users(
        self, user_filter_model: UserFilterModel
    ) -> Page[UserResponseModel]:
        """List all users.

        Args:
            user_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all users.
        """
        return self._list_paginated_resources(
            route=USERS,
            response_model=UserResponseModel,
            filter_model=user_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_USER)
    def update_user(
        self, user_id: UUID, user_update: UserUpdateModel
    ) -> UserResponseModel:
        """Updates an existing user.

        Args:
            user_id: The id of the user to update.
            user_update: The update to be applied to the user.

        Returns:
            The updated user.
        """
        return self._update_resource(
            resource_id=user_id,
            resource_update=user_update,
            route=USERS,
            response_model=UserResponseModel,
        )

    @track(AnalyticsEvent.DELETED_USER)
    def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
        """Deletes a user.

        Args:
            user_name_or_id: The name or ID of the user to delete.
        """
        self._delete_resource(
            resource_id=user_name_or_id,
            route=USERS,
        )

    # -----
    # Teams
    # -----

    @track(AnalyticsEvent.CREATED_TEAM)
    def create_team(self, team: TeamRequestModel) -> TeamResponseModel:
        """Creates a new team.

        Args:
            team: The team model to create.

        Returns:
            The newly created team.
        """
        return self._create_resource(
            resource=team,
            route=TEAMS,
            response_model=TeamResponseModel,
        )

    def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
        """Gets a specific team.

        Args:
            team_name_or_id: Name or ID of the team to get.

        Returns:
            The requested team.
        """
        return self._get_resource(
            resource_id=team_name_or_id,
            route=TEAMS,
            response_model=TeamResponseModel,
        )

    def list_teams(
        self, team_filter_model: TeamFilterModel
    ) -> Page[TeamResponseModel]:
        """List all teams matching the given filter criteria.

        Args:
            team_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all teams matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=TEAMS,
            response_model=TeamResponseModel,
            filter_model=team_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_TEAM)
    def update_team(
        self, team_id: UUID, team_update: TeamUpdateModel
    ) -> TeamResponseModel:
        """Update an existing team.

        Args:
            team_id: The ID of the team to be updated.
            team_update: The update to be applied to the team.

        Returns:
            The updated team.
        """
        return self._update_resource(
            resource_id=team_id,
            resource_update=team_update,
            route=TEAMS,
            response_model=TeamResponseModel,
        )

    @track(AnalyticsEvent.DELETED_TEAM)
    def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
        """Deletes a team.

        Args:
            team_name_or_id: Name or ID of the team to delete.
        """
        self._delete_resource(
            resource_id=team_name_or_id,
            route=TEAMS,
        )

    # -----
    # Roles
    # -----

    @track(AnalyticsEvent.CREATED_ROLE)
    def create_role(self, role: RoleRequestModel) -> RoleResponseModel:
        """Creates a new role.

        Args:
            role: The role model to create.

        Returns:
            The newly created role.
        """
        return self._create_resource(
            resource=role,
            route=ROLES,
            response_model=RoleResponseModel,
        )

    def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
        """Gets a specific role.

        Args:
            role_name_or_id: Name or ID of the role to get.

        Returns:
            The requested role.
        """
        return self._get_resource(
            resource_id=role_name_or_id,
            route=ROLES,
            response_model=RoleResponseModel,
        )

    def list_roles(
        self, role_filter_model: RoleFilterModel
    ) -> Page[RoleResponseModel]:
        """List all roles matching the given filter criteria.

        Args:
            role_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all roles matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=ROLES,
            response_model=RoleResponseModel,
            filter_model=role_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_ROLE)
    def update_role(
        self, role_id: UUID, role_update: RoleUpdateModel
    ) -> RoleResponseModel:
        """Update an existing role.

        Args:
            role_id: The ID of the role to be updated.
            role_update: The update to be applied to the role.

        Returns:
            The updated role.
        """
        return self._update_resource(
            resource_id=role_id,
            resource_update=role_update,
            route=ROLES,
            response_model=RoleResponseModel,
        )

    @track(AnalyticsEvent.DELETED_ROLE)
    def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
        """Deletes a role.

        Args:
            role_name_or_id: Name or ID of the role to delete.
        """
        self._delete_resource(
            resource_id=role_name_or_id,
            route=ROLES,
        )

    # ----------------
    # Role assignments
    # ----------------

    def list_user_role_assignments(
        self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
    ) -> Page[UserRoleAssignmentResponseModel]:
        """List all roles assignments matching the given filter criteria.

        Args:
            user_role_assignment_filter_model: All filter parameters including
                pagination params.

        Returns:
            A list of all roles assignments matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=USER_ROLE_ASSIGNMENTS,
            response_model=UserRoleAssignmentResponseModel,
            filter_model=user_role_assignment_filter_model,
        )

    def get_user_role_assignment(
        self, user_role_assignment_id: UUID
    ) -> UserRoleAssignmentResponseModel:
        """Get an existing role assignment by name or ID.

        Args:
            user_role_assignment_id: Name or ID of the role assignment to get.

        Returns:
            The requested workspace.
        """
        return self._get_resource(
            resource_id=user_role_assignment_id,
            route=USER_ROLE_ASSIGNMENTS,
            response_model=UserRoleAssignmentResponseModel,
        )

    def delete_user_role_assignment(
        self, user_role_assignment_id: UUID
    ) -> None:
        """Delete a specific role assignment.

        Args:
            user_role_assignment_id: The ID of the specific role assignment
        """
        self._delete_resource(
            resource_id=user_role_assignment_id,
            route=USER_ROLE_ASSIGNMENTS,
        )

    def create_user_role_assignment(
        self, user_role_assignment: UserRoleAssignmentRequestModel
    ) -> UserRoleAssignmentResponseModel:
        """Creates a new role assignment.

        Args:
            user_role_assignment: The role assignment to create.

        Returns:
            The newly created workspace.
        """
        return self._create_resource(
            resource=user_role_assignment,
            route=USER_ROLE_ASSIGNMENTS,
            response_model=UserRoleAssignmentResponseModel,
        )

    # ---------------------
    # Team Role assignments
    # ---------------------

    def create_team_role_assignment(
        self, team_role_assignment: TeamRoleAssignmentRequestModel
    ) -> TeamRoleAssignmentResponseModel:
        """Creates a new team role assignment.

        Args:
            team_role_assignment: The role assignment model to create.

        Returns:
            The newly created role assignment.
        """
        return self._create_resource(
            resource=team_role_assignment,
            route=TEAM_ROLE_ASSIGNMENTS,
            response_model=TeamRoleAssignmentResponseModel,
        )

    def get_team_role_assignment(
        self, team_role_assignment_id: UUID
    ) -> TeamRoleAssignmentResponseModel:
        """Gets a specific role assignment.

        Args:
            team_role_assignment_id: ID of the role assignment to get.

        Returns:
            The requested role assignment.
        """
        return self._get_resource(
            resource_id=team_role_assignment_id,
            route=TEAM_ROLE_ASSIGNMENTS,
            response_model=TeamRoleAssignmentResponseModel,
        )

    def delete_team_role_assignment(
        self, team_role_assignment_id: UUID
    ) -> None:
        """Delete a specific role assignment.

        Args:
            team_role_assignment_id: The ID of the specific role assignment
        """
        self._delete_resource(
            resource_id=team_role_assignment_id,
            route=TEAM_ROLE_ASSIGNMENTS,
        )

    def list_team_role_assignments(
        self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
    ) -> Page[TeamRoleAssignmentResponseModel]:
        """List all roles assignments matching the given filter criteria.

        Args:
            team_role_assignment_filter_model: All filter parameters including
                pagination params.

        Returns:
            A list of all roles assignments matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=TEAM_ROLE_ASSIGNMENTS,
            response_model=TeamRoleAssignmentResponseModel,
            filter_model=team_role_assignment_filter_model,
        )

    # --------
    # Workspaces
    # --------

    @track(AnalyticsEvent.CREATED_WORKSPACE)
    def create_workspace(
        self, workspace: WorkspaceRequestModel
    ) -> WorkspaceResponseModel:
        """Creates a new workspace.

        Args:
            workspace: The workspace to create.

        Returns:
            The newly created workspace.
        """
        return self._create_resource(
            resource=workspace,
            route=WORKSPACES,
            response_model=WorkspaceResponseModel,
        )

    def get_workspace(
        self, workspace_name_or_id: Union[UUID, str]
    ) -> WorkspaceResponseModel:
        """Get an existing workspace by name or ID.

        Args:
            workspace_name_or_id: Name or ID of the workspace to get.

        Returns:
            The requested workspace.
        """
        return self._get_resource(
            resource_id=workspace_name_or_id,
            route=WORKSPACES,
            response_model=WorkspaceResponseModel,
        )

    def list_workspaces(
        self, workspace_filter_model: WorkspaceFilterModel
    ) -> Page[WorkspaceResponseModel]:
        """List all workspace matching the given filter criteria.

        Args:
            workspace_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all workspace matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=WORKSPACES,
            response_model=WorkspaceResponseModel,
            filter_model=workspace_filter_model,
        )

    @track(AnalyticsEvent.UPDATED_WORKSPACE)
    def update_workspace(
        self, workspace_id: UUID, workspace_update: WorkspaceUpdateModel
    ) -> WorkspaceResponseModel:
        """Update an existing workspace.

        Args:
            workspace_id: The ID of the workspace to be updated.
            workspace_update: The update to be applied to the workspace.

        Returns:
            The updated workspace.
        """
        return self._update_resource(
            resource_id=workspace_id,
            resource_update=workspace_update,
            route=WORKSPACES,
            response_model=WorkspaceResponseModel,
        )

    @track(AnalyticsEvent.DELETED_WORKSPACE)
    def delete_workspace(self, workspace_name_or_id: Union[str, UUID]) -> None:
        """Deletes a workspace.

        Args:
            workspace_name_or_id: Name or ID of the workspace to delete.
        """
        self._delete_resource(
            resource_id=workspace_name_or_id,
            route=WORKSPACES,
        )

    # ---------
    # Pipelines
    # ---------

    @track(AnalyticsEvent.CREATE_PIPELINE)
    def create_pipeline(
        self, pipeline: PipelineRequestModel
    ) -> PipelineResponseModel:
        """Creates a new pipeline in a workspace.

        Args:
            pipeline: The pipeline to create.

        Returns:
            The newly created pipeline.
        """
        return self._create_workspace_scoped_resource(
            resource=pipeline,
            route=PIPELINES,
            response_model=PipelineResponseModel,
        )

    def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
        """Get a pipeline with a given ID.

        Args:
            pipeline_id: ID of the pipeline.

        Returns:
            The pipeline.
        """
        return self._get_resource(
            resource_id=pipeline_id,
            route=PIPELINES,
            response_model=PipelineResponseModel,
        )

    def list_pipelines(
        self, pipeline_filter_model: PipelineFilterModel
    ) -> Page[PipelineResponseModel]:
        """List all pipelines matching the given filter criteria.

        Args:
            pipeline_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all pipelines matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=PIPELINES,
            response_model=PipelineResponseModel,
            filter_model=pipeline_filter_model,
        )

    @track(AnalyticsEvent.UPDATE_PIPELINE)
    def update_pipeline(
        self, pipeline_id: UUID, pipeline_update: PipelineUpdateModel
    ) -> PipelineResponseModel:
        """Updates a pipeline.

        Args:
            pipeline_id: The ID of the pipeline to be updated.
            pipeline_update: The update to be applied.

        Returns:
            The updated pipeline.
        """
        return self._update_resource(
            resource_id=pipeline_id,
            resource_update=pipeline_update,
            route=PIPELINES,
            response_model=PipelineResponseModel,
        )

    @track(AnalyticsEvent.DELETE_PIPELINE)
    def delete_pipeline(self, pipeline_id: UUID) -> None:
        """Deletes a pipeline.

        Args:
            pipeline_id: The ID of the pipeline to delete.
        """
        self._delete_resource(
            resource_id=pipeline_id,
            route=PIPELINES,
        )

    # ---------
    # Builds
    # ---------

    def create_build(
        self,
        build: PipelineBuildRequestModel,
    ) -> PipelineBuildResponseModel:
        """Creates a new build in a workspace.

        Args:
            build: The build to create.

        Returns:
            The newly created build.
        """
        return self._create_workspace_scoped_resource(
            resource=build,
            route=PIPELINE_BUILDS,
            response_model=PipelineBuildResponseModel,
        )

    def get_build(self, build_id: UUID) -> PipelineBuildResponseModel:
        """Get a build with a given ID.

        Args:
            build_id: ID of the build.

        Returns:
            The build.
        """
        return self._get_resource(
            resource_id=build_id,
            route=PIPELINE_BUILDS,
            response_model=PipelineBuildResponseModel,
        )

    def list_builds(
        self, build_filter_model: PipelineBuildFilterModel
    ) -> Page[PipelineBuildResponseModel]:
        """List all builds matching the given filter criteria.

        Args:
            build_filter_model: All filter parameters including pagination
                params.

        Returns:
            A page of all builds matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=PIPELINE_BUILDS,
            response_model=PipelineBuildResponseModel,
            filter_model=build_filter_model,
        )

    def delete_build(self, build_id: UUID) -> None:
        """Deletes a build.

        Args:
            build_id: The ID of the build to delete.
        """
        self._delete_resource(
            resource_id=build_id,
            route=PIPELINE_BUILDS,
        )

    # ----------------------
    # Pipeline Deployments
    # ----------------------

    def create_deployment(
        self,
        deployment: PipelineDeploymentRequestModel,
    ) -> PipelineDeploymentResponseModel:
        """Creates a new deployment in a workspace.

        Args:
            deployment: The deployment to create.

        Returns:
            The newly created deployment.
        """
        return self._create_workspace_scoped_resource(
            resource=deployment,
            route=PIPELINE_DEPLOYMENTS,
            response_model=PipelineDeploymentResponseModel,
        )

    def get_deployment(
        self, deployment_id: UUID
    ) -> PipelineDeploymentResponseModel:
        """Get a deployment with a given ID.

        Args:
            deployment_id: ID of the deployment.

        Returns:
            The deployment.
        """
        return self._get_resource(
            resource_id=deployment_id,
            route=PIPELINE_DEPLOYMENTS,
            response_model=PipelineDeploymentResponseModel,
        )

    def list_deployments(
        self, deployment_filter_model: PipelineDeploymentFilterModel
    ) -> Page[PipelineDeploymentResponseModel]:
        """List all deployments matching the given filter criteria.

        Args:
            deployment_filter_model: All filter parameters including pagination
                params.

        Returns:
            A page of all deployments matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=PIPELINE_DEPLOYMENTS,
            response_model=PipelineDeploymentResponseModel,
            filter_model=deployment_filter_model,
        )

    def delete_deployment(self, deployment_id: UUID) -> None:
        """Deletes a deployment.

        Args:
            deployment_id: The ID of the deployment to delete.
        """
        self._delete_resource(
            resource_id=deployment_id,
            route=PIPELINE_DEPLOYMENTS,
        )

    # ---------
    # Schedules
    # ---------

    def create_schedule(
        self, schedule: ScheduleRequestModel
    ) -> ScheduleResponseModel:
        """Creates a new schedule.

        Args:
            schedule: The schedule to create.

        Returns:
            The newly created schedule.
        """
        return self._create_workspace_scoped_resource(
            resource=schedule,
            route=SCHEDULES,
            response_model=ScheduleResponseModel,
        )

    def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
        """Get a schedule with a given ID.

        Args:
            schedule_id: ID of the schedule.

        Returns:
            The schedule.
        """
        return self._get_resource(
            resource_id=schedule_id,
            route=SCHEDULES,
            response_model=ScheduleResponseModel,
        )

    def list_schedules(
        self, schedule_filter_model: ScheduleFilterModel
    ) -> Page[ScheduleResponseModel]:
        """List all schedules in the workspace.

        Args:
            schedule_filter_model: All filter parameters including pagination
                params

        Returns:
            A list of schedules.
        """
        return self._list_paginated_resources(
            route=SCHEDULES,
            response_model=ScheduleResponseModel,
            filter_model=schedule_filter_model,
        )

    def update_schedule(
        self,
        schedule_id: UUID,
        schedule_update: ScheduleUpdateModel,
    ) -> ScheduleResponseModel:
        """Updates a schedule.

        Args:
            schedule_id: The ID of the schedule to be updated.
            schedule_update: The update to be applied.

        Returns:
            The updated schedule.
        """
        return self._update_resource(
            resource_id=schedule_id,
            resource_update=schedule_update,
            route=SCHEDULES,
            response_model=ScheduleResponseModel,
        )

    def delete_schedule(self, schedule_id: UUID) -> None:
        """Deletes a schedule.

        Args:
            schedule_id: The ID of the schedule to delete.
        """
        self._delete_resource(
            resource_id=schedule_id,
            route=SCHEDULES,
        )

    # --------------
    # Pipeline runs
    # --------------

    def create_run(
        self, pipeline_run: PipelineRunRequestModel
    ) -> PipelineRunResponseModel:
        """Creates a pipeline run.

        Args:
            pipeline_run: The pipeline run to create.

        Returns:
            The created pipeline run.
        """
        return self._create_workspace_scoped_resource(
            resource=pipeline_run,
            response_model=PipelineRunResponseModel,
            route=RUNS,
        )

    def get_run(
        self, run_name_or_id: Union[UUID, str]
    ) -> PipelineRunResponseModel:
        """Gets a pipeline run.

        Args:
            run_name_or_id: The name or ID of the pipeline run to get.

        Returns:
            The pipeline run.
        """
        return self._get_resource(
            resource_id=run_name_or_id,
            route=RUNS,
            response_model=PipelineRunResponseModel,
        )

    def get_or_create_run(
        self, pipeline_run: PipelineRunRequestModel
    ) -> Tuple[PipelineRunResponseModel, bool]:
        """Gets or creates a pipeline run.

        If a run with the same ID or name already exists, it is returned.
        Otherwise, a new run is created.

        Args:
            pipeline_run: The pipeline run to get or create.

        Returns:
            The pipeline run, and a boolean indicating whether the run was
            created or not.
        """
        return self._get_or_create_workspace_scoped_resource(
            resource=pipeline_run,
            route=RUNS,
            response_model=PipelineRunResponseModel,
        )

    def list_runs(
        self, runs_filter_model: PipelineRunFilterModel
    ) -> Page[PipelineRunResponseModel]:
        """List all pipeline runs matching the given filter criteria.

        Args:
            runs_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all pipeline runs matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=RUNS,
            response_model=PipelineRunResponseModel,
            filter_model=runs_filter_model,
        )

    def update_run(
        self, run_id: UUID, run_update: PipelineRunUpdateModel
    ) -> PipelineRunResponseModel:
        """Updates a pipeline run.

        Args:
            run_id: The ID of the pipeline run to update.
            run_update: The update to be applied to the pipeline run.


        Returns:
            The updated pipeline run.
        """
        return self._update_resource(
            resource_id=run_id,
            resource_update=run_update,
            response_model=PipelineRunResponseModel,
            route=RUNS,
        )

    def delete_run(self, run_id: UUID) -> None:
        """Deletes a pipeline run.

        Args:
            run_id: The ID of the pipeline run to delete.
        """
        self._delete_resource(
            resource_id=run_id,
            route=RUNS,
        )

    # ------------------
    # Pipeline run steps
    # ------------------

    def create_run_step(
        self, step_run: StepRunRequestModel
    ) -> StepRunResponseModel:
        """Creates a step run.

        Args:
            step_run: The step run to create.

        Returns:
            The created step run.
        """
        return self._create_resource(
            resource=step_run,
            response_model=StepRunResponseModel,
            route=STEPS,
        )

    def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
        """Get a step run by ID.

        Args:
            step_run_id: The ID of the step run to get.

        Returns:
            The step run.
        """
        return self._get_resource(
            resource_id=step_run_id,
            route=STEPS,
            response_model=StepRunResponseModel,
        )

    def list_run_steps(
        self, step_run_filter_model: StepRunFilterModel
    ) -> Page[StepRunResponseModel]:
        """List all step runs matching the given filter criteria.

        Args:
            step_run_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all step runs matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=STEPS,
            response_model=StepRunResponseModel,
            filter_model=step_run_filter_model,
        )

    def update_run_step(
        self,
        step_run_id: UUID,
        step_run_update: StepRunUpdateModel,
    ) -> StepRunResponseModel:
        """Updates a step run.

        Args:
            step_run_id: The ID of the step to update.
            step_run_update: The update to be applied to the step.

        Returns:
            The updated step run.
        """
        return self._update_resource(
            resource_id=step_run_id,
            resource_update=step_run_update,
            response_model=StepRunResponseModel,
            route=STEPS,
        )

    # ---------
    # Artifacts
    # ---------

    def create_artifact(
        self, artifact: ArtifactRequestModel
    ) -> ArtifactResponseModel:
        """Creates an artifact.

        Args:
            artifact: The artifact to create.

        Returns:
            The created artifact.
        """
        return self._create_resource(
            resource=artifact,
            response_model=ArtifactResponseModel,
            route=ARTIFACTS,
        )

    def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
        """Gets an artifact.

        Args:
            artifact_id: The ID of the artifact to get.

        Returns:
            The artifact.
        """
        return self._get_resource(
            resource_id=artifact_id,
            route=ARTIFACTS,
            response_model=ArtifactResponseModel,
        )

    def list_artifacts(
        self, artifact_filter_model: ArtifactFilterModel
    ) -> Page[ArtifactResponseModel]:
        """List all artifacts matching the given filter criteria.

        Args:
            artifact_filter_model: All filter parameters including pagination
                params.

        Returns:
            A list of all artifacts matching the filter criteria.
        """
        return self._list_paginated_resources(
            route=ARTIFACTS,
            response_model=ArtifactResponseModel,
            filter_model=artifact_filter_model,
        )

    def delete_artifact(self, artifact_id: UUID) -> None:
        """Deletes an artifact.

        Args:
            artifact_id: The ID of the artifact to delete.
        """
        self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)

    # ------------
    # Run Metadata
    # ------------

    def create_run_metadata(
        self, run_metadata: RunMetadataRequestModel
    ) -> RunMetadataResponseModel:
        """Creates run metadata.

        Args:
            run_metadata: The run metadata to create.

        Returns:
            The created run metadata.
        """
        return self._create_workspace_scoped_resource(
            resource=run_metadata,
            response_model=RunMetadataResponseModel,
            route=RUN_METADATA,
        )

    def list_run_metadata(
        self,
        run_metadata_filter_model: RunMetadataFilterModel,
    ) -> Page[RunMetadataResponseModel]:
        """List run metadata.

        Args:
            run_metadata_filter_model: All filter parameters including
                pagination params.

        Returns:
            The run metadata.
        """
        return self._list_paginated_resources(
            route=RUN_METADATA,
            response_model=RunMetadataResponseModel,
            filter_model=run_metadata_filter_model,
        )

    # -----------------
    # Code Repositories
    # -----------------

    def create_code_repository(
        self, code_repository: CodeRepositoryRequestModel
    ) -> CodeRepositoryResponseModel:
        """Creates a new code repository.

        Args:
            code_repository: Code repository to be created.

        Returns:
            The newly created code repository.
        """
        return self._create_workspace_scoped_resource(
            resource=code_repository,
            response_model=CodeRepositoryResponseModel,
            route=CODE_REPOSITORIES,
        )

    def get_code_repository(
        self, code_repository_id: UUID
    ) -> CodeRepositoryResponseModel:
        """Gets a specific code repository.

        Args:
            code_repository_id: The ID of the code repository to get.

        Returns:
            The requested code repository, if it was found.
        """
        return self._get_resource(
            resource_id=code_repository_id,
            route=CODE_REPOSITORIES,
            response_model=CodeRepositoryResponseModel,
        )

    def list_code_repositories(
        self, filter_model: CodeRepositoryFilterModel
    ) -> Page[CodeRepositoryResponseModel]:
        """List all code repositories.

        Args:
            filter_model: All filter parameters including pagination
                params.

        Returns:
            A page of all code repositories.
        """
        return self._list_paginated_resources(
            route=CODE_REPOSITORIES,
            response_model=CodeRepositoryResponseModel,
            filter_model=filter_model,
        )

    def update_code_repository(
        self, code_repository_id: UUID, update: CodeRepositoryUpdateModel
    ) -> CodeRepositoryResponseModel:
        """Updates an existing code repository.

        Args:
            code_repository_id: The ID of the code repository to update.
            update: The update to be applied to the code repository.

        Returns:
            The updated code repository.
        """
        return self._update_resource(
            resource_id=code_repository_id,
            resource_update=update,
            response_model=CodeRepositoryResponseModel,
            route=CODE_REPOSITORIES,
        )

    def delete_code_repository(self, code_repository_id: UUID) -> None:
        """Deletes a code repository.

        Args:
            code_repository_id: The ID of the code repository to delete.
        """
        self._delete_resource(
            resource_id=code_repository_id, route=CODE_REPOSITORIES
        )

    # ------------------
    # Service Connectors
    # ------------------

    def _populate_connector_type(
        self,
        *connector_models: Union[
            ServiceConnectorResponseModel, ServiceConnectorResourcesModel
        ],
    ) -> None:
        """Populates or updates the connector type of the given connector or resource models.

        If the connector type is not locally available, the connector type
        field is left as is. The local and remote flags of the connector type
        are updated accordingly.

        Args:
            connector_models: The service connector or resource models to
                populate.
        """
        for service_connector in connector_models:
            # Mark the remote connector type as being only remotely available
            if not isinstance(service_connector.connector_type, str):
                service_connector.connector_type.local = False
                service_connector.connector_type.remote = True

            if not service_connector_registry.is_registered(
                service_connector.type
            ):
                continue

            connector_type = (
                service_connector_registry.get_service_connector_type(
                    service_connector.type
                )
            )
            connector_type.local = True
            if not isinstance(service_connector.connector_type, str):
                connector_type.remote = True
            service_connector.connector_type = connector_type

    def create_service_connector(
        self, service_connector: ServiceConnectorRequestModel
    ) -> ServiceConnectorResponseModel:
        """Creates a new service connector.

        Args:
            service_connector: Service connector to be created.

        Returns:
            The newly created service connector.
        """
        connector_model = self._create_workspace_scoped_resource(
            resource=service_connector,
            route=SERVICE_CONNECTORS,
            response_model=ServiceConnectorResponseModel,
        )
        self._populate_connector_type(connector_model)
        return connector_model

    def get_service_connector(
        self, service_connector_id: UUID
    ) -> ServiceConnectorResponseModel:
        """Gets a specific service connector.

        Args:
            service_connector_id: The ID of the service connector to get.

        Returns:
            The requested service connector, if it was found.
        """
        connector_model = self._get_resource(
            resource_id=service_connector_id,
            route=SERVICE_CONNECTORS,
            response_model=ServiceConnectorResponseModel,
            params={"expand_secrets": False},
        )
        self._populate_connector_type(connector_model)
        return connector_model

    def list_service_connectors(
        self, filter_model: ServiceConnectorFilterModel
    ) -> Page[ServiceConnectorResponseModel]:
        """List all service connectors.

        Args:
            filter_model: All filter parameters including pagination
                params.

        Returns:
            A page of all service connectors.
        """
        connector_models = self._list_paginated_resources(
            route=SERVICE_CONNECTORS,
            response_model=ServiceConnectorResponseModel,
            filter_model=filter_model,
            params={"expand_secrets": False},
        )
        self._populate_connector_type(*connector_models.items)
        return connector_models

    def update_service_connector(
        self, service_connector_id: UUID, update: ServiceConnectorUpdateModel
    ) -> ServiceConnectorResponseModel:
        """Updates an existing service connector.

        The update model contains the fields to be updated. If a field value is
        set to None in the model, the field is not updated, but there are
        special rules concerning some fields:

        * the `configuration` and `secrets` fields together represent a full
        valid configuration update, not just a partial update. If either is
        set (i.e. not None) in the update, their values are merged together and
        will replace the existing configuration and secrets values.
        * the `resource_id` field value is also a full replacement value: if set
        to `None`, the resource ID is removed from the service connector.
        * the `expiration_seconds` field value is also a full replacement value:
        if set to `None`, the expiration is removed from the service connector.
        * the `secret_id` field value in the update is ignored, given that
        secrets are managed internally by the ZenML store.
        * the `labels` field is also a full labels update: if set (i.e. not
        `None`), all existing labels are removed and replaced by the new labels
        in the update.

        Args:
            service_connector_id: The ID of the service connector to update.
            update: The update to be applied to the service connector.

        Returns:
            The updated service connector.
        """
        connector_model = self._update_resource(
            resource_id=service_connector_id,
            resource_update=update,
            response_model=ServiceConnectorResponseModel,
            route=SERVICE_CONNECTORS,
        )
        self._populate_connector_type(connector_model)
        return connector_model

    def delete_service_connector(self, service_connector_id: UUID) -> None:
        """Deletes a service connector.

        Args:
            service_connector_id: The ID of the service connector to delete.
        """
        self._delete_resource(
            resource_id=service_connector_id, route=SERVICE_CONNECTORS
        )

    def verify_service_connector_config(
        self,
        service_connector: ServiceConnectorRequestModel,
        list_resources: bool = True,
    ) -> ServiceConnectorResourcesModel:
        """Verifies if a service connector configuration has access to resources.

        Args:
            service_connector: The service connector configuration to verify.
            list_resources: If True, the list of all resources accessible
                through the service connector and matching the supplied resource
                type and ID are returned.

        Returns:
            The list of resources that the service connector configuration has
            access to.
        """
        response_body = self.post(
            f"{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_VERIFY}",
            body=service_connector,
            params={"list_resources": list_resources},
        )

        resources = ServiceConnectorResourcesModel.parse_obj(response_body)
        self._populate_connector_type(resources)
        return resources

    def verify_service_connector(
        self,
        service_connector_id: UUID,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        list_resources: bool = True,
    ) -> ServiceConnectorResourcesModel:
        """Verifies if a service connector instance has access to one or more resources.

        Args:
            service_connector_id: The ID of the service connector to verify.
            resource_type: The type of resource to verify access to.
            resource_id: The ID of the resource to verify access to.
            list_resources: If True, the list of all resources accessible
                through the service connector and matching the supplied resource
                type and ID are returned.

        Returns:
            The list of resources that the service connector has access to,
            scoped to the supplied resource type and ID, if provided.
        """
        params: Dict[str, Any] = {"list_resources": list_resources}
        if resource_type:
            params["resource_type"] = resource_type
        if resource_id:
            params["resource_id"] = resource_id
        response_body = self.put(
            f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_VERIFY}",
            params=params,
        )

        resources = ServiceConnectorResourcesModel.parse_obj(response_body)
        self._populate_connector_type(resources)
        return resources

    def get_service_connector_client(
        self,
        service_connector_id: UUID,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> ServiceConnectorResponseModel:
        """Get a service connector client for a service connector and given resource.

        Args:
            service_connector_id: The ID of the base service connector to use.
            resource_type: The type of resource to get a client for.
            resource_id: The ID of the resource to get a client for.

        Returns:
            A service connector client that can be used to access the given
            resource.
        """
        params = {}
        if resource_type:
            params["resource_type"] = resource_type
        if resource_id:
            params["resource_id"] = resource_id
        response_body = self.get(
            f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_CLIENT}",
            params=params,
        )

        connector = ServiceConnectorResponseModel.parse_obj(response_body)
        self._populate_connector_type(connector)
        return connector

    def list_service_connector_resources(
        self,
        user_name_or_id: Union[str, UUID],
        workspace_name_or_id: Union[str, UUID],
        connector_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[ServiceConnectorResourcesModel]:
        """List resources that can be accessed by service connectors.

        Args:
            user_name_or_id: The name or ID of the user to scope to.
            workspace_name_or_id: The name or ID of the workspace to scope to.
            connector_type: The type of service connector to scope to.
            resource_type: The type of resource to scope to.
            resource_id: The ID of the resource to scope to.

        Returns:
            The matching list of resources that available service
            connectors have access to.
        """
        params = {}
        if connector_type:
            params["connector_type"] = connector_type
        if resource_type:
            params["resource_type"] = resource_type
        if resource_id:
            params["resource_id"] = resource_id
        response_body = self.get(
            f"{WORKSPACES}/{workspace_name_or_id}{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_RESOURCES}",
            params=params,
        )

        assert isinstance(response_body, list)
        resource_list = [
            ServiceConnectorResourcesModel.parse_obj(item)
            for item in response_body
        ]

        self._populate_connector_type(*resource_list)

        # For service connectors with types that are only locally available,
        # we need to retrieve the resource list locally
        for idx, resources in enumerate(resource_list):
            if isinstance(resources.connector_type, str):
                # Skip connector types that are neither locally nor remotely
                # available
                continue
            if resources.connector_type.remote:
                # Skip connector types that are remotely available
                continue

            # Retrieve the resource list locally
            assert resources.id is not None
            connector = self.get_service_connector(resources.id)
            connector_instance = (
                service_connector_registry.instantiate_connector(
                    model=connector
                )
            )

            try:
                local_resources = connector_instance.verify(
                    resource_type=resource_type,
                    resource_id=resource_id,
                )
            except (ValueError, AuthorizationException) as e:
                logger.error(
                    f'Failed to fetch {resource_type or "available"} '
                    f"resources from service connector {connector.name}/"
                    f"{connector.id}: {e}"
                )
                continue

            resource_list[idx] = local_resources

        return resource_list

    def list_service_connector_types(
        self,
        connector_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        auth_method: Optional[str] = None,
    ) -> List[ServiceConnectorTypeModel]:
        """Get a list of service connector types.

        Args:
            connector_type: Filter by connector type.
            resource_type: Filter by resource type.
            auth_method: Filter by authentication method.

        Returns:
            List of service connector types.
        """
        params = {}
        if connector_type:
            params["connector_type"] = connector_type
        if resource_type:
            params["resource_type"] = resource_type
        if auth_method:
            params["auth_method"] = auth_method
        response_body = self.get(
            SERVICE_CONNECTOR_TYPES,
            params=params,
        )

        assert isinstance(response_body, list)
        remote_connector_types = [
            ServiceConnectorTypeModel.parse_obj(item) for item in response_body
        ]

        # Mark the remote connector types as being only remotely available
        for c in remote_connector_types:
            c.local = False
            c.remote = True

        local_connector_types = (
            service_connector_registry.list_service_connector_types(
                connector_type=connector_type,
                resource_type=resource_type,
                auth_method=auth_method,
            )
        )

        # Add the connector types in the local registry to the list of
        # connector types available remotely. Overwrite those that have
        # the same connector type but mark them as being remotely available.
        connector_types_map = {
            connector_type.connector_type: connector_type
            for connector_type in remote_connector_types
        }

        for connector in local_connector_types:
            if connector.connector_type in connector_types_map:
                connector.remote = True
            connector_types_map[connector.connector_type] = connector

        return list(connector_types_map.values())

    def get_service_connector_type(
        self,
        connector_type: str,
    ) -> ServiceConnectorTypeModel:
        """Returns the requested service connector type.

        Args:
            connector_type: the service connector type identifier.

        Returns:
            The requested service connector type.
        """
        # Use the local registry to get the service connector type, if it
        # exists.
        local_connector_type: Optional[ServiceConnectorTypeModel] = None
        if service_connector_registry.is_registered(connector_type):
            local_connector_type = (
                service_connector_registry.get_service_connector_type(
                    connector_type
                )
            )
        try:
            response_body = self.get(
                f"{SERVICE_CONNECTOR_TYPES}/{connector_type}",
            )
            remote_connector_type = ServiceConnectorTypeModel.parse_obj(
                response_body
            )
            if local_connector_type:
                # If locally available, return the local connector type but
                # mark it as being remotely available.
                local_connector_type.remote = True
                return local_connector_type

            # Mark the remote connector type as being only remotely available
            remote_connector_type.local = False
            remote_connector_type.remote = True

            return remote_connector_type
        except KeyError:
            # If the service connector type is not found, check the local
            # registry.
            return service_connector_registry.get_service_connector_type(
                connector_type
            )

    # =======================
    # Internal helper methods
    # =======================

    def _get_auth_token(self) -> str:
        """Get the authentication token for the REST store.

        Returns:
            The authentication token.

        Raises:
            ValueError: if the response from the server isn't in the right
                format.
        """
        if self._api_token is None:
            # Check if the API token is already stored in the config
            if self.config.api_token:
                self._api_token = self.config.api_token
            # Check if the username and password are provided in the config
            elif (
                self.config.username is not None
                and self.config.password is not None
            ):
                response = self._handle_response(
                    requests.post(
                        self.url + API + VERSION_1 + LOGIN,
                        data={
                            "username": self.config.username,
                            "password": self.config.password,
                        },
                        verify=self.config.verify_ssl,
                        timeout=self.config.http_timeout,
                    )
                )
                if (
                    not isinstance(response, dict)
                    or "access_token" not in response
                ):
                    raise ValueError(
                        f"Bad API Response. Expected access token dict, got "
                        f"{type(response)}"
                    )
                self._api_token = response["access_token"]
                self.config.api_token = self._api_token
            else:
                raise ValueError(
                    "No API token or username/password provided. Please "
                    "provide either a token or a username and password in "
                    "the ZenStore config."
                )
        return self._api_token

    @property
    def session(self) -> requests.Session:
        """Authenticate to the ZenML server.

        Returns:
            A requests session with the authentication token.
        """
        if self._session is None:
            if self.config.verify_ssl is False:
                urllib3.disable_warnings(
                    urllib3.exceptions.InsecureRequestWarning
                )

            self._session = requests.Session()
            self._session.verify = self.config.verify_ssl
            token = self._get_auth_token()
            self._session.headers.update({"Authorization": "Bearer " + token})
            logger.debug("Authenticated to ZenML server.")
        return self._session

    @staticmethod
    def _handle_response(response: requests.Response) -> Json:
        """Handle API response, translating http status codes to Exception.

        Args:
            response: The response to handle.

        Returns:
            The parsed response.

        Raises:
            ValueError: if the response is not in the right format.
            RuntimeError: if an error response is received from the server
                and a more specific exception cannot be determined.
            exc: the exception converted from an error response, if one
                is returned from the server.
        """
        if 200 <= response.status_code < 300:
            try:
                payload: Json = response.json()
                return payload
            except requests.exceptions.JSONDecodeError:
                raise ValueError(
                    "Bad response from API. Expected json, got\n"
                    f"{response.text}"
                )
        elif response.status_code >= 400:
            exc = exception_from_response(response)
            if exc is not None:
                raise exc
            else:
                raise RuntimeError(
                    f"{response.status_code} HTTP Error received from server: "
                    f"{response.text}"
                )
        else:
            raise RuntimeError(
                "Error retrieving from API. Got response "
                f"{response.status_code} with body:\n{response.text}"
            )

    def _request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a request to the REST API.

        Args:
            method: The HTTP method to use.
            url: The URL to request.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The parsed response.
        """
        params = {k: str(v) for k, v in params.items()} if params else {}

        self.session.headers.update(
            {source_context.name: source_context.get().value}
        )

        try:
            return self._handle_response(
                self.session.request(
                    method,
                    url,
                    params=params,
                    verify=self.config.verify_ssl,
                    timeout=self.config.http_timeout,
                    **kwargs,
                )
            )
        except AuthorizationException:
            # The authentication token could have expired; refresh it and try
            # again
            self._session = None
            return self._handle_response(
                self.session.request(
                    method,
                    url,
                    params=params,
                    verify=self.config.verify_ssl,
                    timeout=self.config.http_timeout,
                    **kwargs,
                )
            )

    def get(
        self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Json:
        """Make a GET request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending GET request to {path}...")
        return self._request(
            "GET", self.url + API + VERSION_1 + path, params=params, **kwargs
        )

    def delete(
        self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Json:
        """Make a DELETE request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending DELETE request to {path}...")
        return self._request(
            "DELETE",
            self.url + API + VERSION_1 + path,
            params=params,
            **kwargs,
        )

    def post(
        self,
        path: str,
        body: BaseModel,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a POST request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending POST request to {path}...")
        return self._request(
            "POST",
            self.url + API + VERSION_1 + path,
            data=body.json(),
            params=params,
            **kwargs,
        )

    def put(
        self,
        path: str,
        body: Optional[BaseModel] = None,
        params: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Json:
        """Make a PUT request to the given endpoint path.

        Args:
            path: The path to the endpoint.
            body: The body to send.
            params: The query parameters to pass to the endpoint.
            kwargs: Additional keyword arguments to pass to the request.

        Returns:
            The response body.
        """
        logger.debug(f"Sending PUT request to {path}...")
        data = body.json(exclude_unset=True) if body else None
        return self._request(
            "PUT",
            self.url + API + VERSION_1 + path,
            data=data,
            params=params,
            **kwargs,
        )

    def _create_resource(
        self,
        resource: BaseRequestModel,
        response_model: Type[AnyResponseModel],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponseModel:
        """Create a new resource.

        Args:
            resource: The resource to create.
            route: The resource REST API route to use.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource.
        """
        response_body = self.post(f"{route}", body=resource, params=params)
        return response_model.parse_obj(response_body)

    def _create_workspace_scoped_resource(
        self,
        resource: WorkspaceScopedRequestModel,
        response_model: Type[AnyResponseModel],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponseModel:
        """Create a new workspace scoped resource.

        Args:
            resource: The resource to create.
            route: The resource REST API route to use.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource.
        """
        return self._create_resource(
            resource=resource,
            response_model=response_model,
            route=f"{WORKSPACES}/{str(resource.workspace)}{route}",
            params=params,
        )

    def _get_or_create_resource(
        self,
        resource: BaseRequestModel,
        response_model: Type[AnyResponseModel],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Tuple[AnyResponseModel, bool]:
        """Get or create a resource.

        Args:
            resource: The resource to get or create.
            route: The resource REST API route to use.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource, and a boolean indicating whether the resource
            was created or not.

        Raises:
            ValueError: If the response body is not a list with 2 elements
                where the first element is the resource and the second element
                a boolean indicating whether the resource was created or not.
        """
        response_body = self.post(
            f"{route}{GET_OR_CREATE}",
            body=resource,
            params=params,
        )
        if not isinstance(response_body, list):
            raise ValueError(
                f"Expected a list response from the {route}{GET_OR_CREATE} "
                f"endpoint but got {type(response_body)} instead."
            )
        if len(response_body) != 2:
            raise ValueError(
                f"Expected a list response with 2 elements from the "
                f"{route}{GET_OR_CREATE} endpoint but got {len(response_body)} "
                f"elements instead."
            )
        model_json, was_created = response_body
        if not isinstance(was_created, bool):
            raise ValueError(
                f"Expected a boolean as the second element of the list "
                f"response from the {route}{GET_OR_CREATE} endpoint but got "
                f"{type(was_created)} instead."
            )
        return response_model.parse_obj(model_json), was_created

    def _get_or_create_workspace_scoped_resource(
        self,
        resource: WorkspaceScopedRequestModel,
        response_model: Type[AnyResponseModel],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Tuple[AnyResponseModel, bool]:
        """Get or create a workspace scoped resource.

        Args:
            resource: The resource to get or create.
            route: The resource REST API route to use.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The created resource, and a boolean indicating whether the resource
            was created or not.
        """
        return self._get_or_create_resource(
            resource=resource,
            response_model=response_model,
            route=f"{WORKSPACES}/{str(resource.workspace)}{route}",
            params=params,
        )

    def _get_resource(
        self,
        resource_id: Union[str, UUID],
        route: str,
        response_model: Type[AnyResponseModel],
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponseModel:
        """Retrieve a single resource.

        Args:
            resource_id: The ID of the resource to retrieve.
            route: The resource REST API route to use.
            response_model: Model to use to serialize the response body.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The retrieved resource.
        """
        body = self.get(f"{route}/{str(resource_id)}", params=params)
        return response_model.parse_obj(body)

    def _list_paginated_resources(
        self,
        route: str,
        response_model: Type[AnyResponseModel],
        filter_model: BaseFilterModel,
        params: Optional[Dict[str, Any]] = None,
    ) -> Page[AnyResponseModel]:
        """Retrieve a list of resources filtered by some criteria.

        Args:
            route: The resource REST API route to use.
            response_model: Model to use to serialize the response body.
            filter_model: The filter model to use for the list query.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            List of retrieved resources matching the filter criteria.

        Raises:
            ValueError: If the value returned by the server is not a list.
        """
        # leave out filter params that are not supplied
        params = params or {}
        params.update(filter_model.dict(exclude_none=True))
        body = self.get(f"{route}", params=params)
        if not isinstance(body, dict):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        # The initial page of items will be of type BaseResponseModel
        page_of_items: Page[AnyResponseModel] = Page.parse_obj(body)
        # So these items will be parsed into their correct types like here
        page_of_items.items = [
            response_model.parse_obj(generic_item)
            for generic_item in page_of_items.items
        ]
        return page_of_items

    def _list_resources(
        self,
        route: str,
        response_model: Type[AnyResponseModel],
        **filters: Any,
    ) -> List[AnyResponseModel]:
        """Retrieve a list of resources filtered by some criteria.

        Args:
            route: The resource REST API route to use.
            response_model: Model to use to serialize the response body.
            filters: Filter parameters to use in the query.

        Returns:
            List of retrieved resources matching the filter criteria.

        Raises:
            ValueError: If the value returned by the server is not a list.
        """
        # leave out filter params that are not supplied
        params = dict(filter(lambda x: x[1] is not None, filters.items()))
        body = self.get(f"{route}", params=params)
        if not isinstance(body, list):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        return [response_model.parse_obj(entry) for entry in body]

    def _update_resource(
        self,
        resource_id: UUID,
        resource_update: BaseModel,
        response_model: Type[AnyResponseModel],
        route: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> AnyResponseModel:
        """Update an existing resource.

        Args:
            resource_id: The id of the resource to update.
            resource_update: The resource update.
            response_model: Optional model to use to deserialize the response
                body. If not provided, the resource class itself will be used.
            route: The resource REST API route to use.
            params: Optional query parameters to pass to the endpoint.

        Returns:
            The updated resource.
        """
        response_body = self.put(
            f"{route}/{str(resource_id)}", body=resource_update, params=params
        )

        return response_model.parse_obj(response_body)

    def _delete_resource(
        self, resource_id: Union[str, UUID], route: str
    ) -> None:
        """Delete a resource.

        Args:
            resource_id: The ID of the resource to delete.
            route: The resource REST API route to use.
        """
        self.delete(f"{route}/{str(resource_id)}")
session: Session property readonly

Authenticate to the ZenML server.

Returns:

Type Description
Session

A requests session with the authentication token.

CONFIG_TYPE (StoreConfiguration) pydantic-model

REST ZenML store configuration.

Attributes:

Name Type Description
type StoreType

The type of the store.

secrets_store Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration]

The configuration of the secrets store to use. This defaults to a REST secrets store that extends the REST ZenML store.

username Optional[str]

The username to use to connect to the Zen server.

password Optional[str]

The password to use to connect to the Zen server.

verify_ssl Union[bool, str]

Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself.

http_timeout int

The timeout to use for all requests.

Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStoreConfiguration(StoreConfiguration):
    """REST ZenML store configuration.

    Attributes:
        type: The type of the store.
        secrets_store: The configuration of the secrets store to use.
            This defaults to a REST secrets store that extends the REST ZenML
            store.
        username: The username to use to connect to the Zen server.
        password: The password to use to connect to the Zen server.
        verify_ssl: Either a boolean, in which case it controls whether we
            verify the server's TLS certificate, or a string, in which case it
            must be a path to a CA bundle to use or the CA bundle value itself.
        http_timeout: The timeout to use for all requests.

    """

    type: StoreType = StoreType.REST

    secrets_store: Optional[SecretsStoreConfiguration] = None

    username: Optional[str] = None
    password: Optional[str] = None
    api_token: Optional[str] = None
    verify_ssl: Union[bool, str] = True
    http_timeout: int = DEFAULT_HTTP_TIMEOUT

    @validator("secrets_store")
    def validate_secrets_store(
        cls, secrets_store: Optional[SecretsStoreConfiguration]
    ) -> SecretsStoreConfiguration:
        """Ensures that the secrets store uses an associated REST secrets store.

        Args:
            secrets_store: The secrets store config to be validated.

        Returns:
            The validated secrets store config.

        Raises:
            ValueError: If the secrets store is not of type REST.
        """
        if secrets_store is None:
            secrets_store = RestSecretsStoreConfiguration()
        elif secrets_store.type != SecretsStoreType.REST:
            raise ValueError(
                "The secrets store associated with a REST zen store must be "
                f"of type REST, but is of type {secrets_store.type}."
            )

        return secrets_store

    @root_validator
    def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validates the credentials provided in the values dictionary.

        Args:
            values: A dictionary containing the values to be validated.

        Raises:
            ValueError: If neither api_token nor username is set.

        Returns:
            The values dictionary.
        """
        # Check if the values dictionary contains either an api_token or a
        # username as non-empty strings.
        if values.get("api_token") or values.get("username"):
            return values
        else:
            raise ValueError(
                "Neither api_token nor username is set in the store config."
            )

    @validator("url")
    def validate_url(cls, url: str) -> str:
        """Validates that the URL is a well-formed REST store URL.

        Args:
            url: The URL to be validated.

        Returns:
            The validated URL without trailing slashes.

        Raises:
            ValueError: If the URL is not a well-formed REST store URL.
        """
        url = url.rstrip("/")
        scheme = re.search("^([a-z0-9]+://)", url)
        if scheme is None or scheme.group() not in ("https://", "http://"):
            raise ValueError(
                "Invalid URL for REST store: {url}. Should be in the form "
                "https://hostname[:port] or http://hostname[:port]."
            )

        # When running inside a container, if the URL uses localhost, the
        # target service will not be available. We try to replace localhost
        # with one of the special Docker or K3D internal hostnames.
        url = replace_localhost_with_internal_hostname(url)

        return url

    @validator("verify_ssl")
    def validate_verify_ssl(
        cls, verify_ssl: Union[bool, str]
    ) -> Union[bool, str]:
        """Validates that the verify_ssl either points to a file or is a bool.

        Args:
            verify_ssl: The verify_ssl value to be validated.

        Returns:
            The validated verify_ssl value.
        """
        secret_folder = Path(
            GlobalConfiguration().local_stores_path,
            "certificates",
        )
        if isinstance(verify_ssl, bool) or verify_ssl.startswith(
            str(secret_folder)
        ):
            return verify_ssl

        if os.path.isfile(verify_ssl):
            with open(verify_ssl, "r") as f:
                verify_ssl = f.read()

        fileio.makedirs(str(secret_folder))
        file_path = Path(secret_folder, "ca_bundle.pem")
        with open(file_path, "w") as f:
            f.write(verify_ssl)
        file_path.chmod(0o600)
        verify_ssl = str(file_path)

        return verify_ssl

    @classmethod
    def supports_url_scheme(cls, url: str) -> bool:
        """Check if a URL scheme is supported by this store.

        Args:
            url: The URL to check.

        Returns:
            True if the URL scheme is supported, False otherwise.
        """
        return urlparse(url).scheme in ("http", "https")

    def expand_certificates(self) -> None:
        """Expands the certificates in the verify_ssl field."""
        # Load the certificate values back into the configuration
        if isinstance(self.verify_ssl, str) and os.path.isfile(
            self.verify_ssl
        ):
            with open(self.verify_ssl, "r") as f:
                self.verify_ssl = f.read()

    @classmethod
    def copy_configuration(
        cls,
        config: "StoreConfiguration",
        config_path: str,
        load_config_path: Optional[PurePath] = None,
    ) -> "StoreConfiguration":
        """Create a copy of the store config using a different path.

        This method is used to create a copy of the store configuration that can
        be loaded using a different configuration path or in the context of a
        new environment, such as a container image.

        The configuration files accompanying the store configuration are also
        copied to the new configuration path (e.g. certificates etc.).

        Args:
            config: The store configuration to copy.
            config_path: new path where the configuration copy will be loaded
                from.
            load_config_path: absolute path that will be used to load the copied
                configuration. This can be set to a value different from
                `config_path` if the configuration copy will be loaded from
                a different environment, e.g. when the configuration is copied
                to a container image and loaded using a different absolute path.
                This will be reflected in the paths and URLs encoded in the
                copied configuration.

        Returns:
            A new store configuration object that reflects the new configuration
            path.
        """
        assert isinstance(config, RestZenStoreConfiguration)
        assert config.api_token is not None
        config = config.copy(exclude={"username", "password"}, deep=True)
        # Load the certificate values back into the configuration
        config.expand_certificates()
        return config

    class Config:
        """Pydantic configuration class."""

        # Don't validate attributes when assigning them. This is necessary
        # because the `verify_ssl` attribute can be expanded to the contents
        # of the certificate file.
        validate_assignment = False
        # Forbid extra attributes set in the class.
        extra = "forbid"
Config

Pydantic configuration class.

Source code in zenml/zen_stores/rest_zen_store.py
class Config:
    """Pydantic configuration class."""

    # Don't validate attributes when assigning them. This is necessary
    # because the `verify_ssl` attribute can be expanded to the contents
    # of the certificate file.
    validate_assignment = False
    # Forbid extra attributes set in the class.
    extra = "forbid"
copy_configuration(config, config_path, load_config_path=None) classmethod

Create a copy of the store config using a different path.

This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.

The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).

Parameters:

Name Type Description Default
config StoreConfiguration

The store configuration to copy.

required
config_path str

new path where the configuration copy will be loaded from.

required
load_config_path Optional[pathlib.PurePath]

absolute path that will be used to load the copied configuration. This can be set to a value different from config_path if the configuration copy will be loaded from a different environment, e.g. when the configuration is copied to a container image and loaded using a different absolute path. This will be reflected in the paths and URLs encoded in the copied configuration.

None

Returns:

Type Description
StoreConfiguration

A new store configuration object that reflects the new configuration path.

Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def copy_configuration(
    cls,
    config: "StoreConfiguration",
    config_path: str,
    load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
    """Create a copy of the store config using a different path.

    This method is used to create a copy of the store configuration that can
    be loaded using a different configuration path or in the context of a
    new environment, such as a container image.

    The configuration files accompanying the store configuration are also
    copied to the new configuration path (e.g. certificates etc.).

    Args:
        config: The store configuration to copy.
        config_path: new path where the configuration copy will be loaded
            from.
        load_config_path: absolute path that will be used to load the copied
            configuration. This can be set to a value different from
            `config_path` if the configuration copy will be loaded from
            a different environment, e.g. when the configuration is copied
            to a container image and loaded using a different absolute path.
            This will be reflected in the paths and URLs encoded in the
            copied configuration.

    Returns:
        A new store configuration object that reflects the new configuration
        path.
    """
    assert isinstance(config, RestZenStoreConfiguration)
    assert config.api_token is not None
    config = config.copy(exclude={"username", "password"}, deep=True)
    # Load the certificate values back into the configuration
    config.expand_certificates()
    return config
expand_certificates(self)

Expands the certificates in the verify_ssl field.

Source code in zenml/zen_stores/rest_zen_store.py
def expand_certificates(self) -> None:
    """Expands the certificates in the verify_ssl field."""
    # Load the certificate values back into the configuration
    if isinstance(self.verify_ssl, str) and os.path.isfile(
        self.verify_ssl
    ):
        with open(self.verify_ssl, "r") as f:
            self.verify_ssl = f.read()
supports_url_scheme(url) classmethod

Check if a URL scheme is supported by this store.

Parameters:

Name Type Description Default
url str

The URL to check.

required

Returns:

Type Description
bool

True if the URL scheme is supported, False otherwise.

Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
    """Check if a URL scheme is supported by this store.

    Args:
        url: The URL to check.

    Returns:
        True if the URL scheme is supported, False otherwise.
    """
    return urlparse(url).scheme in ("http", "https")
validate_credentials(values) classmethod

Validates the credentials provided in the values dictionary.

Parameters:

Name Type Description Default
values Dict[str, Any]

A dictionary containing the values to be validated.

required

Exceptions:

Type Description
ValueError

If neither api_token nor username is set.

Returns:

Type Description
Dict[str, Any]

The values dictionary.

Source code in zenml/zen_stores/rest_zen_store.py
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Validates the credentials provided in the values dictionary.

    Args:
        values: A dictionary containing the values to be validated.

    Raises:
        ValueError: If neither api_token nor username is set.

    Returns:
        The values dictionary.
    """
    # Check if the values dictionary contains either an api_token or a
    # username as non-empty strings.
    if values.get("api_token") or values.get("username"):
        return values
    else:
        raise ValueError(
            "Neither api_token nor username is set in the store config."
        )
validate_secrets_store(secrets_store) classmethod

Ensures that the secrets store uses an associated REST secrets store.

Parameters:

Name Type Description Default
secrets_store Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration]

The secrets store config to be validated.

required

Returns:

Type Description
SecretsStoreConfiguration

The validated secrets store config.

Exceptions:

Type Description
ValueError

If the secrets store is not of type REST.

Source code in zenml/zen_stores/rest_zen_store.py
@validator("secrets_store")
def validate_secrets_store(
    cls, secrets_store: Optional[SecretsStoreConfiguration]
) -> SecretsStoreConfiguration:
    """Ensures that the secrets store uses an associated REST secrets store.

    Args:
        secrets_store: The secrets store config to be validated.

    Returns:
        The validated secrets store config.

    Raises:
        ValueError: If the secrets store is not of type REST.
    """
    if secrets_store is None:
        secrets_store = RestSecretsStoreConfiguration()
    elif secrets_store.type != SecretsStoreType.REST:
        raise ValueError(
            "The secrets store associated with a REST zen store must be "
            f"of type REST, but is of type {secrets_store.type}."
        )

    return secrets_store
validate_url(url) classmethod

Validates that the URL is a well-formed REST store URL.

Parameters:

Name Type Description Default
url str

The URL to be validated.

required

Returns:

Type Description
str

The validated URL without trailing slashes.

Exceptions:

Type Description
ValueError

If the URL is not a well-formed REST store URL.

Source code in zenml/zen_stores/rest_zen_store.py
@validator("url")
def validate_url(cls, url: str) -> str:
    """Validates that the URL is a well-formed REST store URL.

    Args:
        url: The URL to be validated.

    Returns:
        The validated URL without trailing slashes.

    Raises:
        ValueError: If the URL is not a well-formed REST store URL.
    """
    url = url.rstrip("/")
    scheme = re.search("^([a-z0-9]+://)", url)
    if scheme is None or scheme.group() not in ("https://", "http://"):
        raise ValueError(
            "Invalid URL for REST store: {url}. Should be in the form "
            "https://hostname[:port] or http://hostname[:port]."
        )

    # When running inside a container, if the URL uses localhost, the
    # target service will not be available. We try to replace localhost
    # with one of the special Docker or K3D internal hostnames.
    url = replace_localhost_with_internal_hostname(url)

    return url
validate_verify_ssl(verify_ssl) classmethod

Validates that the verify_ssl either points to a file or is a bool.

Parameters:

Name Type Description Default
verify_ssl Union[bool, str]

The verify_ssl value to be validated.

required

Returns:

Type Description
Union[bool, str]

The validated verify_ssl value.

Source code in zenml/zen_stores/rest_zen_store.py
@validator("verify_ssl")
def validate_verify_ssl(
    cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
    """Validates that the verify_ssl either points to a file or is a bool.

    Args:
        verify_ssl: The verify_ssl value to be validated.

    Returns:
        The validated verify_ssl value.
    """
    secret_folder = Path(
        GlobalConfiguration().local_stores_path,
        "certificates",
    )
    if isinstance(verify_ssl, bool) or verify_ssl.startswith(
        str(secret_folder)
    ):
        return verify_ssl

    if os.path.isfile(verify_ssl):
        with open(verify_ssl, "r") as f:
            verify_ssl = f.read()

    fileio.makedirs(str(secret_folder))
    file_path = Path(secret_folder, "ca_bundle.pem")
    with open(file_path, "w") as f:
        f.write(verify_ssl)
    file_path.chmod(0o600)
    verify_ssl = str(file_path)

    return verify_ssl
create_artifact(self, artifact)

Creates an artifact.

Parameters:

Name Type Description Default
artifact ArtifactRequestModel

The artifact to create.

required

Returns:

Type Description
ArtifactResponseModel

The created artifact.

Source code in zenml/zen_stores/rest_zen_store.py
def create_artifact(
    self, artifact: ArtifactRequestModel
) -> ArtifactResponseModel:
    """Creates an artifact.

    Args:
        artifact: The artifact to create.

    Returns:
        The created artifact.
    """
    return self._create_resource(
        resource=artifact,
        response_model=ArtifactResponseModel,
        route=ARTIFACTS,
    )
create_build(self, build)

Creates a new build in a workspace.

Parameters:

Name Type Description Default
build PipelineBuildRequestModel

The build to create.

required

Returns:

Type Description
PipelineBuildResponseModel

The newly created build.

Source code in zenml/zen_stores/rest_zen_store.py
def create_build(
    self,
    build: PipelineBuildRequestModel,
) -> PipelineBuildResponseModel:
    """Creates a new build in a workspace.

    Args:
        build: The build to create.

    Returns:
        The newly created build.
    """
    return self._create_workspace_scoped_resource(
        resource=build,
        route=PIPELINE_BUILDS,
        response_model=PipelineBuildResponseModel,
    )
create_code_repository(self, code_repository)

Creates a new code repository.

Parameters:

Name Type Description Default
code_repository CodeRepositoryRequestModel

Code repository to be created.

required

Returns:

Type Description
CodeRepositoryResponseModel

The newly created code repository.

Source code in zenml/zen_stores/rest_zen_store.py
def create_code_repository(
    self, code_repository: CodeRepositoryRequestModel
) -> CodeRepositoryResponseModel:
    """Creates a new code repository.

    Args:
        code_repository: Code repository to be created.

    Returns:
        The newly created code repository.
    """
    return self._create_workspace_scoped_resource(
        resource=code_repository,
        response_model=CodeRepositoryResponseModel,
        route=CODE_REPOSITORIES,
    )
create_deployment(self, deployment)

Creates a new deployment in a workspace.

Parameters:

Name Type Description Default
deployment PipelineDeploymentRequestModel

The deployment to create.

required

Returns:

Type Description
PipelineDeploymentResponseModel

The newly created deployment.

Source code in zenml/zen_stores/rest_zen_store.py
def create_deployment(
    self,
    deployment: PipelineDeploymentRequestModel,
) -> PipelineDeploymentResponseModel:
    """Creates a new deployment in a workspace.

    Args:
        deployment: The deployment to create.

    Returns:
        The newly created deployment.
    """
    return self._create_workspace_scoped_resource(
        resource=deployment,
        route=PIPELINE_DEPLOYMENTS,
        response_model=PipelineDeploymentResponseModel,
    )
create_flavor(self, flavor)

Creates a new stack component flavor.

Parameters:

Name Type Description Default
flavor FlavorRequestModel

The stack component flavor to create.

required

Returns:

Type Description
FlavorResponseModel

The newly created flavor.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_FLAVOR)
def create_flavor(self, flavor: FlavorRequestModel) -> FlavorResponseModel:
    """Creates a new stack component flavor.

    Args:
        flavor: The stack component flavor to create.

    Returns:
        The newly created flavor.
    """
    return self._create_resource(
        resource=flavor,
        route=FLAVORS,
        response_model=FlavorResponseModel,
    )
create_pipeline(self, pipeline)

Creates a new pipeline in a workspace.

Parameters:

Name Type Description Default
pipeline PipelineRequestModel

The pipeline to create.

required

Returns:

Type Description
PipelineResponseModel

The newly created pipeline.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATE_PIPELINE)
def create_pipeline(
    self, pipeline: PipelineRequestModel
) -> PipelineResponseModel:
    """Creates a new pipeline in a workspace.

    Args:
        pipeline: The pipeline to create.

    Returns:
        The newly created pipeline.
    """
    return self._create_workspace_scoped_resource(
        resource=pipeline,
        route=PIPELINES,
        response_model=PipelineResponseModel,
    )
create_role(self, role)

Creates a new role.

Parameters:

Name Type Description Default
role RoleRequestModel

The role model to create.

required

Returns:

Type Description
RoleResponseModel

The newly created role.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_ROLE)
def create_role(self, role: RoleRequestModel) -> RoleResponseModel:
    """Creates a new role.

    Args:
        role: The role model to create.

    Returns:
        The newly created role.
    """
    return self._create_resource(
        resource=role,
        route=ROLES,
        response_model=RoleResponseModel,
    )
create_run(self, pipeline_run)

Creates a pipeline run.

Parameters:

Name Type Description Default
pipeline_run PipelineRunRequestModel

The pipeline run to create.

required

Returns:

Type Description
PipelineRunResponseModel

The created pipeline run.

Source code in zenml/zen_stores/rest_zen_store.py
def create_run(
    self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
    """Creates a pipeline run.

    Args:
        pipeline_run: The pipeline run to create.

    Returns:
        The created pipeline run.
    """
    return self._create_workspace_scoped_resource(
        resource=pipeline_run,
        response_model=PipelineRunResponseModel,
        route=RUNS,
    )
create_run_metadata(self, run_metadata)

Creates run metadata.

Parameters:

Name Type Description Default
run_metadata RunMetadataRequestModel

The run metadata to create.

required

Returns:

Type Description
RunMetadataResponseModel

The created run metadata.

Source code in zenml/zen_stores/rest_zen_store.py
def create_run_metadata(
    self, run_metadata: RunMetadataRequestModel
) -> RunMetadataResponseModel:
    """Creates run metadata.

    Args:
        run_metadata: The run metadata to create.

    Returns:
        The created run metadata.
    """
    return self._create_workspace_scoped_resource(
        resource=run_metadata,
        response_model=RunMetadataResponseModel,
        route=RUN_METADATA,
    )
create_run_step(self, step_run)

Creates a step run.

Parameters:

Name Type Description Default
step_run StepRunRequestModel

The step run to create.

required

Returns:

Type Description
StepRunResponseModel

The created step run.

Source code in zenml/zen_stores/rest_zen_store.py
def create_run_step(
    self, step_run: StepRunRequestModel
) -> StepRunResponseModel:
    """Creates a step run.

    Args:
        step_run: The step run to create.

    Returns:
        The created step run.
    """
    return self._create_resource(
        resource=step_run,
        response_model=StepRunResponseModel,
        route=STEPS,
    )
create_schedule(self, schedule)

Creates a new schedule.

Parameters:

Name Type Description Default
schedule ScheduleRequestModel

The schedule to create.

required

Returns:

Type Description
ScheduleResponseModel

The newly created schedule.

Source code in zenml/zen_stores/rest_zen_store.py
def create_schedule(
    self, schedule: ScheduleRequestModel
) -> ScheduleResponseModel:
    """Creates a new schedule.

    Args:
        schedule: The schedule to create.

    Returns:
        The newly created schedule.
    """
    return self._create_workspace_scoped_resource(
        resource=schedule,
        route=SCHEDULES,
        response_model=ScheduleResponseModel,
    )
create_service_connector(self, service_connector)

Creates a new service connector.

Parameters:

Name Type Description Default
service_connector ServiceConnectorRequestModel

Service connector to be created.

required

Returns:

Type Description
ServiceConnectorResponseModel

The newly created service connector.

Source code in zenml/zen_stores/rest_zen_store.py
def create_service_connector(
    self, service_connector: ServiceConnectorRequestModel
) -> ServiceConnectorResponseModel:
    """Creates a new service connector.

    Args:
        service_connector: Service connector to be created.

    Returns:
        The newly created service connector.
    """
    connector_model = self._create_workspace_scoped_resource(
        resource=service_connector,
        route=SERVICE_CONNECTORS,
        response_model=ServiceConnectorResponseModel,
    )
    self._populate_connector_type(connector_model)
    return connector_model
create_stack(self, stack)

Register a new stack.

Parameters:

Name Type Description Default
stack StackRequestModel

The stack to register.

required

Returns:

Type Description
StackResponseModel

The registered stack.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.REGISTERED_STACK)
def create_stack(self, stack: StackRequestModel) -> StackResponseModel:
    """Register a new stack.

    Args:
        stack: The stack to register.

    Returns:
        The registered stack.
    """
    return self._create_workspace_scoped_resource(
        resource=stack,
        route=STACKS,
        response_model=StackResponseModel,
    )
create_stack_component(self, component)

Create a stack component.

Parameters:

Name Type Description Default
component ComponentRequestModel

The stack component to create.

required

Returns:

Type Description
ComponentResponseModel

The created stack component.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
def create_stack_component(
    self,
    component: ComponentRequestModel,
) -> ComponentResponseModel:
    """Create a stack component.

    Args:
        component: The stack component to create.

    Returns:
        The created stack component.
    """
    return self._create_workspace_scoped_resource(
        resource=component,
        route=STACK_COMPONENTS,
        response_model=ComponentResponseModel,
    )
create_team(self, team)

Creates a new team.

Parameters:

Name Type Description Default
team TeamRequestModel

The team model to create.

required

Returns:

Type Description
TeamResponseModel

The newly created team.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_TEAM)
def create_team(self, team: TeamRequestModel) -> TeamResponseModel:
    """Creates a new team.

    Args:
        team: The team model to create.

    Returns:
        The newly created team.
    """
    return self._create_resource(
        resource=team,
        route=TEAMS,
        response_model=TeamResponseModel,
    )
create_team_role_assignment(self, team_role_assignment)

Creates a new team role assignment.

Parameters:

Name Type Description Default
team_role_assignment TeamRoleAssignmentRequestModel

The role assignment model to create.

required

Returns:

Type Description
TeamRoleAssignmentResponseModel

The newly created role assignment.

Source code in zenml/zen_stores/rest_zen_store.py
def create_team_role_assignment(
    self, team_role_assignment: TeamRoleAssignmentRequestModel
) -> TeamRoleAssignmentResponseModel:
    """Creates a new team role assignment.

    Args:
        team_role_assignment: The role assignment model to create.

    Returns:
        The newly created role assignment.
    """
    return self._create_resource(
        resource=team_role_assignment,
        route=TEAM_ROLE_ASSIGNMENTS,
        response_model=TeamRoleAssignmentResponseModel,
    )
create_user(self, user)

Creates a new user.

Parameters:

Name Type Description Default
user UserRequestModel

User to be created.

required

Returns:

Type Description
UserResponseModel

The newly created user.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_USER)
def create_user(self, user: UserRequestModel) -> UserResponseModel:
    """Creates a new user.

    Args:
        user: User to be created.

    Returns:
        The newly created user.
    """
    return self._create_resource(
        resource=user,
        route=USERS + "?assign_default_role=False",
        response_model=UserResponseModel,
    )
create_user_role_assignment(self, user_role_assignment)

Creates a new role assignment.

Parameters:

Name Type Description Default
user_role_assignment UserRoleAssignmentRequestModel

The role assignment to create.

required

Returns:

Type Description
UserRoleAssignmentResponseModel

The newly created workspace.

Source code in zenml/zen_stores/rest_zen_store.py
def create_user_role_assignment(
    self, user_role_assignment: UserRoleAssignmentRequestModel
) -> UserRoleAssignmentResponseModel:
    """Creates a new role assignment.

    Args:
        user_role_assignment: The role assignment to create.

    Returns:
        The newly created workspace.
    """
    return self._create_resource(
        resource=user_role_assignment,
        route=USER_ROLE_ASSIGNMENTS,
        response_model=UserRoleAssignmentResponseModel,
    )
create_workspace(self, workspace)

Creates a new workspace.

Parameters:

Name Type Description Default
workspace WorkspaceRequestModel

The workspace to create.

required

Returns:

Type Description
WorkspaceResponseModel

The newly created workspace.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_WORKSPACE)
def create_workspace(
    self, workspace: WorkspaceRequestModel
) -> WorkspaceResponseModel:
    """Creates a new workspace.

    Args:
        workspace: The workspace to create.

    Returns:
        The newly created workspace.
    """
    return self._create_resource(
        resource=workspace,
        route=WORKSPACES,
        response_model=WorkspaceResponseModel,
    )
delete(self, path, params=None, **kwargs)

Make a DELETE request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/zen_stores/rest_zen_store.py
def delete(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Make a DELETE request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending DELETE request to {path}...")
    return self._request(
        "DELETE",
        self.url + API + VERSION_1 + path,
        params=params,
        **kwargs,
    )
delete_artifact(self, artifact_id)

Deletes an artifact.

Parameters:

Name Type Description Default
artifact_id UUID

The ID of the artifact to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_artifact(self, artifact_id: UUID) -> None:
    """Deletes an artifact.

    Args:
        artifact_id: The ID of the artifact to delete.
    """
    self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)
delete_build(self, build_id)

Deletes a build.

Parameters:

Name Type Description Default
build_id UUID

The ID of the build to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_build(self, build_id: UUID) -> None:
    """Deletes a build.

    Args:
        build_id: The ID of the build to delete.
    """
    self._delete_resource(
        resource_id=build_id,
        route=PIPELINE_BUILDS,
    )
delete_code_repository(self, code_repository_id)

Deletes a code repository.

Parameters:

Name Type Description Default
code_repository_id UUID

The ID of the code repository to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_code_repository(self, code_repository_id: UUID) -> None:
    """Deletes a code repository.

    Args:
        code_repository_id: The ID of the code repository to delete.
    """
    self._delete_resource(
        resource_id=code_repository_id, route=CODE_REPOSITORIES
    )
delete_deployment(self, deployment_id)

Deletes a deployment.

Parameters:

Name Type Description Default
deployment_id UUID

The ID of the deployment to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_deployment(self, deployment_id: UUID) -> None:
    """Deletes a deployment.

    Args:
        deployment_id: The ID of the deployment to delete.
    """
    self._delete_resource(
        resource_id=deployment_id,
        route=PIPELINE_DEPLOYMENTS,
    )
delete_flavor(self, flavor_id)

Delete a stack component flavor.

Parameters:

Name Type Description Default
flavor_id UUID

The ID of the stack component flavor to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_FLAVOR)
def delete_flavor(self, flavor_id: UUID) -> None:
    """Delete a stack component flavor.

    Args:
        flavor_id: The ID of the stack component flavor to delete.
    """
    self._delete_resource(
        resource_id=flavor_id,
        route=FLAVORS,
    )
delete_pipeline(self, pipeline_id)

Deletes a pipeline.

Parameters:

Name Type Description Default
pipeline_id UUID

The ID of the pipeline to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETE_PIPELINE)
def delete_pipeline(self, pipeline_id: UUID) -> None:
    """Deletes a pipeline.

    Args:
        pipeline_id: The ID of the pipeline to delete.
    """
    self._delete_resource(
        resource_id=pipeline_id,
        route=PIPELINES,
    )
delete_role(self, role_name_or_id)

Deletes a role.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_ROLE)
def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
    """Deletes a role.

    Args:
        role_name_or_id: Name or ID of the role to delete.
    """
    self._delete_resource(
        resource_id=role_name_or_id,
        route=ROLES,
    )
delete_run(self, run_id)

Deletes a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_run(self, run_id: UUID) -> None:
    """Deletes a pipeline run.

    Args:
        run_id: The ID of the pipeline run to delete.
    """
    self._delete_resource(
        resource_id=run_id,
        route=RUNS,
    )
delete_schedule(self, schedule_id)

Deletes a schedule.

Parameters:

Name Type Description Default
schedule_id UUID

The ID of the schedule to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_schedule(self, schedule_id: UUID) -> None:
    """Deletes a schedule.

    Args:
        schedule_id: The ID of the schedule to delete.
    """
    self._delete_resource(
        resource_id=schedule_id,
        route=SCHEDULES,
    )
delete_service_connector(self, service_connector_id)

Deletes a service connector.

Parameters:

Name Type Description Default
service_connector_id UUID

The ID of the service connector to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_service_connector(self, service_connector_id: UUID) -> None:
    """Deletes a service connector.

    Args:
        service_connector_id: The ID of the service connector to delete.
    """
    self._delete_resource(
        resource_id=service_connector_id, route=SERVICE_CONNECTORS
    )
delete_stack(self, stack_id)

Delete a stack.

Parameters:

Name Type Description Default
stack_id UUID

The ID of the stack to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_STACK)
def delete_stack(self, stack_id: UUID) -> None:
    """Delete a stack.

    Args:
        stack_id: The ID of the stack to delete.
    """
    self._delete_resource(
        resource_id=stack_id,
        route=STACKS,
    )
delete_stack_component(self, component_id)

Delete a stack component.

Parameters:

Name Type Description Default
component_id UUID

The ID of the stack component to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_STACK_COMPONENT)
def delete_stack_component(self, component_id: UUID) -> None:
    """Delete a stack component.

    Args:
        component_id: The ID of the stack component to delete.
    """
    self._delete_resource(
        resource_id=component_id,
        route=STACK_COMPONENTS,
    )
delete_team(self, team_name_or_id)

Deletes a team.

Parameters:

Name Type Description Default
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_TEAM)
def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
    """Deletes a team.

    Args:
        team_name_or_id: Name or ID of the team to delete.
    """
    self._delete_resource(
        resource_id=team_name_or_id,
        route=TEAMS,
    )
delete_team_role_assignment(self, team_role_assignment_id)

Delete a specific role assignment.

Parameters:

Name Type Description Default
team_role_assignment_id UUID

The ID of the specific role assignment

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_team_role_assignment(
    self, team_role_assignment_id: UUID
) -> None:
    """Delete a specific role assignment.

    Args:
        team_role_assignment_id: The ID of the specific role assignment
    """
    self._delete_resource(
        resource_id=team_role_assignment_id,
        route=TEAM_ROLE_ASSIGNMENTS,
    )
delete_user(self, user_name_or_id)

Deletes a user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

The name or ID of the user to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_USER)
def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
    """Deletes a user.

    Args:
        user_name_or_id: The name or ID of the user to delete.
    """
    self._delete_resource(
        resource_id=user_name_or_id,
        route=USERS,
    )
delete_user_role_assignment(self, user_role_assignment_id)

Delete a specific role assignment.

Parameters:

Name Type Description Default
user_role_assignment_id UUID

The ID of the specific role assignment

required
Source code in zenml/zen_stores/rest_zen_store.py
def delete_user_role_assignment(
    self, user_role_assignment_id: UUID
) -> None:
    """Delete a specific role assignment.

    Args:
        user_role_assignment_id: The ID of the specific role assignment
    """
    self._delete_resource(
        resource_id=user_role_assignment_id,
        route=USER_ROLE_ASSIGNMENTS,
    )
delete_workspace(self, workspace_name_or_id)

Deletes a workspace.

Parameters:

Name Type Description Default
workspace_name_or_id Union[str, uuid.UUID]

Name or ID of the workspace to delete.

required
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_WORKSPACE)
def delete_workspace(self, workspace_name_or_id: Union[str, UUID]) -> None:
    """Deletes a workspace.

    Args:
        workspace_name_or_id: Name or ID of the workspace to delete.
    """
    self._delete_resource(
        resource_id=workspace_name_or_id,
        route=WORKSPACES,
    )
get(self, path, params=None, **kwargs)

Make a GET request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/zen_stores/rest_zen_store.py
def get(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Make a GET request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending GET request to {path}...")
    return self._request(
        "GET", self.url + API + VERSION_1 + path, params=params, **kwargs
    )
get_artifact(self, artifact_id)

Gets an artifact.

Parameters:

Name Type Description Default
artifact_id UUID

The ID of the artifact to get.

required

Returns:

Type Description
ArtifactResponseModel

The artifact.

Source code in zenml/zen_stores/rest_zen_store.py
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
    """Gets an artifact.

    Args:
        artifact_id: The ID of the artifact to get.

    Returns:
        The artifact.
    """
    return self._get_resource(
        resource_id=artifact_id,
        route=ARTIFACTS,
        response_model=ArtifactResponseModel,
    )
get_build(self, build_id)

Get a build with a given ID.

Parameters:

Name Type Description Default
build_id UUID

ID of the build.

required

Returns:

Type Description
PipelineBuildResponseModel

The build.

Source code in zenml/zen_stores/rest_zen_store.py
def get_build(self, build_id: UUID) -> PipelineBuildResponseModel:
    """Get a build with a given ID.

    Args:
        build_id: ID of the build.

    Returns:
        The build.
    """
    return self._get_resource(
        resource_id=build_id,
        route=PIPELINE_BUILDS,
        response_model=PipelineBuildResponseModel,
    )
get_code_repository(self, code_repository_id)

Gets a specific code repository.

Parameters:

Name Type Description Default
code_repository_id UUID

The ID of the code repository to get.

required

Returns:

Type Description
CodeRepositoryResponseModel

The requested code repository, if it was found.

Source code in zenml/zen_stores/rest_zen_store.py
def get_code_repository(
    self, code_repository_id: UUID
) -> CodeRepositoryResponseModel:
    """Gets a specific code repository.

    Args:
        code_repository_id: The ID of the code repository to get.

    Returns:
        The requested code repository, if it was found.
    """
    return self._get_resource(
        resource_id=code_repository_id,
        route=CODE_REPOSITORIES,
        response_model=CodeRepositoryResponseModel,
    )
get_deployment(self, deployment_id)

Get a deployment with a given ID.

Parameters:

Name Type Description Default
deployment_id UUID

ID of the deployment.

required

Returns:

Type Description
PipelineDeploymentResponseModel

The deployment.

Source code in zenml/zen_stores/rest_zen_store.py
def get_deployment(
    self, deployment_id: UUID
) -> PipelineDeploymentResponseModel:
    """Get a deployment with a given ID.

    Args:
        deployment_id: ID of the deployment.

    Returns:
        The deployment.
    """
    return self._get_resource(
        resource_id=deployment_id,
        route=PIPELINE_DEPLOYMENTS,
        response_model=PipelineDeploymentResponseModel,
    )
get_flavor(self, flavor_id)

Get a stack component flavor by ID.

Parameters:

Name Type Description Default
flavor_id UUID

The ID of the stack component flavor to get.

required

Returns:

Type Description
FlavorResponseModel

The stack component flavor.

Source code in zenml/zen_stores/rest_zen_store.py
def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
    """Get a stack component flavor by ID.

    Args:
        flavor_id: The ID of the stack component flavor to get.

    Returns:
        The stack component flavor.
    """
    return self._get_resource(
        resource_id=flavor_id,
        route=FLAVORS,
        response_model=FlavorResponseModel,
    )
get_or_create_run(self, pipeline_run)

Gets or creates a pipeline run.

If a run with the same ID or name already exists, it is returned. Otherwise, a new run is created.

Parameters:

Name Type Description Default
pipeline_run PipelineRunRequestModel

The pipeline run to get or create.

required

Returns:

Type Description
Tuple[zenml.models.pipeline_run_models.PipelineRunResponseModel, bool]

The pipeline run, and a boolean indicating whether the run was created or not.

Source code in zenml/zen_stores/rest_zen_store.py
def get_or_create_run(
    self, pipeline_run: PipelineRunRequestModel
) -> Tuple[PipelineRunResponseModel, bool]:
    """Gets or creates a pipeline run.

    If a run with the same ID or name already exists, it is returned.
    Otherwise, a new run is created.

    Args:
        pipeline_run: The pipeline run to get or create.

    Returns:
        The pipeline run, and a boolean indicating whether the run was
        created or not.
    """
    return self._get_or_create_workspace_scoped_resource(
        resource=pipeline_run,
        route=RUNS,
        response_model=PipelineRunResponseModel,
    )
get_pipeline(self, pipeline_id)

Get a pipeline with a given ID.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline.

required

Returns:

Type Description
PipelineResponseModel

The pipeline.

Source code in zenml/zen_stores/rest_zen_store.py
def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
    """Get a pipeline with a given ID.

    Args:
        pipeline_id: ID of the pipeline.

    Returns:
        The pipeline.
    """
    return self._get_resource(
        resource_id=pipeline_id,
        route=PIPELINES,
        response_model=PipelineResponseModel,
    )
get_role(self, role_name_or_id)

Gets a specific role.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role to get.

required

Returns:

Type Description
RoleResponseModel

The requested role.

Source code in zenml/zen_stores/rest_zen_store.py
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
    """Gets a specific role.

    Args:
        role_name_or_id: Name or ID of the role to get.

    Returns:
        The requested role.
    """
    return self._get_resource(
        resource_id=role_name_or_id,
        route=ROLES,
        response_model=RoleResponseModel,
    )
get_run(self, run_name_or_id)

Gets a pipeline run.

Parameters:

Name Type Description Default
run_name_or_id Union[uuid.UUID, str]

The name or ID of the pipeline run to get.

required

Returns:

Type Description
PipelineRunResponseModel

The pipeline run.

Source code in zenml/zen_stores/rest_zen_store.py
def get_run(
    self, run_name_or_id: Union[UUID, str]
) -> PipelineRunResponseModel:
    """Gets a pipeline run.

    Args:
        run_name_or_id: The name or ID of the pipeline run to get.

    Returns:
        The pipeline run.
    """
    return self._get_resource(
        resource_id=run_name_or_id,
        route=RUNS,
        response_model=PipelineRunResponseModel,
    )
get_run_step(self, step_run_id)

Get a step run by ID.

Parameters:

Name Type Description Default
step_run_id UUID

The ID of the step run to get.

required

Returns:

Type Description
StepRunResponseModel

The step run.

Source code in zenml/zen_stores/rest_zen_store.py
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
    """Get a step run by ID.

    Args:
        step_run_id: The ID of the step run to get.

    Returns:
        The step run.
    """
    return self._get_resource(
        resource_id=step_run_id,
        route=STEPS,
        response_model=StepRunResponseModel,
    )
get_schedule(self, schedule_id)

Get a schedule with a given ID.

Parameters:

Name Type Description Default
schedule_id UUID

ID of the schedule.

required

Returns:

Type Description
ScheduleResponseModel

The schedule.

Source code in zenml/zen_stores/rest_zen_store.py
def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
    """Get a schedule with a given ID.

    Args:
        schedule_id: ID of the schedule.

    Returns:
        The schedule.
    """
    return self._get_resource(
        resource_id=schedule_id,
        route=SCHEDULES,
        response_model=ScheduleResponseModel,
    )
get_service_connector(self, service_connector_id)

Gets a specific service connector.

Parameters:

Name Type Description Default
service_connector_id UUID

The ID of the service connector to get.

required

Returns:

Type Description
ServiceConnectorResponseModel

The requested service connector, if it was found.

Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector(
    self, service_connector_id: UUID
) -> ServiceConnectorResponseModel:
    """Gets a specific service connector.

    Args:
        service_connector_id: The ID of the service connector to get.

    Returns:
        The requested service connector, if it was found.
    """
    connector_model = self._get_resource(
        resource_id=service_connector_id,
        route=SERVICE_CONNECTORS,
        response_model=ServiceConnectorResponseModel,
        params={"expand_secrets": False},
    )
    self._populate_connector_type(connector_model)
    return connector_model
get_service_connector_client(self, service_connector_id, resource_type=None, resource_id=None)

Get a service connector client for a service connector and given resource.

Parameters:

Name Type Description Default
service_connector_id UUID

The ID of the base service connector to use.

required
resource_type Optional[str]

The type of resource to get a client for.

None
resource_id Optional[str]

The ID of the resource to get a client for.

None

Returns:

Type Description
ServiceConnectorResponseModel

A service connector client that can be used to access the given resource.

Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector_client(
    self,
    service_connector_id: UUID,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> ServiceConnectorResponseModel:
    """Get a service connector client for a service connector and given resource.

    Args:
        service_connector_id: The ID of the base service connector to use.
        resource_type: The type of resource to get a client for.
        resource_id: The ID of the resource to get a client for.

    Returns:
        A service connector client that can be used to access the given
        resource.
    """
    params = {}
    if resource_type:
        params["resource_type"] = resource_type
    if resource_id:
        params["resource_id"] = resource_id
    response_body = self.get(
        f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_CLIENT}",
        params=params,
    )

    connector = ServiceConnectorResponseModel.parse_obj(response_body)
    self._populate_connector_type(connector)
    return connector
get_service_connector_type(self, connector_type)

Returns the requested service connector type.

Parameters:

Name Type Description Default
connector_type str

the service connector type identifier.

required

Returns:

Type Description
ServiceConnectorTypeModel

The requested service connector type.

Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector_type(
    self,
    connector_type: str,
) -> ServiceConnectorTypeModel:
    """Returns the requested service connector type.

    Args:
        connector_type: the service connector type identifier.

    Returns:
        The requested service connector type.
    """
    # Use the local registry to get the service connector type, if it
    # exists.
    local_connector_type: Optional[ServiceConnectorTypeModel] = None
    if service_connector_registry.is_registered(connector_type):
        local_connector_type = (
            service_connector_registry.get_service_connector_type(
                connector_type
            )
        )
    try:
        response_body = self.get(
            f"{SERVICE_CONNECTOR_TYPES}/{connector_type}",
        )
        remote_connector_type = ServiceConnectorTypeModel.parse_obj(
            response_body
        )
        if local_connector_type:
            # If locally available, return the local connector type but
            # mark it as being remotely available.
            local_connector_type.remote = True
            return local_connector_type

        # Mark the remote connector type as being only remotely available
        remote_connector_type.local = False
        remote_connector_type.remote = True

        return remote_connector_type
    except KeyError:
        # If the service connector type is not found, check the local
        # registry.
        return service_connector_registry.get_service_connector_type(
            connector_type
        )
get_stack(self, stack_id)

Get a stack by its unique ID.

Parameters:

Name Type Description Default
stack_id UUID

The ID of the stack to get.

required

Returns:

Type Description
StackResponseModel

The stack with the given ID.

Source code in zenml/zen_stores/rest_zen_store.py
def get_stack(self, stack_id: UUID) -> StackResponseModel:
    """Get a stack by its unique ID.

    Args:
        stack_id: The ID of the stack to get.

    Returns:
        The stack with the given ID.
    """
    return self._get_resource(
        resource_id=stack_id,
        route=STACKS,
        response_model=StackResponseModel,
    )
get_stack_component(self, component_id)

Get a stack component by ID.

Parameters:

Name Type Description Default
component_id UUID

The ID of the stack component to get.

required

Returns:

Type Description
ComponentResponseModel

The stack component.

Source code in zenml/zen_stores/rest_zen_store.py
def get_stack_component(
    self, component_id: UUID
) -> ComponentResponseModel:
    """Get a stack component by ID.

    Args:
        component_id: The ID of the stack component to get.

    Returns:
        The stack component.
    """
    return self._get_resource(
        resource_id=component_id,
        route=STACK_COMPONENTS,
        response_model=ComponentResponseModel,
    )
get_store_info(self)

Get information about the server.

Returns:

Type Description
ServerModel

Information about the server.

Source code in zenml/zen_stores/rest_zen_store.py
def get_store_info(self) -> ServerModel:
    """Get information about the server.

    Returns:
        Information about the server.
    """
    body = self.get(INFO)
    return ServerModel.parse_obj(body)
get_team(self, team_name_or_id)

Gets a specific team.

Parameters:

Name Type Description Default
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team to get.

required

Returns:

Type Description
TeamResponseModel

The requested team.

Source code in zenml/zen_stores/rest_zen_store.py
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
    """Gets a specific team.

    Args:
        team_name_or_id: Name or ID of the team to get.

    Returns:
        The requested team.
    """
    return self._get_resource(
        resource_id=team_name_or_id,
        route=TEAMS,
        response_model=TeamResponseModel,
    )
get_team_role_assignment(self, team_role_assignment_id)

Gets a specific role assignment.

Parameters:

Name Type Description Default
team_role_assignment_id UUID

ID of the role assignment to get.

required

Returns:

Type Description
TeamRoleAssignmentResponseModel

The requested role assignment.

Source code in zenml/zen_stores/rest_zen_store.py
def get_team_role_assignment(
    self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
    """Gets a specific role assignment.

    Args:
        team_role_assignment_id: ID of the role assignment to get.

    Returns:
        The requested role assignment.
    """
    return self._get_resource(
        resource_id=team_role_assignment_id,
        route=TEAM_ROLE_ASSIGNMENTS,
        response_model=TeamRoleAssignmentResponseModel,
    )
get_user(self, user_name_or_id=None, include_private=False)

Gets a specific user, when no id is specified the active user is returned.

The include_private parameter is ignored here as it is handled implicitly by the /current-user endpoint that is queried when no user_name_or_id is set. Raises a KeyError in case a user with that id does not exist.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

The name or ID of the user to get.

None
include_private bool

Whether to include private user information

False

Returns:

Type Description
UserResponseModel

The requested user, if it was found.

Source code in zenml/zen_stores/rest_zen_store.py
def get_user(
    self,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    include_private: bool = False,
) -> UserResponseModel:
    """Gets a specific user, when no id is specified the active user is returned.

    The `include_private` parameter is ignored here as it is handled
    implicitly by the /current-user endpoint that is queried when no
    user_name_or_id is set. Raises a KeyError in case a user with that id
    does not exist.

    Args:
        user_name_or_id: The name or ID of the user to get.
        include_private: Whether to include private user information

    Returns:
        The requested user, if it was found.
    """
    if user_name_or_id:
        return self._get_resource(
            resource_id=user_name_or_id,
            route=USERS,
            response_model=UserResponseModel,
        )
    else:
        body = self.get(CURRENT_USER)
        return UserResponseModel.parse_obj(body)
get_user_role_assignment(self, user_role_assignment_id)

Get an existing role assignment by name or ID.

Parameters:

Name Type Description Default
user_role_assignment_id UUID

Name or ID of the role assignment to get.

required

Returns:

Type Description
UserRoleAssignmentResponseModel

The requested workspace.

Source code in zenml/zen_stores/rest_zen_store.py
def get_user_role_assignment(
    self, user_role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
    """Get an existing role assignment by name or ID.

    Args:
        user_role_assignment_id: Name or ID of the role assignment to get.

    Returns:
        The requested workspace.
    """
    return self._get_resource(
        resource_id=user_role_assignment_id,
        route=USER_ROLE_ASSIGNMENTS,
        response_model=UserRoleAssignmentResponseModel,
    )
get_workspace(self, workspace_name_or_id)

Get an existing workspace by name or ID.

Parameters:

Name Type Description Default
workspace_name_or_id Union[uuid.UUID, str]

Name or ID of the workspace to get.

required

Returns:

Type Description
WorkspaceResponseModel

The requested workspace.

Source code in zenml/zen_stores/rest_zen_store.py
def get_workspace(
    self, workspace_name_or_id: Union[UUID, str]
) -> WorkspaceResponseModel:
    """Get an existing workspace by name or ID.

    Args:
        workspace_name_or_id: Name or ID of the workspace to get.

    Returns:
        The requested workspace.
    """
    return self._get_resource(
        resource_id=workspace_name_or_id,
        route=WORKSPACES,
        response_model=WorkspaceResponseModel,
    )
list_artifacts(self, artifact_filter_model)

List all artifacts matching the given filter criteria.

Parameters:

Name Type Description Default
artifact_filter_model ArtifactFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[ArtifactResponseModel]

A list of all artifacts matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_artifacts(
    self, artifact_filter_model: ArtifactFilterModel
) -> Page[ArtifactResponseModel]:
    """List all artifacts matching the given filter criteria.

    Args:
        artifact_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all artifacts matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=ARTIFACTS,
        response_model=ArtifactResponseModel,
        filter_model=artifact_filter_model,
    )
list_builds(self, build_filter_model)

List all builds matching the given filter criteria.

Parameters:

Name Type Description Default
build_filter_model PipelineBuildFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[PipelineBuildResponseModel]

A page of all builds matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_builds(
    self, build_filter_model: PipelineBuildFilterModel
) -> Page[PipelineBuildResponseModel]:
    """List all builds matching the given filter criteria.

    Args:
        build_filter_model: All filter parameters including pagination
            params.

    Returns:
        A page of all builds matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=PIPELINE_BUILDS,
        response_model=PipelineBuildResponseModel,
        filter_model=build_filter_model,
    )
list_code_repositories(self, filter_model)

List all code repositories.

Parameters:

Name Type Description Default
filter_model CodeRepositoryFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[CodeRepositoryResponseModel]

A page of all code repositories.

Source code in zenml/zen_stores/rest_zen_store.py
def list_code_repositories(
    self, filter_model: CodeRepositoryFilterModel
) -> Page[CodeRepositoryResponseModel]:
    """List all code repositories.

    Args:
        filter_model: All filter parameters including pagination
            params.

    Returns:
        A page of all code repositories.
    """
    return self._list_paginated_resources(
        route=CODE_REPOSITORIES,
        response_model=CodeRepositoryResponseModel,
        filter_model=filter_model,
    )
list_deployments(self, deployment_filter_model)

List all deployments matching the given filter criteria.

Parameters:

Name Type Description Default
deployment_filter_model PipelineDeploymentFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[PipelineDeploymentResponseModel]

A page of all deployments matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_deployments(
    self, deployment_filter_model: PipelineDeploymentFilterModel
) -> Page[PipelineDeploymentResponseModel]:
    """List all deployments matching the given filter criteria.

    Args:
        deployment_filter_model: All filter parameters including pagination
            params.

    Returns:
        A page of all deployments matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=PIPELINE_DEPLOYMENTS,
        response_model=PipelineDeploymentResponseModel,
        filter_model=deployment_filter_model,
    )
list_flavors(self, flavor_filter_model)

List all stack component flavors matching the given filter criteria.

Parameters:

Name Type Description Default
flavor_filter_model FlavorFilterModel

All filter parameters including pagination params

required

Returns:

Type Description
Page[FlavorResponseModel]

List of all the stack component flavors matching the given criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_flavors(
    self, flavor_filter_model: FlavorFilterModel
) -> Page[FlavorResponseModel]:
    """List all stack component flavors matching the given filter criteria.

    Args:
        flavor_filter_model: All filter parameters including pagination
            params

    Returns:
        List of all the stack component flavors matching the given criteria.
    """
    return self._list_paginated_resources(
        route=FLAVORS,
        response_model=FlavorResponseModel,
        filter_model=flavor_filter_model,
    )
list_pipelines(self, pipeline_filter_model)

List all pipelines matching the given filter criteria.

Parameters:

Name Type Description Default
pipeline_filter_model PipelineFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[PipelineResponseModel]

A list of all pipelines matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_pipelines(
    self, pipeline_filter_model: PipelineFilterModel
) -> Page[PipelineResponseModel]:
    """List all pipelines matching the given filter criteria.

    Args:
        pipeline_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all pipelines matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=PIPELINES,
        response_model=PipelineResponseModel,
        filter_model=pipeline_filter_model,
    )
list_roles(self, role_filter_model)

List all roles matching the given filter criteria.

Parameters:

Name Type Description Default
role_filter_model RoleFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[RoleResponseModel]

A list of all roles matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_roles(
    self, role_filter_model: RoleFilterModel
) -> Page[RoleResponseModel]:
    """List all roles matching the given filter criteria.

    Args:
        role_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all roles matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=ROLES,
        response_model=RoleResponseModel,
        filter_model=role_filter_model,
    )
list_run_metadata(self, run_metadata_filter_model)

List run metadata.

Parameters:

Name Type Description Default
run_metadata_filter_model RunMetadataFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[RunMetadataResponseModel]

The run metadata.

Source code in zenml/zen_stores/rest_zen_store.py
def list_run_metadata(
    self,
    run_metadata_filter_model: RunMetadataFilterModel,
) -> Page[RunMetadataResponseModel]:
    """List run metadata.

    Args:
        run_metadata_filter_model: All filter parameters including
            pagination params.

    Returns:
        The run metadata.
    """
    return self._list_paginated_resources(
        route=RUN_METADATA,
        response_model=RunMetadataResponseModel,
        filter_model=run_metadata_filter_model,
    )
list_run_steps(self, step_run_filter_model)

List all step runs matching the given filter criteria.

Parameters:

Name Type Description Default
step_run_filter_model StepRunFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[StepRunResponseModel]

A list of all step runs matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_run_steps(
    self, step_run_filter_model: StepRunFilterModel
) -> Page[StepRunResponseModel]:
    """List all step runs matching the given filter criteria.

    Args:
        step_run_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all step runs matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=STEPS,
        response_model=StepRunResponseModel,
        filter_model=step_run_filter_model,
    )
list_runs(self, runs_filter_model)

List all pipeline runs matching the given filter criteria.

Parameters:

Name Type Description Default
runs_filter_model PipelineRunFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[PipelineRunResponseModel]

A list of all pipeline runs matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_runs(
    self, runs_filter_model: PipelineRunFilterModel
) -> Page[PipelineRunResponseModel]:
    """List all pipeline runs matching the given filter criteria.

    Args:
        runs_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all pipeline runs matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=RUNS,
        response_model=PipelineRunResponseModel,
        filter_model=runs_filter_model,
    )
list_schedules(self, schedule_filter_model)

List all schedules in the workspace.

Parameters:

Name Type Description Default
schedule_filter_model ScheduleFilterModel

All filter parameters including pagination params

required

Returns:

Type Description
Page[ScheduleResponseModel]

A list of schedules.

Source code in zenml/zen_stores/rest_zen_store.py
def list_schedules(
    self, schedule_filter_model: ScheduleFilterModel
) -> Page[ScheduleResponseModel]:
    """List all schedules in the workspace.

    Args:
        schedule_filter_model: All filter parameters including pagination
            params

    Returns:
        A list of schedules.
    """
    return self._list_paginated_resources(
        route=SCHEDULES,
        response_model=ScheduleResponseModel,
        filter_model=schedule_filter_model,
    )
list_service_connector_resources(self, user_name_or_id, workspace_name_or_id, connector_type=None, resource_type=None, resource_id=None)

List resources that can be accessed by service connectors.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

The name or ID of the user to scope to.

required
workspace_name_or_id Union[str, uuid.UUID]

The name or ID of the workspace to scope to.

required
connector_type Optional[str]

The type of service connector to scope to.

None
resource_type Optional[str]

The type of resource to scope to.

None
resource_id Optional[str]

The ID of the resource to scope to.

None

Returns:

Type Description
List[zenml.models.service_connector_models.ServiceConnectorResourcesModel]

The matching list of resources that available service connectors have access to.

Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connector_resources(
    self,
    user_name_or_id: Union[str, UUID],
    workspace_name_or_id: Union[str, UUID],
    connector_type: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> List[ServiceConnectorResourcesModel]:
    """List resources that can be accessed by service connectors.

    Args:
        user_name_or_id: The name or ID of the user to scope to.
        workspace_name_or_id: The name or ID of the workspace to scope to.
        connector_type: The type of service connector to scope to.
        resource_type: The type of resource to scope to.
        resource_id: The ID of the resource to scope to.

    Returns:
        The matching list of resources that available service
        connectors have access to.
    """
    params = {}
    if connector_type:
        params["connector_type"] = connector_type
    if resource_type:
        params["resource_type"] = resource_type
    if resource_id:
        params["resource_id"] = resource_id
    response_body = self.get(
        f"{WORKSPACES}/{workspace_name_or_id}{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_RESOURCES}",
        params=params,
    )

    assert isinstance(response_body, list)
    resource_list = [
        ServiceConnectorResourcesModel.parse_obj(item)
        for item in response_body
    ]

    self._populate_connector_type(*resource_list)

    # For service connectors with types that are only locally available,
    # we need to retrieve the resource list locally
    for idx, resources in enumerate(resource_list):
        if isinstance(resources.connector_type, str):
            # Skip connector types that are neither locally nor remotely
            # available
            continue
        if resources.connector_type.remote:
            # Skip connector types that are remotely available
            continue

        # Retrieve the resource list locally
        assert resources.id is not None
        connector = self.get_service_connector(resources.id)
        connector_instance = (
            service_connector_registry.instantiate_connector(
                model=connector
            )
        )

        try:
            local_resources = connector_instance.verify(
                resource_type=resource_type,
                resource_id=resource_id,
            )
        except (ValueError, AuthorizationException) as e:
            logger.error(
                f'Failed to fetch {resource_type or "available"} '
                f"resources from service connector {connector.name}/"
                f"{connector.id}: {e}"
            )
            continue

        resource_list[idx] = local_resources

    return resource_list
list_service_connector_types(self, connector_type=None, resource_type=None, auth_method=None)

Get a list of service connector types.

Parameters:

Name Type Description Default
connector_type Optional[str]

Filter by connector type.

None
resource_type Optional[str]

Filter by resource type.

None
auth_method Optional[str]

Filter by authentication method.

None

Returns:

Type Description
List[zenml.models.service_connector_models.ServiceConnectorTypeModel]

List of service connector types.

Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connector_types(
    self,
    connector_type: Optional[str] = None,
    resource_type: Optional[str] = None,
    auth_method: Optional[str] = None,
) -> List[ServiceConnectorTypeModel]:
    """Get a list of service connector types.

    Args:
        connector_type: Filter by connector type.
        resource_type: Filter by resource type.
        auth_method: Filter by authentication method.

    Returns:
        List of service connector types.
    """
    params = {}
    if connector_type:
        params["connector_type"] = connector_type
    if resource_type:
        params["resource_type"] = resource_type
    if auth_method:
        params["auth_method"] = auth_method
    response_body = self.get(
        SERVICE_CONNECTOR_TYPES,
        params=params,
    )

    assert isinstance(response_body, list)
    remote_connector_types = [
        ServiceConnectorTypeModel.parse_obj(item) for item in response_body
    ]

    # Mark the remote connector types as being only remotely available
    for c in remote_connector_types:
        c.local = False
        c.remote = True

    local_connector_types = (
        service_connector_registry.list_service_connector_types(
            connector_type=connector_type,
            resource_type=resource_type,
            auth_method=auth_method,
        )
    )

    # Add the connector types in the local registry to the list of
    # connector types available remotely. Overwrite those that have
    # the same connector type but mark them as being remotely available.
    connector_types_map = {
        connector_type.connector_type: connector_type
        for connector_type in remote_connector_types
    }

    for connector in local_connector_types:
        if connector.connector_type in connector_types_map:
            connector.remote = True
        connector_types_map[connector.connector_type] = connector

    return list(connector_types_map.values())
list_service_connectors(self, filter_model)

List all service connectors.

Parameters:

Name Type Description Default
filter_model ServiceConnectorFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[ServiceConnectorResponseModel]

A page of all service connectors.

Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connectors(
    self, filter_model: ServiceConnectorFilterModel
) -> Page[ServiceConnectorResponseModel]:
    """List all service connectors.

    Args:
        filter_model: All filter parameters including pagination
            params.

    Returns:
        A page of all service connectors.
    """
    connector_models = self._list_paginated_resources(
        route=SERVICE_CONNECTORS,
        response_model=ServiceConnectorResponseModel,
        filter_model=filter_model,
        params={"expand_secrets": False},
    )
    self._populate_connector_type(*connector_models.items)
    return connector_models
list_stack_components(self, component_filter_model)

List all stack components matching the given filter criteria.

Parameters:

Name Type Description Default
component_filter_model ComponentFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[ComponentResponseModel]

A list of all stack components matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_stack_components(
    self, component_filter_model: ComponentFilterModel
) -> Page[ComponentResponseModel]:
    """List all stack components matching the given filter criteria.

    Args:
        component_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all stack components matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=STACK_COMPONENTS,
        response_model=ComponentResponseModel,
        filter_model=component_filter_model,
    )
list_stacks(self, stack_filter_model)

List all stacks matching the given filter criteria.

Parameters:

Name Type Description Default
stack_filter_model StackFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[StackResponseModel]

A list of all stacks matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_stacks(
    self, stack_filter_model: StackFilterModel
) -> Page[StackResponseModel]:
    """List all stacks matching the given filter criteria.

    Args:
        stack_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all stacks matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=STACKS,
        response_model=StackResponseModel,
        filter_model=stack_filter_model,
    )
list_team_role_assignments(self, team_role_assignment_filter_model)

List all roles assignments matching the given filter criteria.

Parameters:

Name Type Description Default
team_role_assignment_filter_model TeamRoleAssignmentFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[TeamRoleAssignmentResponseModel]

A list of all roles assignments matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_team_role_assignments(
    self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
) -> Page[TeamRoleAssignmentResponseModel]:
    """List all roles assignments matching the given filter criteria.

    Args:
        team_role_assignment_filter_model: All filter parameters including
            pagination params.

    Returns:
        A list of all roles assignments matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=TEAM_ROLE_ASSIGNMENTS,
        response_model=TeamRoleAssignmentResponseModel,
        filter_model=team_role_assignment_filter_model,
    )
list_teams(self, team_filter_model)

List all teams matching the given filter criteria.

Parameters:

Name Type Description Default
team_filter_model TeamFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[TeamResponseModel]

A list of all teams matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_teams(
    self, team_filter_model: TeamFilterModel
) -> Page[TeamResponseModel]:
    """List all teams matching the given filter criteria.

    Args:
        team_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all teams matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=TEAMS,
        response_model=TeamResponseModel,
        filter_model=team_filter_model,
    )
list_user_role_assignments(self, user_role_assignment_filter_model)

List all roles assignments matching the given filter criteria.

Parameters:

Name Type Description Default
user_role_assignment_filter_model UserRoleAssignmentFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[UserRoleAssignmentResponseModel]

A list of all roles assignments matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_user_role_assignments(
    self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
) -> Page[UserRoleAssignmentResponseModel]:
    """List all roles assignments matching the given filter criteria.

    Args:
        user_role_assignment_filter_model: All filter parameters including
            pagination params.

    Returns:
        A list of all roles assignments matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=USER_ROLE_ASSIGNMENTS,
        response_model=UserRoleAssignmentResponseModel,
        filter_model=user_role_assignment_filter_model,
    )
list_users(self, user_filter_model)

List all users.

Parameters:

Name Type Description Default
user_filter_model UserFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[UserResponseModel]

A list of all users.

Source code in zenml/zen_stores/rest_zen_store.py
def list_users(
    self, user_filter_model: UserFilterModel
) -> Page[UserResponseModel]:
    """List all users.

    Args:
        user_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all users.
    """
    return self._list_paginated_resources(
        route=USERS,
        response_model=UserResponseModel,
        filter_model=user_filter_model,
    )
list_workspaces(self, workspace_filter_model)

List all workspace matching the given filter criteria.

Parameters:

Name Type Description Default
workspace_filter_model WorkspaceFilterModel

All filter parameters including pagination params.

required

Returns:

Type Description
Page[WorkspaceResponseModel]

A list of all workspace matching the filter criteria.

Source code in zenml/zen_stores/rest_zen_store.py
def list_workspaces(
    self, workspace_filter_model: WorkspaceFilterModel
) -> Page[WorkspaceResponseModel]:
    """List all workspace matching the given filter criteria.

    Args:
        workspace_filter_model: All filter parameters including pagination
            params.

    Returns:
        A list of all workspace matching the filter criteria.
    """
    return self._list_paginated_resources(
        route=WORKSPACES,
        response_model=WorkspaceResponseModel,
        filter_model=workspace_filter_model,
    )
post(self, path, body, params=None, **kwargs)

Make a POST request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
body BaseModel

The body to send.

required
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/zen_stores/rest_zen_store.py
def post(
    self,
    path: str,
    body: BaseModel,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a POST request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        body: The body to send.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending POST request to {path}...")
    return self._request(
        "POST",
        self.url + API + VERSION_1 + path,
        data=body.json(),
        params=params,
        **kwargs,
    )
put(self, path, body=None, params=None, **kwargs)

Make a PUT request to the given endpoint path.

Parameters:

Name Type Description Default
path str

The path to the endpoint.

required
body Optional[pydantic.main.BaseModel]

The body to send.

None
params Optional[Dict[str, Any]]

The query parameters to pass to the endpoint.

None
kwargs Any

Additional keyword arguments to pass to the request.

{}

Returns:

Type Description
Union[Dict[str, Any], List[Any], str, int, float, bool]

The response body.

Source code in zenml/zen_stores/rest_zen_store.py
def put(
    self,
    path: str,
    body: Optional[BaseModel] = None,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Make a PUT request to the given endpoint path.

    Args:
        path: The path to the endpoint.
        body: The body to send.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PUT request to {path}...")
    data = body.json(exclude_unset=True) if body else None
    return self._request(
        "PUT",
        self.url + API + VERSION_1 + path,
        data=data,
        params=params,
        **kwargs,
    )
update_code_repository(self, code_repository_id, update)

Updates an existing code repository.

Parameters:

Name Type Description Default
code_repository_id UUID

The ID of the code repository to update.

required
update CodeRepositoryUpdateModel

The update to be applied to the code repository.

required

Returns:

Type Description
CodeRepositoryResponseModel

The updated code repository.

Source code in zenml/zen_stores/rest_zen_store.py
def update_code_repository(
    self, code_repository_id: UUID, update: CodeRepositoryUpdateModel
) -> CodeRepositoryResponseModel:
    """Updates an existing code repository.

    Args:
        code_repository_id: The ID of the code repository to update.
        update: The update to be applied to the code repository.

    Returns:
        The updated code repository.
    """
    return self._update_resource(
        resource_id=code_repository_id,
        resource_update=update,
        response_model=CodeRepositoryResponseModel,
        route=CODE_REPOSITORIES,
    )
update_flavor(self, flavor_id, flavor_update)

Updates an existing user.

Parameters:

Name Type Description Default
flavor_id UUID

The id of the flavor to update.

required
flavor_update FlavorUpdateModel

The update to be applied to the flavor.

required

Returns:

Type Description
FlavorResponseModel

The updated flavor.

Source code in zenml/zen_stores/rest_zen_store.py
def update_flavor(
    self, flavor_id: UUID, flavor_update: FlavorUpdateModel
) -> FlavorResponseModel:
    """Updates an existing user.

    Args:
        flavor_id: The id of the flavor to update.
        flavor_update: The update to be applied to the flavor.

    Returns:
        The updated flavor.
    """
    return self._update_resource(
        resource_id=flavor_id,
        resource_update=flavor_update,
        route=FLAVORS,
        response_model=FlavorResponseModel,
    )
update_pipeline(self, pipeline_id, pipeline_update)

Updates a pipeline.

Parameters:

Name Type Description Default
pipeline_id UUID

The ID of the pipeline to be updated.

required
pipeline_update PipelineUpdateModel

The update to be applied.

required

Returns:

Type Description
PipelineResponseModel

The updated pipeline.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATE_PIPELINE)
def update_pipeline(
    self, pipeline_id: UUID, pipeline_update: PipelineUpdateModel
) -> PipelineResponseModel:
    """Updates a pipeline.

    Args:
        pipeline_id: The ID of the pipeline to be updated.
        pipeline_update: The update to be applied.

    Returns:
        The updated pipeline.
    """
    return self._update_resource(
        resource_id=pipeline_id,
        resource_update=pipeline_update,
        route=PIPELINES,
        response_model=PipelineResponseModel,
    )
update_role(self, role_id, role_update)

Update an existing role.

Parameters:

Name Type Description Default
role_id UUID

The ID of the role to be updated.

required
role_update RoleUpdateModel

The update to be applied to the role.

required

Returns:

Type Description
RoleResponseModel

The updated role.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_ROLE)
def update_role(
    self, role_id: UUID, role_update: RoleUpdateModel
) -> RoleResponseModel:
    """Update an existing role.

    Args:
        role_id: The ID of the role to be updated.
        role_update: The update to be applied to the role.

    Returns:
        The updated role.
    """
    return self._update_resource(
        resource_id=role_id,
        resource_update=role_update,
        route=ROLES,
        response_model=RoleResponseModel,
    )
update_run(self, run_id, run_update)

Updates a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run to update.

required
run_update PipelineRunUpdateModel

The update to be applied to the pipeline run.

required

Returns:

Type Description
PipelineRunResponseModel

The updated pipeline run.

Source code in zenml/zen_stores/rest_zen_store.py
def update_run(
    self, run_id: UUID, run_update: PipelineRunUpdateModel
) -> PipelineRunResponseModel:
    """Updates a pipeline run.

    Args:
        run_id: The ID of the pipeline run to update.
        run_update: The update to be applied to the pipeline run.


    Returns:
        The updated pipeline run.
    """
    return self._update_resource(
        resource_id=run_id,
        resource_update=run_update,
        response_model=PipelineRunResponseModel,
        route=RUNS,
    )
update_run_step(self, step_run_id, step_run_update)

Updates a step run.

Parameters:

Name Type Description Default
step_run_id UUID

The ID of the step to update.

required
step_run_update StepRunUpdateModel

The update to be applied to the step.

required

Returns:

Type Description
StepRunResponseModel

The updated step run.

Source code in zenml/zen_stores/rest_zen_store.py
def update_run_step(
    self,
    step_run_id: UUID,
    step_run_update: StepRunUpdateModel,
) -> StepRunResponseModel:
    """Updates a step run.

    Args:
        step_run_id: The ID of the step to update.
        step_run_update: The update to be applied to the step.

    Returns:
        The updated step run.
    """
    return self._update_resource(
        resource_id=step_run_id,
        resource_update=step_run_update,
        response_model=StepRunResponseModel,
        route=STEPS,
    )
update_schedule(self, schedule_id, schedule_update)

Updates a schedule.

Parameters:

Name Type Description Default
schedule_id UUID

The ID of the schedule to be updated.

required
schedule_update ScheduleUpdateModel

The update to be applied.

required

Returns:

Type Description
ScheduleResponseModel

The updated schedule.

Source code in zenml/zen_stores/rest_zen_store.py
def update_schedule(
    self,
    schedule_id: UUID,
    schedule_update: ScheduleUpdateModel,
) -> ScheduleResponseModel:
    """Updates a schedule.

    Args:
        schedule_id: The ID of the schedule to be updated.
        schedule_update: The update to be applied.

    Returns:
        The updated schedule.
    """
    return self._update_resource(
        resource_id=schedule_id,
        resource_update=schedule_update,
        route=SCHEDULES,
        response_model=ScheduleResponseModel,
    )
update_service_connector(self, service_connector_id, update)

Updates an existing service connector.

The update model contains the fields to be updated. If a field value is set to None in the model, the field is not updated, but there are special rules concerning some fields:

  • the configuration and secrets fields together represent a full valid configuration update, not just a partial update. If either is set (i.e. not None) in the update, their values are merged together and will replace the existing configuration and secrets values.
  • the resource_id field value is also a full replacement value: if set to None, the resource ID is removed from the service connector.
  • the expiration_seconds field value is also a full replacement value: if set to None, the expiration is removed from the service connector.
  • the secret_id field value in the update is ignored, given that secrets are managed internally by the ZenML store.
  • the labels field is also a full labels update: if set (i.e. not None), all existing labels are removed and replaced by the new labels in the update.

Parameters:

Name Type Description Default
service_connector_id UUID

The ID of the service connector to update.

required
update ServiceConnectorUpdateModel

The update to be applied to the service connector.

required

Returns:

Type Description
ServiceConnectorResponseModel

The updated service connector.

Source code in zenml/zen_stores/rest_zen_store.py
def update_service_connector(
    self, service_connector_id: UUID, update: ServiceConnectorUpdateModel
) -> ServiceConnectorResponseModel:
    """Updates an existing service connector.

    The update model contains the fields to be updated. If a field value is
    set to None in the model, the field is not updated, but there are
    special rules concerning some fields:

    * the `configuration` and `secrets` fields together represent a full
    valid configuration update, not just a partial update. If either is
    set (i.e. not None) in the update, their values are merged together and
    will replace the existing configuration and secrets values.
    * the `resource_id` field value is also a full replacement value: if set
    to `None`, the resource ID is removed from the service connector.
    * the `expiration_seconds` field value is also a full replacement value:
    if set to `None`, the expiration is removed from the service connector.
    * the `secret_id` field value in the update is ignored, given that
    secrets are managed internally by the ZenML store.
    * the `labels` field is also a full labels update: if set (i.e. not
    `None`), all existing labels are removed and replaced by the new labels
    in the update.

    Args:
        service_connector_id: The ID of the service connector to update.
        update: The update to be applied to the service connector.

    Returns:
        The updated service connector.
    """
    connector_model = self._update_resource(
        resource_id=service_connector_id,
        resource_update=update,
        response_model=ServiceConnectorResponseModel,
        route=SERVICE_CONNECTORS,
    )
    self._populate_connector_type(connector_model)
    return connector_model
update_stack(self, stack_id, stack_update)

Update a stack.

Parameters:

Name Type Description Default
stack_id UUID

The ID of the stack update.

required
stack_update StackUpdateModel

The update request on the stack.

required

Returns:

Type Description
StackResponseModel

The updated stack.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_STACK)
def update_stack(
    self, stack_id: UUID, stack_update: StackUpdateModel
) -> StackResponseModel:
    """Update a stack.

    Args:
        stack_id: The ID of the stack update.
        stack_update: The update request on the stack.

    Returns:
        The updated stack.
    """
    return self._update_resource(
        resource_id=stack_id,
        resource_update=stack_update,
        route=STACKS,
        response_model=StackResponseModel,
    )
update_stack_component(self, component_id, component_update)

Update an existing stack component.

Parameters:

Name Type Description Default
component_id UUID

The ID of the stack component to update.

required
component_update ComponentUpdateModel

The update to be applied to the stack component.

required

Returns:

Type Description
ComponentResponseModel

The updated stack component.

Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
def update_stack_component(
    self,
    component_id: UUID,
    component_update: ComponentUpdateModel,
) -> ComponentResponseModel:
    """Update an existing stack component.

    Args:
        component_id: The ID of the stack component to update.
        component_update: The update to be applied to the stack component.

    Returns:
        The updated stack component.
    """
    return self._update_resource(
        resource_id=component_id,
        resource_update=component_update,
        route=STACK_COMPONENTS,
        response_model=ComponentResponseModel,
    )
update_team(self, team_id, team_update)

Upda