Zen Stores
zenml.zen_stores
special
ZenStores define ways to store ZenML relevant data locally or remotely.
base_zen_store
Base Zen Store implementation.
BaseZenStore (BaseModel, ZenStoreInterface, SecretsStoreInterface, AnalyticsTrackerMixin, ABC)
pydantic-model
Base class for accessing and persisting ZenML core objects.
Attributes:
Name | Type | Description |
---|---|---|
config |
StoreConfiguration |
The configuration of the store. |
track_analytics |
bool |
Only send analytics if set to |
secrets_store |
The secrets store to use for storing sensitive data. |
Source code in zenml/zen_stores/base_zen_store.py
class BaseZenStore(
BaseModel,
ZenStoreInterface,
SecretsStoreInterface,
AnalyticsTrackerMixin,
ABC,
):
"""Base class for accessing and persisting ZenML core objects.
Attributes:
config: The configuration of the store.
track_analytics: Only send analytics if set to `True`.
secrets_store: The secrets store to use for storing sensitive data.
"""
config: StoreConfiguration
track_analytics: bool = True
_secrets_store: Optional[BaseSecretsStore] = None
_event_handlers: Dict[StoreEvent, List[Callable[..., Any]]] = {}
TYPE: ClassVar[StoreType]
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]]
# ---------------------------------
# Initialization and configuration
# ---------------------------------
def __init__(
self,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Create and initialize a store.
Args:
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the Pydantic
constructor.
Raises:
RuntimeError: If the store cannot be initialized.
"""
super().__init__(**kwargs)
try:
self._initialize()
except Exception as e:
raise RuntimeError(
f"Error initializing {self.type.value} store with URL "
f"'{self.url}': {str(e)}"
) from e
if not skip_default_registrations:
logger.debug("Initializing database")
self._initialize_database()
else:
logger.debug("Skipping database initialization")
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
"""Returns the class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The class of the given store type or None if the type is unknown.
Raises:
TypeError: If the store type is unsupported.
"""
if store_type == StoreType.SQL:
from zenml.zen_stores.sql_zen_store import SqlZenStore
return SqlZenStore
elif store_type == StoreType.REST:
from zenml.zen_stores.rest_zen_store import RestZenStore
return RestZenStore
else:
raise TypeError(
f"No store implementation found for store type "
f"`{store_type.value}`."
)
@staticmethod
def get_store_config_class(
store_type: StoreType,
) -> Type["StoreConfiguration"]:
"""Returns the store config class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The config class of the given store type.
"""
store_class = BaseZenStore.get_store_class(store_type)
return store_class.CONFIG_TYPE
@staticmethod
def get_store_type(url: str) -> StoreType:
"""Returns the store type associated with a URL schema.
Args:
url: The store URL.
Returns:
The store type associated with the supplied URL schema.
Raises:
TypeError: If no store type was found to support the supplied URL.
"""
from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
if SqlZenStoreConfiguration.supports_url_scheme(url):
return StoreType.SQL
elif RestZenStoreConfiguration.supports_url_scheme(url):
return StoreType.REST
else:
raise TypeError(f"No store implementation found for URL: {url}.")
@staticmethod
def create_store(
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> "BaseZenStore":
"""Create and initialize a store from a store configuration.
Args:
config: The store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store class
Returns:
The initialized store.
"""
logger.debug(f"Creating store with config '{config}'...")
store_class = BaseZenStore.get_store_class(config.type)
store = store_class(
config=config,
skip_default_registrations=skip_default_registrations,
**kwargs,
)
secrets_store_config = store.config.secrets_store
# Initialize the secrets store
if (
secrets_store_config
and secrets_store_config.type != SecretsStoreType.NONE
):
secrets_store_class = BaseSecretsStore.get_store_class(
secrets_store_config
)
store._secrets_store = secrets_store_class(
zen_store=store,
config=secrets_store_config,
)
# Update the config with the actual secrets store config
# to reflect the default values in the saved configuration
store.config.secrets_store = store._secrets_store.config
return store
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
"""Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the
local filesystem.
Args:
path: The local path where the store DB will be stored.
Returns:
The default store configuration.
"""
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
config = SqlZenStoreConfiguration(
type=StoreType.SQL,
url=SqlZenStoreConfiguration.get_local_url(path),
secrets_store=SqlSecretsStoreConfiguration(
type=SecretsStoreType.SQL,
),
)
return config
def _initialize_database(self) -> None:
"""Initialize the database on first use."""
try:
default_workspace = self._default_workspace
except KeyError:
default_workspace = self._create_default_workspace()
try:
assert self._admin_role
except KeyError:
self._create_admin_role()
try:
assert self._guest_role
except KeyError:
self._create_guest_role()
try:
default_user = self._default_user
except KeyError:
default_user = self._create_default_user()
try:
self._get_default_stack(
workspace_name_or_id=default_workspace.id,
user_name_or_id=default_user.id,
)
except KeyError:
self._create_default_stack(
workspace_name_or_id=default_workspace.id,
user_name_or_id=default_user.id,
)
@property
def url(self) -> str:
"""The URL of the store.
Returns:
The URL of the store.
"""
return self.config.url
@property
def type(self) -> StoreType:
"""The type of the store.
Returns:
The type of the store.
"""
return self.TYPE
@property
def secrets_store(self) -> Optional["BaseSecretsStore"]:
"""The secrets store associated with this store.
Returns:
The secrets store associated with this store.
"""
return self._secrets_store
def validate_active_config(
self,
active_workspace_name_or_id: Optional[Union[str, UUID]] = None,
active_stack_id: Optional[UUID] = None,
config_name: str = "",
) -> Tuple[WorkspaceResponseModel, StackResponseModel]:
"""Validate the active configuration.
Call this method to validate the supplied active workspace and active
stack values.
This method is guaranteed to return valid workspace ID and stack ID
values. If the supplied workspace and stack are not set or are not valid
(e.g. they do not exist or are not accessible), the default workspace and
default workspace stack will be returned in their stead.
Args:
active_workspace_name_or_id: The name or ID of the active workspace.
active_stack_id: The ID of the active stack.
config_name: The name of the configuration to validate (used in the
displayed logs/messages).
Returns:
A tuple containing the active workspace and active stack.
"""
active_workspace: WorkspaceResponseModel
if active_workspace_name_or_id:
try:
active_workspace = self.get_workspace(
active_workspace_name_or_id
)
except KeyError:
active_workspace = self._get_or_create_default_workspace()
logger.warning(
f"The current {config_name} active workspace is no longer "
f"available. Resetting the active workspace to "
f"'{active_workspace.name}'."
)
else:
active_workspace = self._get_or_create_default_workspace()
logger.info(
f"Setting the {config_name} active workspace "
f"to '{active_workspace.name}'."
)
active_stack: StackResponseModel
# Sanitize the active stack
if active_stack_id:
# Ensure that the active stack is still valid
try:
active_stack = self.get_stack(stack_id=active_stack_id)
except KeyError:
logger.warning(
"The current %s active stack is no longer available. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
else:
if active_stack.workspace.id != active_workspace.id:
logger.warning(
"The current %s active stack is not part of the active "
"workspace. Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
elif not active_stack.is_shared and (
not active_stack.user
or (active_stack.user.id != self.get_user().id)
):
logger.warning(
"The current %s active stack is not shared and not "
"owned by the active user. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
else:
logger.warning(
"Setting the %s active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(active_workspace)
return active_workspace, active_stack
def get_store_info(self) -> ServerModel:
"""Get information about the store.
Returns:
Information about the store.
"""
return ServerModel(
id=GlobalConfiguration().user_id,
version=zenml.__version__,
deployment_type=os.environ.get(
ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
),
database_type=ServerDatabaseType.OTHER,
debug=IS_DEBUG_ENV,
secrets_store_type=self.secrets_store.type
if self.secrets_store
else SecretsStoreType.NONE,
)
def is_local_store(self) -> bool:
"""Check if the store is local or connected to a local ZenML server.
Returns:
True if the store is local, False otherwise.
"""
return self.get_store_info().is_local()
def _get_or_create_default_stack(
self, workspace: "WorkspaceResponseModel"
) -> "StackResponseModel":
try:
return self._get_default_stack(
workspace_name_or_id=workspace.id,
user_name_or_id=self.get_user().id,
)
except KeyError:
return self._create_default_stack(
workspace_name_or_id=workspace.id,
user_name_or_id=self.get_user().id,
)
def _get_or_create_default_workspace(self) -> "WorkspaceResponseModel":
try:
return self._default_workspace
except KeyError:
return self._create_default_workspace()
# --------------
# Event Handlers
# --------------
def register_event_handler(
self,
event: StoreEvent,
handler: Callable[..., Any],
) -> None:
"""Register an external event handler.
The handler will be called when the store event is triggered.
Args:
event: The event to register the handler for.
handler: The handler function to register.
"""
self._event_handlers.setdefault(event, []).append(handler)
def _trigger_event(self, event: StoreEvent, **kwargs: Any) -> None:
"""Trigger an event and call all registered handlers.
Args:
event: The event to trigger.
**kwargs: The event arguments.
"""
for handler in self._event_handlers.get(event, []):
try:
handler(event, **kwargs)
except Exception as e:
logger.error(
f"Silently ignoring error caught while triggering event "
f"store handler for event {event.value}: {e}",
exc_info=True,
)
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_DEFAULT_STACK)
def _create_default_stack(
self,
workspace_name_or_id: Union[str, UUID],
user_name_or_id: Union[str, UUID],
) -> StackResponseModel:
"""Create the default stack components and stack.
The default stack contains a local orchestrator and a local artifact
store.
Args:
workspace_name_or_id: Name or ID of the workspace to which the stack
belongs.
user_name_or_id: The name or ID of the user that owns the stack.
Returns:
The model of the created default stack.
"""
workspace = self.get_workspace(
workspace_name_or_id=workspace_name_or_id
)
user = self.get_user(user_name_or_id=user_name_or_id)
logger.info(
f"Creating default stack for user '{user.name}' in workspace "
f"{workspace.name}..."
)
# Register the default orchestrator
orchestrator = self.create_stack_component(
component=ComponentRequestModel(
user=user.id,
workspace=workspace.id,
name=DEFAULT_STACK_COMPONENT_NAME,
type=StackComponentType.ORCHESTRATOR,
flavor="local",
configuration={},
),
)
# Register the default artifact store
artifact_store = self.create_stack_component(
component=ComponentRequestModel(
user=user.id,
workspace=workspace.id,
name=DEFAULT_STACK_COMPONENT_NAME,
type=StackComponentType.ARTIFACT_STORE,
flavor="local",
configuration={},
),
)
components = {c.type: [c.id] for c in [orchestrator, artifact_store]}
# Register the default stack
stack = StackRequestModel(
name=DEFAULT_STACK_NAME,
components=components,
is_shared=False,
workspace=workspace.id,
user=user.id,
)
return self.create_stack(stack=stack)
def _get_default_stack(
self,
workspace_name_or_id: Union[str, UUID],
user_name_or_id: Union[str, UUID],
) -> StackResponseModel:
"""Get the default stack for a user in a workspace.
Args:
workspace_name_or_id: Name or ID of the workspace.
user_name_or_id: Name or ID of the user.
Returns:
The default stack in the workspace owned by the supplied user.
Raises:
KeyError: if the workspace or default stack doesn't exist.
"""
default_stacks = self.list_stacks(
StackFilterModel(
workspace_id=workspace_name_or_id,
user_id=user_name_or_id,
name=DEFAULT_STACK_NAME,
)
)
if default_stacks.total == 0:
raise KeyError(
f"No default stack found for user {str(user_name_or_id)} in "
f"workspace {str(workspace_name_or_id)}"
)
return default_stacks.items[0]
# -----
# Roles
# -----
@property
def _admin_role(self) -> RoleResponseModel:
"""Get the admin role.
Returns:
The default admin role.
"""
return self.get_role(DEFAULT_ADMIN_ROLE)
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_admin_role(self) -> RoleResponseModel:
"""Creates the admin role.
Returns:
The admin role
"""
logger.info(f"Creating '{DEFAULT_ADMIN_ROLE}' role ...")
return self.create_role(
RoleRequestModel(
name=DEFAULT_ADMIN_ROLE,
permissions={
PermissionType.READ,
PermissionType.WRITE,
PermissionType.ME,
},
)
)
@property
def _guest_role(self) -> RoleResponseModel:
"""Get the guest role.
Returns:
The guest role.
"""
return self.get_role(DEFAULT_GUEST_ROLE)
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_guest_role(self) -> RoleResponseModel:
"""Creates the guest role.
Returns:
The guest role
"""
logger.info(f"Creating '{DEFAULT_GUEST_ROLE}' role ...")
return self.create_role(
RoleRequestModel(
name=DEFAULT_GUEST_ROLE,
permissions={
PermissionType.READ,
PermissionType.ME,
},
)
)
# -----
# Users
# -----
@property
def _default_user_name(self) -> str:
"""Get the default user name.
Returns:
The default user name.
"""
return os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
@property
def _default_user(self) -> UserResponseModel:
"""Get the default user.
Returns:
The default user.
Raises:
KeyError: If the default user doesn't exist.
"""
user_name = self._default_user_name
try:
return self.get_user(user_name)
except KeyError:
raise KeyError(f"The default user '{user_name}' is not configured")
@track(AnalyticsEvent.CREATED_DEFAULT_USER)
def _create_default_user(self) -> UserResponseModel:
"""Creates a default user with the admin role.
Returns:
The default user.
"""
user_name = os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
user_password = os.getenv(
ENV_ZENML_DEFAULT_USER_PASSWORD, DEFAULT_PASSWORD
)
logger.info(f"Creating default user '{user_name}' ...")
new_user = self.create_user(
UserRequestModel(
name=user_name,
active=True,
password=user_password,
)
)
self.create_user_role_assignment(
UserRoleAssignmentRequestModel(
role=self._admin_role.id,
user=new_user.id,
workspace=None,
)
)
return new_user
# -----
# Roles
# -----
@property
def roles(self) -> Page[RoleResponseModel]:
"""All existing roles.
Returns:
A list of all existing roles.
"""
return self.list_roles(RoleFilterModel())
# --------
# Workspaces
# --------
@property
def _default_workspace_name(self) -> str:
"""Get the default workspace name.
Returns:
The default workspace name.
"""
return os.getenv(
ENV_ZENML_DEFAULT_WORKSPACE_NAME, DEFAULT_WORKSPACE_NAME
)
@property
def _default_workspace(self) -> WorkspaceResponseModel:
"""Get the default workspace.
Returns:
The default workspace.
Raises:
KeyError: if the default workspace doesn't exist.
"""
workspace_name = self._default_workspace_name
try:
return self.get_workspace(workspace_name)
except KeyError:
raise KeyError(
f"The default workspace '{workspace_name}' is not configured"
)
@track(AnalyticsEvent.CREATED_DEFAULT_WORKSPACE)
def _create_default_workspace(self) -> WorkspaceResponseModel:
"""Creates a default workspace.
Returns:
The default workspace.
"""
workspace_name = self._default_workspace_name
logger.info(f"Creating default workspace '{workspace_name}' ...")
return self.create_workspace(
WorkspaceRequestModel(name=workspace_name)
)
# ---------
# Analytics
# ---------
def track_event(
self,
event: AnalyticsEvent,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Track an analytics event.
Args:
event: The event to track.
metadata: Additional metadata to track with the event.
"""
if self.track_analytics:
# Server information is always tracked, if available.
track_event(event, metadata)
class Config:
"""Pydantic configuration class."""
# Validate attributes when assigning them. We need to set this in order
# to have a mix of mutable and immutable attributes
validate_assignment = True
# Ignore extra attributes from configs of previous ZenML versions
extra = "ignore"
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
roles: Page[RoleResponseModel]
property
readonly
All existing roles.
Returns:
Type | Description |
---|---|
Page[RoleResponseModel] |
A list of all existing roles. |
secrets_store: Optional[BaseSecretsStore]
property
readonly
The secrets store associated with this store.
Returns:
Type | Description |
---|---|
Optional[BaseSecretsStore] |
The secrets store associated with this store. |
type: StoreType
property
readonly
The type of the store.
Returns:
Type | Description |
---|---|
StoreType |
The type of the store. |
url: str
property
readonly
The URL of the store.
Returns:
Type | Description |
---|---|
str |
The URL of the store. |
Config
Pydantic configuration class.
Source code in zenml/zen_stores/base_zen_store.py
class Config:
"""Pydantic configuration class."""
# Validate attributes when assigning them. We need to set this in order
# to have a mix of mutable and immutable attributes
validate_assignment = True
# Ignore extra attributes from configs of previous ZenML versions
extra = "ignore"
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
__init__(self, skip_default_registrations=False, **kwargs)
special
Create and initialize a store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
skip_default_registrations |
bool |
If |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the Pydantic constructor. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the store cannot be initialized. |
Source code in zenml/zen_stores/base_zen_store.py
def __init__(
self,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Create and initialize a store.
Args:
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the Pydantic
constructor.
Raises:
RuntimeError: If the store cannot be initialized.
"""
super().__init__(**kwargs)
try:
self._initialize()
except Exception as e:
raise RuntimeError(
f"Error initializing {self.type.value} store with URL "
f"'{self.url}': {str(e)}"
) from e
if not skip_default_registrations:
logger.debug("Initializing database")
self._initialize_database()
else:
logger.debug("Skipping database initialization")
create_secret(self, secret)
Creates a new secret.
The new secret is also validated against the scoping rules enforced in the secrets store:
- only one workspace-scoped secret with the given name can exist in the target workspace.
- only one user-scoped secret with the given name can exist in the target workspace for the target user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
SecretRequestModel |
The secret to create. |
required |
Returns:
Type | Description |
---|---|
SecretResponseModel |
The newly created secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
if the user or workspace does not exist. |
EntityExistsError |
If a secret with the same name already exists in the same scope. |
ValueError |
if the secret is invalid. |
Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def create_secret(
self,
secret: SecretRequestModel,
) -> SecretResponseModel:
"""Creates a new secret.
The new secret is also validated against the scoping rules enforced in
the secrets store:
- only one workspace-scoped secret with the given name can exist
in the target workspace.
- only one user-scoped secret with the given name can exist in the
target workspace for the target user.
Args:
secret: The secret to create.
Returns:
The newly created secret.
Raises:
KeyError: if the user or workspace does not exist.
EntityExistsError: If a secret with the same name already exists in
the same scope.
ValueError: if the secret is invalid.
"""
create_store(config, skip_default_registrations=False, **kwargs)
staticmethod
Create and initialize a store from a store configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to use. |
required |
skip_default_registrations |
bool |
If |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the store class |
{} |
Returns:
Type | Description |
---|---|
BaseZenStore |
The initialized store. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def create_store(
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> "BaseZenStore":
"""Create and initialize a store from a store configuration.
Args:
config: The store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store class
Returns:
The initialized store.
"""
logger.debug(f"Creating store with config '{config}'...")
store_class = BaseZenStore.get_store_class(config.type)
store = store_class(
config=config,
skip_default_registrations=skip_default_registrations,
**kwargs,
)
secrets_store_config = store.config.secrets_store
# Initialize the secrets store
if (
secrets_store_config
and secrets_store_config.type != SecretsStoreType.NONE
):
secrets_store_class = BaseSecretsStore.get_store_class(
secrets_store_config
)
store._secrets_store = secrets_store_class(
zen_store=store,
config=secrets_store_config,
)
# Update the config with the actual secrets store config
# to reflect the default values in the saved configuration
store.config.secrets_store = store._secrets_store.config
return store
delete_secret(self, secret_id)
Deletes a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_id |
UUID |
The ID of the secret to delete. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
if the secret doesn't exist. |
Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def delete_secret(self, secret_id: UUID) -> None:
"""Deletes a secret.
Args:
secret_id: The ID of the secret to delete.
Raises:
KeyError: if the secret doesn't exist.
"""
get_default_store_config(path)
staticmethod
Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the local filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The local path where the store DB will be stored. |
required |
Returns:
Type | Description |
---|---|
StoreConfiguration |
The default store configuration. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
"""Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the
local filesystem.
Args:
path: The local path where the store DB will be stored.
Returns:
The default store configuration.
"""
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
config = SqlZenStoreConfiguration(
type=StoreType.SQL,
url=SqlZenStoreConfiguration.get_local_url(path),
secrets_store=SqlSecretsStoreConfiguration(
type=SecretsStoreType.SQL,
),
)
return config
get_secret(self, secret_id)
Get a secret with a given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_id |
UUID |
ID of the secret. |
required |
Returns:
Type | Description |
---|---|
SecretResponseModel |
The secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
if the secret does not exist. |
Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def get_secret(self, secret_id: UUID) -> SecretResponseModel:
"""Get a secret with a given name.
Args:
secret_id: ID of the secret.
Returns:
The secret.
Raises:
KeyError: if the secret does not exist.
"""
get_store_class(store_type)
staticmethod
Returns the class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[BaseZenStore] |
The class of the given store type or None if the type is unknown. |
Exceptions:
Type | Description |
---|---|
TypeError |
If the store type is unsupported. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
"""Returns the class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The class of the given store type or None if the type is unknown.
Raises:
TypeError: If the store type is unsupported.
"""
if store_type == StoreType.SQL:
from zenml.zen_stores.sql_zen_store import SqlZenStore
return SqlZenStore
elif store_type == StoreType.REST:
from zenml.zen_stores.rest_zen_store import RestZenStore
return RestZenStore
else:
raise TypeError(
f"No store implementation found for store type "
f"`{store_type.value}`."
)
get_store_config_class(store_type)
staticmethod
Returns the store config class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[StoreConfiguration] |
The config class of the given store type. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_config_class(
store_type: StoreType,
) -> Type["StoreConfiguration"]:
"""Returns the store config class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The config class of the given store type.
"""
store_class = BaseZenStore.get_store_class(store_type)
return store_class.CONFIG_TYPE
get_store_info(self)
Get information about the store.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the store. |
Source code in zenml/zen_stores/base_zen_store.py
def get_store_info(self) -> ServerModel:
"""Get information about the store.
Returns:
Information about the store.
"""
return ServerModel(
id=GlobalConfiguration().user_id,
version=zenml.__version__,
deployment_type=os.environ.get(
ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
),
database_type=ServerDatabaseType.OTHER,
debug=IS_DEBUG_ENV,
secrets_store_type=self.secrets_store.type
if self.secrets_store
else SecretsStoreType.NONE,
)
get_store_type(url)
staticmethod
Returns the store type associated with a URL schema.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The store URL. |
required |
Returns:
Type | Description |
---|---|
StoreType |
The store type associated with the supplied URL schema. |
Exceptions:
Type | Description |
---|---|
TypeError |
If no store type was found to support the supplied URL. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_type(url: str) -> StoreType:
"""Returns the store type associated with a URL schema.
Args:
url: The store URL.
Returns:
The store type associated with the supplied URL schema.
Raises:
TypeError: If no store type was found to support the supplied URL.
"""
from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
if SqlZenStoreConfiguration.supports_url_scheme(url):
return StoreType.SQL
elif RestZenStoreConfiguration.supports_url_scheme(url):
return StoreType.REST
else:
raise TypeError(f"No store implementation found for URL: {url}.")
is_local_store(self)
Check if the store is local or connected to a local ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if the store is local, False otherwise. |
Source code in zenml/zen_stores/base_zen_store.py
def is_local_store(self) -> bool:
"""Check if the store is local or connected to a local ZenML server.
Returns:
True if the store is local, False otherwise.
"""
return self.get_store_info().is_local()
list_secrets(self, secret_filter_model)
List all secrets matching the given filter criteria.
Note that returned secrets do not include any secret values. To fetch
the secret values, use get_secret
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_filter_model |
SecretFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[SecretResponseModel] |
A list of all secrets matching the filter criteria, with pagination
information and sorted according to the filter criteria. The
returned secrets do not include any secret values, only metadata. To
fetch the secret values, use |
Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def list_secrets(
self, secret_filter_model: SecretFilterModel
) -> Page[SecretResponseModel]:
"""List all secrets matching the given filter criteria.
Note that returned secrets do not include any secret values. To fetch
the secret values, use `get_secret`.
Args:
secret_filter_model: All filter parameters including pagination
params.
Returns:
A list of all secrets matching the filter criteria, with pagination
information and sorted according to the filter criteria. The
returned secrets do not include any secret values, only metadata. To
fetch the secret values, use `get_secret` individually with each
secret.
"""
register_event_handler(self, event, handler)
Register an external event handler.
The handler will be called when the store event is triggered.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
StoreEvent |
The event to register the handler for. |
required |
handler |
Callable[..., Any] |
The handler function to register. |
required |
Source code in zenml/zen_stores/base_zen_store.py
def register_event_handler(
self,
event: StoreEvent,
handler: Callable[..., Any],
) -> None:
"""Register an external event handler.
The handler will be called when the store event is triggered.
Args:
event: The event to register the handler for.
handler: The handler function to register.
"""
self._event_handlers.setdefault(event, []).append(handler)
track_event(self, event, metadata=None)
Track an analytics event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
AnalyticsEvent |
The event to track. |
required |
metadata |
Optional[Dict[str, Any]] |
Additional metadata to track with the event. |
None |
Source code in zenml/zen_stores/base_zen_store.py
def track_event(
self,
event: AnalyticsEvent,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Track an analytics event.
Args:
event: The event to track.
metadata: Additional metadata to track with the event.
"""
if self.track_analytics:
# Server information is always tracked, if available.
track_event(event, metadata)
update_secret(self, secret_id, secret_update)
Updates a secret.
Secret values that are specified as None
in the update that are
present in the existing secret are removed from the existing secret.
Values that are present in both secrets are overwritten. All other
values in both the existing secret and the update are kept (merged).
If the update includes a change of name or scope, the scoping rules enforced in the secrets store are used to validate the update:
- only one workspace-scoped secret with the given name can exist in the target workspace.
- only one user-scoped secret with the given name can exist in the target workspace for the target user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_id |
UUID |
The ID of the secret to be updated. |
required |
secret_update |
SecretUpdateModel |
The update to be applied. |
required |
Returns:
Type | Description |
---|---|
SecretResponseModel |
The updated secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
if the secret doesn't exist. |
EntityExistsError |
If a secret with the same name already exists in the same scope. |
ValueError |
if the secret is invalid. |
Source code in zenml/zen_stores/base_zen_store.py
@abstractmethod
def update_secret(
self,
secret_id: UUID,
secret_update: SecretUpdateModel,
) -> SecretResponseModel:
"""Updates a secret.
Secret values that are specified as `None` in the update that are
present in the existing secret are removed from the existing secret.
Values that are present in both secrets are overwritten. All other
values in both the existing secret and the update are kept (merged).
If the update includes a change of name or scope, the scoping rules
enforced in the secrets store are used to validate the update:
- only one workspace-scoped secret with the given name can exist
in the target workspace.
- only one user-scoped secret with the given name can exist in the
target workspace for the target user.
Args:
secret_id: The ID of the secret to be updated.
secret_update: The update to be applied.
Returns:
The updated secret.
Raises:
KeyError: if the secret doesn't exist.
EntityExistsError: If a secret with the same name already exists in
the same scope.
ValueError: if the secret is invalid.
"""
validate_active_config(self, active_workspace_name_or_id=None, active_stack_id=None, config_name='')
Validate the active configuration.
Call this method to validate the supplied active workspace and active stack values.
This method is guaranteed to return valid workspace ID and stack ID values. If the supplied workspace and stack are not set or are not valid (e.g. they do not exist or are not accessible), the default workspace and default workspace stack will be returned in their stead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
active_workspace_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the active workspace. |
None |
active_stack_id |
Optional[uuid.UUID] |
The ID of the active stack. |
None |
config_name |
str |
The name of the configuration to validate (used in the displayed logs/messages). |
'' |
Returns:
Type | Description |
---|---|
Tuple[zenml.models.workspace_models.WorkspaceResponseModel, zenml.models.stack_models.StackResponseModel] |
A tuple containing the active workspace and active stack. |
Source code in zenml/zen_stores/base_zen_store.py
def validate_active_config(
self,
active_workspace_name_or_id: Optional[Union[str, UUID]] = None,
active_stack_id: Optional[UUID] = None,
config_name: str = "",
) -> Tuple[WorkspaceResponseModel, StackResponseModel]:
"""Validate the active configuration.
Call this method to validate the supplied active workspace and active
stack values.
This method is guaranteed to return valid workspace ID and stack ID
values. If the supplied workspace and stack are not set or are not valid
(e.g. they do not exist or are not accessible), the default workspace and
default workspace stack will be returned in their stead.
Args:
active_workspace_name_or_id: The name or ID of the active workspace.
active_stack_id: The ID of the active stack.
config_name: The name of the configuration to validate (used in the
displayed logs/messages).
Returns:
A tuple containing the active workspace and active stack.
"""
active_workspace: WorkspaceResponseModel
if active_workspace_name_or_id:
try:
active_workspace = self.get_workspace(
active_workspace_name_or_id
)
except KeyError:
active_workspace = self._get_or_create_default_workspace()
logger.warning(
f"The current {config_name} active workspace is no longer "
f"available. Resetting the active workspace to "
f"'{active_workspace.name}'."
)
else:
active_workspace = self._get_or_create_default_workspace()
logger.info(
f"Setting the {config_name} active workspace "
f"to '{active_workspace.name}'."
)
active_stack: StackResponseModel
# Sanitize the active stack
if active_stack_id:
# Ensure that the active stack is still valid
try:
active_stack = self.get_stack(stack_id=active_stack_id)
except KeyError:
logger.warning(
"The current %s active stack is no longer available. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
else:
if active_stack.workspace.id != active_workspace.id:
logger.warning(
"The current %s active stack is not part of the active "
"workspace. Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
elif not active_stack.is_shared and (
not active_stack.user
or (active_stack.user.id != self.get_user().id)
):
logger.warning(
"The current %s active stack is not shared and not "
"owned by the active user. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_workspace
)
else:
logger.warning(
"Setting the %s active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(active_workspace)
return active_workspace, active_stack
enums
Zen Store enums.
StoreEvent (StrEnum)
Events that can be triggered by the store.
Source code in zenml/zen_stores/enums.py
class StoreEvent(StrEnum):
"""Events that can be triggered by the store."""
# Triggered just before deleting a workspace. The workspace ID is passed as
# a `workspace_id` UUID argument.
WORKSPACE_DELETED = "workspace_deleted"
# Triggered just before deleting a user. The user ID is passed as
# a `user_id` UUID argument.
USER_DELETED = "user_deleted"
migrations
special
Alembic database migration utilities.
alembic
Alembic utilities wrapper.
The Alembic class defined here acts as a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Alembic
Alembic environment and migration API.
This class provides a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Source code in zenml/zen_stores/migrations/alembic.py
class Alembic:
"""Alembic environment and migration API.
This class provides a wrapper around the Alembic library that automatically
configures Alembic to use the ZenML SQL store database connection.
"""
def __init__(
self,
engine: Engine,
metadata: MetaData = SQLModel.metadata,
context: Optional[EnvironmentContext] = None,
**kwargs: Any,
) -> None:
"""Initialize the Alembic wrapper.
Args:
engine: The SQLAlchemy engine to use.
metadata: The SQLAlchemy metadata to use.
context: The Alembic environment context to use. If not set, a new
context is created pointing to the ZenML migrations directory.
**kwargs: Additional keyword arguments to pass to the Alembic
environment context.
"""
self.engine = engine
self.metadata = metadata
self.context_kwargs = kwargs
self.config = Config()
self.config.set_main_option(
"script_location", str(Path(__file__).parent)
)
self.config.set_main_option(
"version_locations", str(Path(__file__).parent / "versions")
)
self.script_directory = ScriptDirectory.from_config(self.config)
if context is None:
self.environment_context = EnvironmentContext(
self.config, self.script_directory
)
else:
self.environment_context = context
def db_is_empty(self) -> bool:
"""Check if the database is empty.
Returns:
True if the database is empty, False otherwise.
"""
# Check the existence of any of the SQLModel tables
return not self.engine.dialect.has_table(
self.engine.connect(), schemas.StackSchema.__tablename__
)
def run_migrations(
self,
fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
"""Run an online migration function in the current migration context.
Args:
fn: Migration function to run. If not set, the function configured
externally by the Alembic CLI command is used.
"""
fn_context_args: Dict[Any, Any] = {}
if fn is not None:
fn_context_args["fn"] = fn
with self.engine.connect() as connection:
self.environment_context.configure(
connection=connection,
target_metadata=self.metadata,
include_object=include_object,
compare_type=True,
render_as_batch=True,
**fn_context_args,
**self.context_kwargs,
)
with self.environment_context.begin_transaction():
self.environment_context.run_migrations()
def current_revisions(self) -> List[str]:
"""Get the current database revisions.
Returns:
List of head revisions.
"""
current_revisions: List[str] = []
def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
nonlocal current_revisions
for r in self.script_directory.get_all_current(
rev # type:ignore [arg-type]
):
if r is None:
continue
current_revisions.append(r.revision)
return []
self.run_migrations(do_get_current_rev)
return current_revisions
def stamp(self, revision: str) -> None:
"""Stamp the revision table with the given revision without running any migrations.
Args:
revision: String revision target.
"""
def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._stamp_revs(revision, rev)
self.run_migrations(do_stamp)
def upgrade(self, revision: str = "heads") -> None:
"""Upgrade the database to a later version.
Args:
revision: String revision target.
"""
def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._upgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_upgrade)
def downgrade(self, revision: str) -> None:
"""Revert the database to a previous version.
Args:
revision: String revision target.
"""
def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._downgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_downgrade)
__init__(self, engine, metadata=MetaData(), context=None, **kwargs)
special
Initialize the Alembic wrapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
engine |
Engine |
The SQLAlchemy engine to use. |
required |
metadata |
MetaData |
The SQLAlchemy metadata to use. |
MetaData() |
context |
Optional[alembic.runtime.environment.EnvironmentContext] |
The Alembic environment context to use. If not set, a new context is created pointing to the ZenML migrations directory. |
None |
**kwargs |
Any |
Additional keyword arguments to pass to the Alembic environment context. |
{} |
Source code in zenml/zen_stores/migrations/alembic.py
def __init__(
self,
engine: Engine,
metadata: MetaData = SQLModel.metadata,
context: Optional[EnvironmentContext] = None,
**kwargs: Any,
) -> None:
"""Initialize the Alembic wrapper.
Args:
engine: The SQLAlchemy engine to use.
metadata: The SQLAlchemy metadata to use.
context: The Alembic environment context to use. If not set, a new
context is created pointing to the ZenML migrations directory.
**kwargs: Additional keyword arguments to pass to the Alembic
environment context.
"""
self.engine = engine
self.metadata = metadata
self.context_kwargs = kwargs
self.config = Config()
self.config.set_main_option(
"script_location", str(Path(__file__).parent)
)
self.config.set_main_option(
"version_locations", str(Path(__file__).parent / "versions")
)
self.script_directory = ScriptDirectory.from_config(self.config)
if context is None:
self.environment_context = EnvironmentContext(
self.config, self.script_directory
)
else:
self.environment_context = context
current_revisions(self)
Get the current database revisions.
Returns:
Type | Description |
---|---|
List[str] |
List of head revisions. |
Source code in zenml/zen_stores/migrations/alembic.py
def current_revisions(self) -> List[str]:
"""Get the current database revisions.
Returns:
List of head revisions.
"""
current_revisions: List[str] = []
def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
nonlocal current_revisions
for r in self.script_directory.get_all_current(
rev # type:ignore [arg-type]
):
if r is None:
continue
current_revisions.append(r.revision)
return []
self.run_migrations(do_get_current_rev)
return current_revisions
db_is_empty(self)
Check if the database is empty.
Returns:
Type | Description |
---|---|
bool |
True if the database is empty, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def db_is_empty(self) -> bool:
"""Check if the database is empty.
Returns:
True if the database is empty, False otherwise.
"""
# Check the existence of any of the SQLModel tables
return not self.engine.dialect.has_table(
self.engine.connect(), schemas.StackSchema.__tablename__
)
downgrade(self, revision)
Revert the database to a previous version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def downgrade(self, revision: str) -> None:
"""Revert the database to a previous version.
Args:
revision: String revision target.
"""
def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._downgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_downgrade)
run_migrations(self, fn)
Run an online migration function in the current migration context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
Optional[Callable[[Union[str, Sequence[str]], alembic.runtime.migration.MigrationContext], List[Any]]] |
Migration function to run. If not set, the function configured externally by the Alembic CLI command is used. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def run_migrations(
self,
fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
"""Run an online migration function in the current migration context.
Args:
fn: Migration function to run. If not set, the function configured
externally by the Alembic CLI command is used.
"""
fn_context_args: Dict[Any, Any] = {}
if fn is not None:
fn_context_args["fn"] = fn
with self.engine.connect() as connection:
self.environment_context.configure(
connection=connection,
target_metadata=self.metadata,
include_object=include_object,
compare_type=True,
render_as_batch=True,
**fn_context_args,
**self.context_kwargs,
)
with self.environment_context.begin_transaction():
self.environment_context.run_migrations()
stamp(self, revision)
Stamp the revision table with the given revision without running any migrations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def stamp(self, revision: str) -> None:
"""Stamp the revision table with the given revision without running any migrations.
Args:
revision: String revision target.
"""
def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._stamp_revs(revision, rev)
self.run_migrations(do_stamp)
upgrade(self, revision='heads')
Upgrade the database to a later version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
'heads' |
Source code in zenml/zen_stores/migrations/alembic.py
def upgrade(self, revision: str = "heads") -> None:
"""Upgrade the database to a later version.
Args:
revision: String revision target.
"""
def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._upgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_upgrade)
AlembicVersion (Base)
Alembic version table.
Source code in zenml/zen_stores/migrations/alembic.py
class AlembicVersion(Base): # type: ignore[valid-type,misc]
"""Alembic version table."""
__tablename__ = "alembic_version"
version_num = Column(String, nullable=False, primary_key=True)
include_object(object, name, type_, *args, **kwargs)
Function used to exclude tables from the migration scripts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
Any |
The schema item object to check. |
required |
name |
str |
The name of the object to check. |
required |
type_ |
str |
The type of the object to check. |
required |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
bool |
True if the object should be included, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def include_object(
object: Any, name: str, type_: str, *args: Any, **kwargs: Any
) -> bool:
"""Function used to exclude tables from the migration scripts.
Args:
object: The schema item object to check.
name: The name of the object to check.
type_: The type of the object to check.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
Returns:
True if the object should be included, False otherwise.
"""
return not (type_ == "table" and name in exclude_tables)
rest_zen_store
REST Zen Store implementation.
RestZenStore (BaseZenStore)
pydantic-model
Store implementation for accessing data from a REST API.
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStore(BaseZenStore):
"""Store implementation for accessing data from a REST API."""
config: RestZenStoreConfiguration
TYPE: ClassVar[StoreType] = StoreType.REST
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]] = RestZenStoreConfiguration
_api_token: Optional[str] = None
_session: Optional[requests.Session] = None
def _initialize_database(self) -> None:
"""Initialize the database."""
# don't do anything for a REST store
# ====================================
# ZenML Store interface implementation
# ====================================
# --------------------------------
# Initialization and configuration
# --------------------------------
def _initialize(self) -> None:
"""Initialize the REST store."""
client_version = zenml.__version__
server_version = self.get_store_info().version
if not DISABLE_CLIENT_SERVER_MISMATCH_WARNING and (
server_version != client_version
):
logger.warning(
"Your ZenML client version (%s) does not match the server "
"version (%s). This version mismatch might lead to errors or "
"unexpected behavior. \nTo disable this warning message, set "
"the environment variable `%s=True`",
client_version,
server_version,
ENV_ZENML_DISABLE_CLIENT_SERVER_MISMATCH_WARNING,
)
def get_store_info(self) -> ServerModel:
"""Get information about the server.
Returns:
Information about the server.
"""
body = self.get(INFO)
return ServerModel.parse_obj(body)
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_STACK)
def create_stack(self, stack: StackRequestModel) -> StackResponseModel:
"""Register a new stack.
Args:
stack: The stack to register.
Returns:
The registered stack.
"""
return self._create_workspace_scoped_resource(
resource=stack,
route=STACKS,
response_model=StackResponseModel,
)
def get_stack(self, stack_id: UUID) -> StackResponseModel:
"""Get a stack by its unique ID.
Args:
stack_id: The ID of the stack to get.
Returns:
The stack with the given ID.
"""
return self._get_resource(
resource_id=stack_id,
route=STACKS,
response_model=StackResponseModel,
)
def list_stacks(
self, stack_filter_model: StackFilterModel
) -> Page[StackResponseModel]:
"""List all stacks matching the given filter criteria.
Args:
stack_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stacks matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACKS,
response_model=StackResponseModel,
filter_model=stack_filter_model,
)
@track(AnalyticsEvent.UPDATED_STACK)
def update_stack(
self, stack_id: UUID, stack_update: StackUpdateModel
) -> StackResponseModel:
"""Update a stack.
Args:
stack_id: The ID of the stack update.
stack_update: The update request on the stack.
Returns:
The updated stack.
"""
return self._update_resource(
resource_id=stack_id,
resource_update=stack_update,
route=STACKS,
response_model=StackResponseModel,
)
@track(AnalyticsEvent.DELETED_STACK)
def delete_stack(self, stack_id: UUID) -> None:
"""Delete a stack.
Args:
stack_id: The ID of the stack to delete.
"""
self._delete_resource(
resource_id=stack_id,
route=STACKS,
)
# ----------------
# Stack components
# ----------------
@track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
def create_stack_component(
self,
component: ComponentRequestModel,
) -> ComponentResponseModel:
"""Create a stack component.
Args:
component: The stack component to create.
Returns:
The created stack component.
"""
return self._create_workspace_scoped_resource(
resource=component,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
def get_stack_component(
self, component_id: UUID
) -> ComponentResponseModel:
"""Get a stack component by ID.
Args:
component_id: The ID of the stack component to get.
Returns:
The stack component.
"""
return self._get_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
def list_stack_components(
self, component_filter_model: ComponentFilterModel
) -> Page[ComponentResponseModel]:
"""List all stack components matching the given filter criteria.
Args:
component_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stack components matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
filter_model=component_filter_model,
)
@track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
def update_stack_component(
self,
component_id: UUID,
component_update: ComponentUpdateModel,
) -> ComponentResponseModel:
"""Update an existing stack component.
Args:
component_id: The ID of the stack component to update.
component_update: The update to be applied to the stack component.
Returns:
The updated stack component.
"""
return self._update_resource(
resource_id=component_id,
resource_update=component_update,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
@track(AnalyticsEvent.DELETED_STACK_COMPONENT)
def delete_stack_component(self, component_id: UUID) -> None:
"""Delete a stack component.
Args:
component_id: The ID of the stack component to delete.
"""
self._delete_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
)
# -----------------------
# Stack component flavors
# -----------------------
@track(AnalyticsEvent.CREATED_FLAVOR)
def create_flavor(self, flavor: FlavorRequestModel) -> FlavorResponseModel:
"""Creates a new stack component flavor.
Args:
flavor: The stack component flavor to create.
Returns:
The newly created flavor.
"""
return self._create_resource(
resource=flavor,
route=FLAVORS,
response_model=FlavorResponseModel,
)
def update_flavor(
self, flavor_id: UUID, flavor_update: FlavorUpdateModel
) -> FlavorResponseModel:
"""Updates an existing user.
Args:
flavor_id: The id of the flavor to update.
flavor_update: The update to be applied to the flavor.
Returns:
The updated flavor.
"""
return self._update_resource(
resource_id=flavor_id,
resource_update=flavor_update,
route=FLAVORS,
response_model=FlavorResponseModel,
)
def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
"""Get a stack component flavor by ID.
Args:
flavor_id: The ID of the stack component flavor to get.
Returns:
The stack component flavor.
"""
return self._get_resource(
resource_id=flavor_id,
route=FLAVORS,
response_model=FlavorResponseModel,
)
def list_flavors(
self, flavor_filter_model: FlavorFilterModel
) -> Page[FlavorResponseModel]:
"""List all stack component flavors matching the given filter criteria.
Args:
flavor_filter_model: All filter parameters including pagination
params
Returns:
List of all the stack component flavors matching the given criteria.
"""
return self._list_paginated_resources(
route=FLAVORS,
response_model=FlavorResponseModel,
filter_model=flavor_filter_model,
)
@track(AnalyticsEvent.DELETED_FLAVOR)
def delete_flavor(self, flavor_id: UUID) -> None:
"""Delete a stack component flavor.
Args:
flavor_id: The ID of the stack component flavor to delete.
"""
self._delete_resource(
resource_id=flavor_id,
route=FLAVORS,
)
# -----
# Users
# -----
@track(AnalyticsEvent.CREATED_USER)
def create_user(self, user: UserRequestModel) -> UserResponseModel:
"""Creates a new user.
Args:
user: User to be created.
Returns:
The newly created user.
"""
return self._create_resource(
resource=user,
route=USERS + "?assign_default_role=False",
response_model=UserResponseModel,
)
def get_user(
self,
user_name_or_id: Optional[Union[str, UUID]] = None,
include_private: bool = False,
) -> UserResponseModel:
"""Gets a specific user, when no id is specified the active user is returned.
The `include_private` parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Args:
user_name_or_id: The name or ID of the user to get.
include_private: Whether to include private user information
Returns:
The requested user, if it was found.
"""
if user_name_or_id:
return self._get_resource(
resource_id=user_name_or_id,
route=USERS,
response_model=UserResponseModel,
)
else:
body = self.get(CURRENT_USER)
return UserResponseModel.parse_obj(body)
def list_users(
self, user_filter_model: UserFilterModel
) -> Page[UserResponseModel]:
"""List all users.
Args:
user_filter_model: All filter parameters including pagination
params.
Returns:
A list of all users.
"""
return self._list_paginated_resources(
route=USERS,
response_model=UserResponseModel,
filter_model=user_filter_model,
)
@track(AnalyticsEvent.UPDATED_USER)
def update_user(
self, user_id: UUID, user_update: UserUpdateModel
) -> UserResponseModel:
"""Updates an existing user.
Args:
user_id: The id of the user to update.
user_update: The update to be applied to the user.
Returns:
The updated user.
"""
return self._update_resource(
resource_id=user_id,
resource_update=user_update,
route=USERS,
response_model=UserResponseModel,
)
@track(AnalyticsEvent.DELETED_USER)
def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
"""Deletes a user.
Args:
user_name_or_id: The name or ID of the user to delete.
"""
self._delete_resource(
resource_id=user_name_or_id,
route=USERS,
)
# -----
# Teams
# -----
@track(AnalyticsEvent.CREATED_TEAM)
def create_team(self, team: TeamRequestModel) -> TeamResponseModel:
"""Creates a new team.
Args:
team: The team model to create.
Returns:
The newly created team.
"""
return self._create_resource(
resource=team,
route=TEAMS,
response_model=TeamResponseModel,
)
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
"""Gets a specific team.
Args:
team_name_or_id: Name or ID of the team to get.
Returns:
The requested team.
"""
return self._get_resource(
resource_id=team_name_or_id,
route=TEAMS,
response_model=TeamResponseModel,
)
def list_teams(
self, team_filter_model: TeamFilterModel
) -> Page[TeamResponseModel]:
"""List all teams matching the given filter criteria.
Args:
team_filter_model: All filter parameters including pagination
params.
Returns:
A list of all teams matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAMS,
response_model=TeamResponseModel,
filter_model=team_filter_model,
)
@track(AnalyticsEvent.UPDATED_TEAM)
def update_team(
self, team_id: UUID, team_update: TeamUpdateModel
) -> TeamResponseModel:
"""Update an existing team.
Args:
team_id: The ID of the team to be updated.
team_update: The update to be applied to the team.
Returns:
The updated team.
"""
return self._update_resource(
resource_id=team_id,
resource_update=team_update,
route=TEAMS,
response_model=TeamResponseModel,
)
@track(AnalyticsEvent.DELETED_TEAM)
def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
"""Deletes a team.
Args:
team_name_or_id: Name or ID of the team to delete.
"""
self._delete_resource(
resource_id=team_name_or_id,
route=TEAMS,
)
# -----
# Roles
# -----
@track(AnalyticsEvent.CREATED_ROLE)
def create_role(self, role: RoleRequestModel) -> RoleResponseModel:
"""Creates a new role.
Args:
role: The role model to create.
Returns:
The newly created role.
"""
return self._create_resource(
resource=role,
route=ROLES,
response_model=RoleResponseModel,
)
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
"""Gets a specific role.
Args:
role_name_or_id: Name or ID of the role to get.
Returns:
The requested role.
"""
return self._get_resource(
resource_id=role_name_or_id,
route=ROLES,
response_model=RoleResponseModel,
)
def list_roles(
self, role_filter_model: RoleFilterModel
) -> Page[RoleResponseModel]:
"""List all roles matching the given filter criteria.
Args:
role_filter_model: All filter parameters including pagination
params.
Returns:
A list of all roles matching the filter criteria.
"""
return self._list_paginated_resources(
route=ROLES,
response_model=RoleResponseModel,
filter_model=role_filter_model,
)
@track(AnalyticsEvent.UPDATED_ROLE)
def update_role(
self, role_id: UUID, role_update: RoleUpdateModel
) -> RoleResponseModel:
"""Update an existing role.
Args:
role_id: The ID of the role to be updated.
role_update: The update to be applied to the role.
Returns:
The updated role.
"""
return self._update_resource(
resource_id=role_id,
resource_update=role_update,
route=ROLES,
response_model=RoleResponseModel,
)
@track(AnalyticsEvent.DELETED_ROLE)
def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
"""Deletes a role.
Args:
role_name_or_id: Name or ID of the role to delete.
"""
self._delete_resource(
resource_id=role_name_or_id,
route=ROLES,
)
# ----------------
# Role assignments
# ----------------
def list_user_role_assignments(
self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
) -> Page[UserRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
user_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
filter_model=user_role_assignment_filter_model,
)
def get_user_role_assignment(
self, user_role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
"""Get an existing role assignment by name or ID.
Args:
user_role_assignment_id: Name or ID of the role assignment to get.
Returns:
The requested workspace.
"""
return self._get_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
def delete_user_role_assignment(
self, user_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
user_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
)
def create_user_role_assignment(
self, user_role_assignment: UserRoleAssignmentRequestModel
) -> UserRoleAssignmentResponseModel:
"""Creates a new role assignment.
Args:
user_role_assignment: The role assignment to create.
Returns:
The newly created workspace.
"""
return self._create_resource(
resource=user_role_assignment,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
# ---------------------
# Team Role assignments
# ---------------------
def create_team_role_assignment(
self, team_role_assignment: TeamRoleAssignmentRequestModel
) -> TeamRoleAssignmentResponseModel:
"""Creates a new team role assignment.
Args:
team_role_assignment: The role assignment model to create.
Returns:
The newly created role assignment.
"""
return self._create_resource(
resource=team_role_assignment,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
def get_team_role_assignment(
self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
"""Gets a specific role assignment.
Args:
team_role_assignment_id: ID of the role assignment to get.
Returns:
The requested role assignment.
"""
return self._get_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
def delete_team_role_assignment(
self, team_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
team_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
)
def list_team_role_assignments(
self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
) -> Page[TeamRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
team_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
filter_model=team_role_assignment_filter_model,
)
# --------
# Workspaces
# --------
@track(AnalyticsEvent.CREATED_WORKSPACE)
def create_workspace(
self, workspace: WorkspaceRequestModel
) -> WorkspaceResponseModel:
"""Creates a new workspace.
Args:
workspace: The workspace to create.
Returns:
The newly created workspace.
"""
return self._create_resource(
resource=workspace,
route=WORKSPACES,
response_model=WorkspaceResponseModel,
)
def get_workspace(
self, workspace_name_or_id: Union[UUID, str]
) -> WorkspaceResponseModel:
"""Get an existing workspace by name or ID.
Args:
workspace_name_or_id: Name or ID of the workspace to get.
Returns:
The requested workspace.
"""
return self._get_resource(
resource_id=workspace_name_or_id,
route=WORKSPACES,
response_model=WorkspaceResponseModel,
)
def list_workspaces(
self, workspace_filter_model: WorkspaceFilterModel
) -> Page[WorkspaceResponseModel]:
"""List all workspace matching the given filter criteria.
Args:
workspace_filter_model: All filter parameters including pagination
params.
Returns:
A list of all workspace matching the filter criteria.
"""
return self._list_paginated_resources(
route=WORKSPACES,
response_model=WorkspaceResponseModel,
filter_model=workspace_filter_model,
)
@track(AnalyticsEvent.UPDATED_WORKSPACE)
def update_workspace(
self, workspace_id: UUID, workspace_update: WorkspaceUpdateModel
) -> WorkspaceResponseModel:
"""Update an existing workspace.
Args:
workspace_id: The ID of the workspace to be updated.
workspace_update: The update to be applied to the workspace.
Returns:
The updated workspace.
"""
return self._update_resource(
resource_id=workspace_id,
resource_update=workspace_update,
route=WORKSPACES,
response_model=WorkspaceResponseModel,
)
@track(AnalyticsEvent.DELETED_WORKSPACE)
def delete_workspace(self, workspace_name_or_id: Union[str, UUID]) -> None:
"""Deletes a workspace.
Args:
workspace_name_or_id: Name or ID of the workspace to delete.
"""
self._delete_resource(
resource_id=workspace_name_or_id,
route=WORKSPACES,
)
# ---------
# Pipelines
# ---------
@track(AnalyticsEvent.CREATE_PIPELINE)
def create_pipeline(
self, pipeline: PipelineRequestModel
) -> PipelineResponseModel:
"""Creates a new pipeline in a workspace.
Args:
pipeline: The pipeline to create.
Returns:
The newly created pipeline.
"""
return self._create_workspace_scoped_resource(
resource=pipeline,
route=PIPELINES,
response_model=PipelineResponseModel,
)
def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
"""Get a pipeline with a given ID.
Args:
pipeline_id: ID of the pipeline.
Returns:
The pipeline.
"""
return self._get_resource(
resource_id=pipeline_id,
route=PIPELINES,
response_model=PipelineResponseModel,
)
def list_pipelines(
self, pipeline_filter_model: PipelineFilterModel
) -> Page[PipelineResponseModel]:
"""List all pipelines matching the given filter criteria.
Args:
pipeline_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipelines matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINES,
response_model=PipelineResponseModel,
filter_model=pipeline_filter_model,
)
@track(AnalyticsEvent.UPDATE_PIPELINE)
def update_pipeline(
self, pipeline_id: UUID, pipeline_update: PipelineUpdateModel
) -> PipelineResponseModel:
"""Updates a pipeline.
Args:
pipeline_id: The ID of the pipeline to be updated.
pipeline_update: The update to be applied.
Returns:
The updated pipeline.
"""
return self._update_resource(
resource_id=pipeline_id,
resource_update=pipeline_update,
route=PIPELINES,
response_model=PipelineResponseModel,
)
@track(AnalyticsEvent.DELETE_PIPELINE)
def delete_pipeline(self, pipeline_id: UUID) -> None:
"""Deletes a pipeline.
Args:
pipeline_id: The ID of the pipeline to delete.
"""
self._delete_resource(
resource_id=pipeline_id,
route=PIPELINES,
)
# ---------
# Builds
# ---------
def create_build(
self,
build: PipelineBuildRequestModel,
) -> PipelineBuildResponseModel:
"""Creates a new build in a workspace.
Args:
build: The build to create.
Returns:
The newly created build.
"""
return self._create_workspace_scoped_resource(
resource=build,
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
)
def get_build(self, build_id: UUID) -> PipelineBuildResponseModel:
"""Get a build with a given ID.
Args:
build_id: ID of the build.
Returns:
The build.
"""
return self._get_resource(
resource_id=build_id,
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
)
def list_builds(
self, build_filter_model: PipelineBuildFilterModel
) -> Page[PipelineBuildResponseModel]:
"""List all builds matching the given filter criteria.
Args:
build_filter_model: All filter parameters including pagination
params.
Returns:
A page of all builds matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
filter_model=build_filter_model,
)
def delete_build(self, build_id: UUID) -> None:
"""Deletes a build.
Args:
build_id: The ID of the build to delete.
"""
self._delete_resource(
resource_id=build_id,
route=PIPELINE_BUILDS,
)
# ----------------------
# Pipeline Deployments
# ----------------------
def create_deployment(
self,
deployment: PipelineDeploymentRequestModel,
) -> PipelineDeploymentResponseModel:
"""Creates a new deployment in a workspace.
Args:
deployment: The deployment to create.
Returns:
The newly created deployment.
"""
return self._create_workspace_scoped_resource(
resource=deployment,
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
)
def get_deployment(
self, deployment_id: UUID
) -> PipelineDeploymentResponseModel:
"""Get a deployment with a given ID.
Args:
deployment_id: ID of the deployment.
Returns:
The deployment.
"""
return self._get_resource(
resource_id=deployment_id,
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
)
def list_deployments(
self, deployment_filter_model: PipelineDeploymentFilterModel
) -> Page[PipelineDeploymentResponseModel]:
"""List all deployments matching the given filter criteria.
Args:
deployment_filter_model: All filter parameters including pagination
params.
Returns:
A page of all deployments matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
filter_model=deployment_filter_model,
)
def delete_deployment(self, deployment_id: UUID) -> None:
"""Deletes a deployment.
Args:
deployment_id: The ID of the deployment to delete.
"""
self._delete_resource(
resource_id=deployment_id,
route=PIPELINE_DEPLOYMENTS,
)
# ---------
# Schedules
# ---------
def create_schedule(
self, schedule: ScheduleRequestModel
) -> ScheduleResponseModel:
"""Creates a new schedule.
Args:
schedule: The schedule to create.
Returns:
The newly created schedule.
"""
return self._create_workspace_scoped_resource(
resource=schedule,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
"""Get a schedule with a given ID.
Args:
schedule_id: ID of the schedule.
Returns:
The schedule.
"""
return self._get_resource(
resource_id=schedule_id,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def list_schedules(
self, schedule_filter_model: ScheduleFilterModel
) -> Page[ScheduleResponseModel]:
"""List all schedules in the workspace.
Args:
schedule_filter_model: All filter parameters including pagination
params
Returns:
A list of schedules.
"""
return self._list_paginated_resources(
route=SCHEDULES,
response_model=ScheduleResponseModel,
filter_model=schedule_filter_model,
)
def update_schedule(
self,
schedule_id: UUID,
schedule_update: ScheduleUpdateModel,
) -> ScheduleResponseModel:
"""Updates a schedule.
Args:
schedule_id: The ID of the schedule to be updated.
schedule_update: The update to be applied.
Returns:
The updated schedule.
"""
return self._update_resource(
resource_id=schedule_id,
resource_update=schedule_update,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def delete_schedule(self, schedule_id: UUID) -> None:
"""Deletes a schedule.
Args:
schedule_id: The ID of the schedule to delete.
"""
self._delete_resource(
resource_id=schedule_id,
route=SCHEDULES,
)
# --------------
# Pipeline runs
# --------------
def create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Creates a pipeline run.
Args:
pipeline_run: The pipeline run to create.
Returns:
The created pipeline run.
"""
return self._create_workspace_scoped_resource(
resource=pipeline_run,
response_model=PipelineRunResponseModel,
route=RUNS,
)
def get_run(
self, run_name_or_id: Union[UUID, str]
) -> PipelineRunResponseModel:
"""Gets a pipeline run.
Args:
run_name_or_id: The name or ID of the pipeline run to get.
Returns:
The pipeline run.
"""
return self._get_resource(
resource_id=run_name_or_id,
route=RUNS,
response_model=PipelineRunResponseModel,
)
def get_or_create_run(
self, pipeline_run: PipelineRunRequestModel
) -> Tuple[PipelineRunResponseModel, bool]:
"""Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned.
Otherwise, a new run is created.
Args:
pipeline_run: The pipeline run to get or create.
Returns:
The pipeline run, and a boolean indicating whether the run was
created or not.
"""
return self._get_or_create_workspace_scoped_resource(
resource=pipeline_run,
route=RUNS,
response_model=PipelineRunResponseModel,
)
def list_runs(
self, runs_filter_model: PipelineRunFilterModel
) -> Page[PipelineRunResponseModel]:
"""List all pipeline runs matching the given filter criteria.
Args:
runs_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipeline runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=RUNS,
response_model=PipelineRunResponseModel,
filter_model=runs_filter_model,
)
def update_run(
self, run_id: UUID, run_update: PipelineRunUpdateModel
) -> PipelineRunResponseModel:
"""Updates a pipeline run.
Args:
run_id: The ID of the pipeline run to update.
run_update: The update to be applied to the pipeline run.
Returns:
The updated pipeline run.
"""
return self._update_resource(
resource_id=run_id,
resource_update=run_update,
response_model=PipelineRunResponseModel,
route=RUNS,
)
def delete_run(self, run_id: UUID) -> None:
"""Deletes a pipeline run.
Args:
run_id: The ID of the pipeline run to delete.
"""
self._delete_resource(
resource_id=run_id,
route=RUNS,
)
# ------------------
# Pipeline run steps
# ------------------
def create_run_step(
self, step_run: StepRunRequestModel
) -> StepRunResponseModel:
"""Creates a step run.
Args:
step_run: The step run to create.
Returns:
The created step run.
"""
return self._create_resource(
resource=step_run,
response_model=StepRunResponseModel,
route=STEPS,
)
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
"""Get a step run by ID.
Args:
step_run_id: The ID of the step run to get.
Returns:
The step run.
"""
return self._get_resource(
resource_id=step_run_id,
route=STEPS,
response_model=StepRunResponseModel,
)
def list_run_steps(
self, step_run_filter_model: StepRunFilterModel
) -> Page[StepRunResponseModel]:
"""List all step runs matching the given filter criteria.
Args:
step_run_filter_model: All filter parameters including pagination
params.
Returns:
A list of all step runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=STEPS,
response_model=StepRunResponseModel,
filter_model=step_run_filter_model,
)
def update_run_step(
self,
step_run_id: UUID,
step_run_update: StepRunUpdateModel,
) -> StepRunResponseModel:
"""Updates a step run.
Args:
step_run_id: The ID of the step to update.
step_run_update: The update to be applied to the step.
Returns:
The updated step run.
"""
return self._update_resource(
resource_id=step_run_id,
resource_update=step_run_update,
response_model=StepRunResponseModel,
route=STEPS,
)
# ---------
# Artifacts
# ---------
def create_artifact(
self, artifact: ArtifactRequestModel
) -> ArtifactResponseModel:
"""Creates an artifact.
Args:
artifact: The artifact to create.
Returns:
The created artifact.
"""
return self._create_resource(
resource=artifact,
response_model=ArtifactResponseModel,
route=ARTIFACTS,
)
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
"""Gets an artifact.
Args:
artifact_id: The ID of the artifact to get.
Returns:
The artifact.
"""
return self._get_resource(
resource_id=artifact_id,
route=ARTIFACTS,
response_model=ArtifactResponseModel,
)
def list_artifacts(
self, artifact_filter_model: ArtifactFilterModel
) -> Page[ArtifactResponseModel]:
"""List all artifacts matching the given filter criteria.
Args:
artifact_filter_model: All filter parameters including pagination
params.
Returns:
A list of all artifacts matching the filter criteria.
"""
return self._list_paginated_resources(
route=ARTIFACTS,
response_model=ArtifactResponseModel,
filter_model=artifact_filter_model,
)
def delete_artifact(self, artifact_id: UUID) -> None:
"""Deletes an artifact.
Args:
artifact_id: The ID of the artifact to delete.
"""
self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)
# ------------
# Run Metadata
# ------------
def create_run_metadata(
self, run_metadata: RunMetadataRequestModel
) -> RunMetadataResponseModel:
"""Creates run metadata.
Args:
run_metadata: The run metadata to create.
Returns:
The created run metadata.
"""
return self._create_workspace_scoped_resource(
resource=run_metadata,
response_model=RunMetadataResponseModel,
route=RUN_METADATA,
)
def list_run_metadata(
self,
run_metadata_filter_model: RunMetadataFilterModel,
) -> Page[RunMetadataResponseModel]:
"""List run metadata.
Args:
run_metadata_filter_model: All filter parameters including
pagination params.
Returns:
The run metadata.
"""
return self._list_paginated_resources(
route=RUN_METADATA,
response_model=RunMetadataResponseModel,
filter_model=run_metadata_filter_model,
)
# -----------------
# Code Repositories
# -----------------
def create_code_repository(
self, code_repository: CodeRepositoryRequestModel
) -> CodeRepositoryResponseModel:
"""Creates a new code repository.
Args:
code_repository: Code repository to be created.
Returns:
The newly created code repository.
"""
return self._create_workspace_scoped_resource(
resource=code_repository,
response_model=CodeRepositoryResponseModel,
route=CODE_REPOSITORIES,
)
def get_code_repository(
self, code_repository_id: UUID
) -> CodeRepositoryResponseModel:
"""Gets a specific code repository.
Args:
code_repository_id: The ID of the code repository to get.
Returns:
The requested code repository, if it was found.
"""
return self._get_resource(
resource_id=code_repository_id,
route=CODE_REPOSITORIES,
response_model=CodeRepositoryResponseModel,
)
def list_code_repositories(
self, filter_model: CodeRepositoryFilterModel
) -> Page[CodeRepositoryResponseModel]:
"""List all code repositories.
Args:
filter_model: All filter parameters including pagination
params.
Returns:
A page of all code repositories.
"""
return self._list_paginated_resources(
route=CODE_REPOSITORIES,
response_model=CodeRepositoryResponseModel,
filter_model=filter_model,
)
def update_code_repository(
self, code_repository_id: UUID, update: CodeRepositoryUpdateModel
) -> CodeRepositoryResponseModel:
"""Updates an existing code repository.
Args:
code_repository_id: The ID of the code repository to update.
update: The update to be applied to the code repository.
Returns:
The updated code repository.
"""
return self._update_resource(
resource_id=code_repository_id,
resource_update=update,
response_model=CodeRepositoryResponseModel,
route=CODE_REPOSITORIES,
)
def delete_code_repository(self, code_repository_id: UUID) -> None:
"""Deletes a code repository.
Args:
code_repository_id: The ID of the code repository to delete.
"""
self._delete_resource(
resource_id=code_repository_id, route=CODE_REPOSITORIES
)
# ------------------
# Service Connectors
# ------------------
def _populate_connector_type(
self,
*connector_models: Union[
ServiceConnectorResponseModel, ServiceConnectorResourcesModel
],
) -> None:
"""Populates or updates the connector type of the given connector or resource models.
If the connector type is not locally available, the connector type
field is left as is. The local and remote flags of the connector type
are updated accordingly.
Args:
connector_models: The service connector or resource models to
populate.
"""
for service_connector in connector_models:
# Mark the remote connector type as being only remotely available
if not isinstance(service_connector.connector_type, str):
service_connector.connector_type.local = False
service_connector.connector_type.remote = True
if not service_connector_registry.is_registered(
service_connector.type
):
continue
connector_type = (
service_connector_registry.get_service_connector_type(
service_connector.type
)
)
connector_type.local = True
if not isinstance(service_connector.connector_type, str):
connector_type.remote = True
service_connector.connector_type = connector_type
def create_service_connector(
self, service_connector: ServiceConnectorRequestModel
) -> ServiceConnectorResponseModel:
"""Creates a new service connector.
Args:
service_connector: Service connector to be created.
Returns:
The newly created service connector.
"""
connector_model = self._create_workspace_scoped_resource(
resource=service_connector,
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
)
self._populate_connector_type(connector_model)
return connector_model
def get_service_connector(
self, service_connector_id: UUID
) -> ServiceConnectorResponseModel:
"""Gets a specific service connector.
Args:
service_connector_id: The ID of the service connector to get.
Returns:
The requested service connector, if it was found.
"""
connector_model = self._get_resource(
resource_id=service_connector_id,
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
params={"expand_secrets": False},
)
self._populate_connector_type(connector_model)
return connector_model
def list_service_connectors(
self, filter_model: ServiceConnectorFilterModel
) -> Page[ServiceConnectorResponseModel]:
"""List all service connectors.
Args:
filter_model: All filter parameters including pagination
params.
Returns:
A page of all service connectors.
"""
connector_models = self._list_paginated_resources(
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
filter_model=filter_model,
params={"expand_secrets": False},
)
self._populate_connector_type(*connector_models.items)
return connector_models
def update_service_connector(
self, service_connector_id: UUID, update: ServiceConnectorUpdateModel
) -> ServiceConnectorResponseModel:
"""Updates an existing service connector.
The update model contains the fields to be updated. If a field value is
set to None in the model, the field is not updated, but there are
special rules concerning some fields:
* the `configuration` and `secrets` fields together represent a full
valid configuration update, not just a partial update. If either is
set (i.e. not None) in the update, their values are merged together and
will replace the existing configuration and secrets values.
* the `resource_id` field value is also a full replacement value: if set
to `None`, the resource ID is removed from the service connector.
* the `expiration_seconds` field value is also a full replacement value:
if set to `None`, the expiration is removed from the service connector.
* the `secret_id` field value in the update is ignored, given that
secrets are managed internally by the ZenML store.
* the `labels` field is also a full labels update: if set (i.e. not
`None`), all existing labels are removed and replaced by the new labels
in the update.
Args:
service_connector_id: The ID of the service connector to update.
update: The update to be applied to the service connector.
Returns:
The updated service connector.
"""
connector_model = self._update_resource(
resource_id=service_connector_id,
resource_update=update,
response_model=ServiceConnectorResponseModel,
route=SERVICE_CONNECTORS,
)
self._populate_connector_type(connector_model)
return connector_model
def delete_service_connector(self, service_connector_id: UUID) -> None:
"""Deletes a service connector.
Args:
service_connector_id: The ID of the service connector to delete.
"""
self._delete_resource(
resource_id=service_connector_id, route=SERVICE_CONNECTORS
)
def verify_service_connector_config(
self,
service_connector: ServiceConnectorRequestModel,
list_resources: bool = True,
) -> ServiceConnectorResourcesModel:
"""Verifies if a service connector configuration has access to resources.
Args:
service_connector: The service connector configuration to verify.
list_resources: If True, the list of all resources accessible
through the service connector and matching the supplied resource
type and ID are returned.
Returns:
The list of resources that the service connector configuration has
access to.
"""
response_body = self.post(
f"{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_VERIFY}",
body=service_connector,
params={"list_resources": list_resources},
)
resources = ServiceConnectorResourcesModel.parse_obj(response_body)
self._populate_connector_type(resources)
return resources
def verify_service_connector(
self,
service_connector_id: UUID,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
list_resources: bool = True,
) -> ServiceConnectorResourcesModel:
"""Verifies if a service connector instance has access to one or more resources.
Args:
service_connector_id: The ID of the service connector to verify.
resource_type: The type of resource to verify access to.
resource_id: The ID of the resource to verify access to.
list_resources: If True, the list of all resources accessible
through the service connector and matching the supplied resource
type and ID are returned.
Returns:
The list of resources that the service connector has access to,
scoped to the supplied resource type and ID, if provided.
"""
params: Dict[str, Any] = {"list_resources": list_resources}
if resource_type:
params["resource_type"] = resource_type
if resource_id:
params["resource_id"] = resource_id
response_body = self.put(
f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_VERIFY}",
params=params,
)
resources = ServiceConnectorResourcesModel.parse_obj(response_body)
self._populate_connector_type(resources)
return resources
def get_service_connector_client(
self,
service_connector_id: UUID,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
) -> ServiceConnectorResponseModel:
"""Get a service connector client for a service connector and given resource.
Args:
service_connector_id: The ID of the base service connector to use.
resource_type: The type of resource to get a client for.
resource_id: The ID of the resource to get a client for.
Returns:
A service connector client that can be used to access the given
resource.
"""
params = {}
if resource_type:
params["resource_type"] = resource_type
if resource_id:
params["resource_id"] = resource_id
response_body = self.get(
f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_CLIENT}",
params=params,
)
connector = ServiceConnectorResponseModel.parse_obj(response_body)
self._populate_connector_type(connector)
return connector
def list_service_connector_resources(
self,
user_name_or_id: Union[str, UUID],
workspace_name_or_id: Union[str, UUID],
connector_type: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
) -> List[ServiceConnectorResourcesModel]:
"""List resources that can be accessed by service connectors.
Args:
user_name_or_id: The name or ID of the user to scope to.
workspace_name_or_id: The name or ID of the workspace to scope to.
connector_type: The type of service connector to scope to.
resource_type: The type of resource to scope to.
resource_id: The ID of the resource to scope to.
Returns:
The matching list of resources that available service
connectors have access to.
"""
params = {}
if connector_type:
params["connector_type"] = connector_type
if resource_type:
params["resource_type"] = resource_type
if resource_id:
params["resource_id"] = resource_id
response_body = self.get(
f"{WORKSPACES}/{workspace_name_or_id}{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_RESOURCES}",
params=params,
)
assert isinstance(response_body, list)
resource_list = [
ServiceConnectorResourcesModel.parse_obj(item)
for item in response_body
]
self._populate_connector_type(*resource_list)
# For service connectors with types that are only locally available,
# we need to retrieve the resource list locally
for idx, resources in enumerate(resource_list):
if isinstance(resources.connector_type, str):
# Skip connector types that are neither locally nor remotely
# available
continue
if resources.connector_type.remote:
# Skip connector types that are remotely available
continue
# Retrieve the resource list locally
assert resources.id is not None
connector = self.get_service_connector(resources.id)
connector_instance = (
service_connector_registry.instantiate_connector(
model=connector
)
)
try:
local_resources = connector_instance.verify(
resource_type=resource_type,
resource_id=resource_id,
)
except (ValueError, AuthorizationException) as e:
logger.error(
f'Failed to fetch {resource_type or "available"} '
f"resources from service connector {connector.name}/"
f"{connector.id}: {e}"
)
continue
resource_list[idx] = local_resources
return resource_list
def list_service_connector_types(
self,
connector_type: Optional[str] = None,
resource_type: Optional[str] = None,
auth_method: Optional[str] = None,
) -> List[ServiceConnectorTypeModel]:
"""Get a list of service connector types.
Args:
connector_type: Filter by connector type.
resource_type: Filter by resource type.
auth_method: Filter by authentication method.
Returns:
List of service connector types.
"""
params = {}
if connector_type:
params["connector_type"] = connector_type
if resource_type:
params["resource_type"] = resource_type
if auth_method:
params["auth_method"] = auth_method
response_body = self.get(
SERVICE_CONNECTOR_TYPES,
params=params,
)
assert isinstance(response_body, list)
remote_connector_types = [
ServiceConnectorTypeModel.parse_obj(item) for item in response_body
]
# Mark the remote connector types as being only remotely available
for c in remote_connector_types:
c.local = False
c.remote = True
local_connector_types = (
service_connector_registry.list_service_connector_types(
connector_type=connector_type,
resource_type=resource_type,
auth_method=auth_method,
)
)
# Add the connector types in the local registry to the list of
# connector types available remotely. Overwrite those that have
# the same connector type but mark them as being remotely available.
connector_types_map = {
connector_type.connector_type: connector_type
for connector_type in remote_connector_types
}
for connector in local_connector_types:
if connector.connector_type in connector_types_map:
connector.remote = True
connector_types_map[connector.connector_type] = connector
return list(connector_types_map.values())
def get_service_connector_type(
self,
connector_type: str,
) -> ServiceConnectorTypeModel:
"""Returns the requested service connector type.
Args:
connector_type: the service connector type identifier.
Returns:
The requested service connector type.
"""
# Use the local registry to get the service connector type, if it
# exists.
local_connector_type: Optional[ServiceConnectorTypeModel] = None
if service_connector_registry.is_registered(connector_type):
local_connector_type = (
service_connector_registry.get_service_connector_type(
connector_type
)
)
try:
response_body = self.get(
f"{SERVICE_CONNECTOR_TYPES}/{connector_type}",
)
remote_connector_type = ServiceConnectorTypeModel.parse_obj(
response_body
)
if local_connector_type:
# If locally available, return the local connector type but
# mark it as being remotely available.
local_connector_type.remote = True
return local_connector_type
# Mark the remote connector type as being only remotely available
remote_connector_type.local = False
remote_connector_type.remote = True
return remote_connector_type
except KeyError:
# If the service connector type is not found, check the local
# registry.
return service_connector_registry.get_service_connector_type(
connector_type
)
# =======================
# Internal helper methods
# =======================
def _get_auth_token(self) -> str:
"""Get the authentication token for the REST store.
Returns:
The authentication token.
Raises:
ValueError: if the response from the server isn't in the right
format.
"""
if self._api_token is None:
# Check if the API token is already stored in the config
if self.config.api_token:
self._api_token = self.config.api_token
# Check if the username and password are provided in the config
elif (
self.config.username is not None
and self.config.password is not None
):
response = self._handle_response(
requests.post(
self.url + API + VERSION_1 + LOGIN,
data={
"username": self.config.username,
"password": self.config.password,
},
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
)
)
if (
not isinstance(response, dict)
or "access_token" not in response
):
raise ValueError(
f"Bad API Response. Expected access token dict, got "
f"{type(response)}"
)
self._api_token = response["access_token"]
self.config.api_token = self._api_token
else:
raise ValueError(
"No API token or username/password provided. Please "
"provide either a token or a username and password in "
"the ZenStore config."
)
return self._api_token
@property
def session(self) -> requests.Session:
"""Authenticate to the ZenML server.
Returns:
A requests session with the authentication token.
"""
if self._session is None:
if self.config.verify_ssl is False:
urllib3.disable_warnings(
urllib3.exceptions.InsecureRequestWarning
)
self._session = requests.Session()
self._session.verify = self.config.verify_ssl
token = self._get_auth_token()
self._session.headers.update({"Authorization": "Bearer " + token})
logger.debug("Authenticated to ZenML server.")
return self._session
@staticmethod
def _handle_response(response: requests.Response) -> Json:
"""Handle API response, translating http status codes to Exception.
Args:
response: The response to handle.
Returns:
The parsed response.
Raises:
ValueError: if the response is not in the right format.
RuntimeError: if an error response is received from the server
and a more specific exception cannot be determined.
exc: the exception converted from an error response, if one
is returned from the server.
"""
if 200 <= response.status_code < 300:
try:
payload: Json = response.json()
return payload
except requests.exceptions.JSONDecodeError:
raise ValueError(
"Bad response from API. Expected json, got\n"
f"{response.text}"
)
elif response.status_code >= 400:
exc = exception_from_response(response)
if exc is not None:
raise exc
else:
raise RuntimeError(
f"{response.status_code} HTTP Error received from server: "
f"{response.text}"
)
else:
raise RuntimeError(
"Error retrieving from API. Got response "
f"{response.status_code} with body:\n{response.text}"
)
def _request(
self,
method: str,
url: str,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a request to the REST API.
Args:
method: The HTTP method to use.
url: The URL to request.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The parsed response.
"""
params = {k: str(v) for k, v in params.items()} if params else {}
self.session.headers.update(
{source_context.name: source_context.get().value}
)
try:
return self._handle_response(
self.session.request(
method,
url,
params=params,
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
**kwargs,
)
)
except AuthorizationException:
# The authentication token could have expired; refresh it and try
# again
self._session = None
return self._handle_response(
self.session.request(
method,
url,
params=params,
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
**kwargs,
)
)
def get(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a GET request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending GET request to {path}...")
return self._request(
"GET", self.url + API + VERSION_1 + path, params=params, **kwargs
)
def delete(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a DELETE request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending DELETE request to {path}...")
return self._request(
"DELETE",
self.url + API + VERSION_1 + path,
params=params,
**kwargs,
)
def post(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a POST request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending POST request to {path}...")
return self._request(
"POST",
self.url + API + VERSION_1 + path,
data=body.json(),
params=params,
**kwargs,
)
def put(
self,
path: str,
body: Optional[BaseModel] = None,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a PUT request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending PUT request to {path}...")
data = body.json(exclude_unset=True) if body else None
return self._request(
"PUT",
self.url + API + VERSION_1 + path,
data=data,
params=params,
**kwargs,
)
def _create_resource(
self,
resource: BaseRequestModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> AnyResponseModel:
"""Create a new resource.
Args:
resource: The resource to create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource.
"""
response_body = self.post(f"{route}", body=resource, params=params)
return response_model.parse_obj(response_body)
def _create_workspace_scoped_resource(
self,
resource: WorkspaceScopedRequestModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> AnyResponseModel:
"""Create a new workspace scoped resource.
Args:
resource: The resource to create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource.
"""
return self._create_resource(
resource=resource,
response_model=response_model,
route=f"{WORKSPACES}/{str(resource.workspace)}{route}",
params=params,
)
def _get_or_create_resource(
self,
resource: BaseRequestModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> Tuple[AnyResponseModel, bool]:
"""Get or create a resource.
Args:
resource: The resource to get or create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource, and a boolean indicating whether the resource
was created or not.
Raises:
ValueError: If the response body is not a list with 2 elements
where the first element is the resource and the second element
a boolean indicating whether the resource was created or not.
"""
response_body = self.post(
f"{route}{GET_OR_CREATE}",
body=resource,
params=params,
)
if not isinstance(response_body, list):
raise ValueError(
f"Expected a list response from the {route}{GET_OR_CREATE} "
f"endpoint but got {type(response_body)} instead."
)
if len(response_body) != 2:
raise ValueError(
f"Expected a list response with 2 elements from the "
f"{route}{GET_OR_CREATE} endpoint but got {len(response_body)} "
f"elements instead."
)
model_json, was_created = response_body
if not isinstance(was_created, bool):
raise ValueError(
f"Expected a boolean as the second element of the list "
f"response from the {route}{GET_OR_CREATE} endpoint but got "
f"{type(was_created)} instead."
)
return response_model.parse_obj(model_json), was_created
def _get_or_create_workspace_scoped_resource(
self,
resource: WorkspaceScopedRequestModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> Tuple[AnyResponseModel, bool]:
"""Get or create a workspace scoped resource.
Args:
resource: The resource to get or create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource, and a boolean indicating whether the resource
was created or not.
"""
return self._get_or_create_resource(
resource=resource,
response_model=response_model,
route=f"{WORKSPACES}/{str(resource.workspace)}{route}",
params=params,
)
def _get_resource(
self,
resource_id: Union[str, UUID],
route: str,
response_model: Type[AnyResponseModel],
params: Optional[Dict[str, Any]] = None,
) -> AnyResponseModel:
"""Retrieve a single resource.
Args:
resource_id: The ID of the resource to retrieve.
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
params: Optional query parameters to pass to the endpoint.
Returns:
The retrieved resource.
"""
body = self.get(f"{route}/{str(resource_id)}", params=params)
return response_model.parse_obj(body)
def _list_paginated_resources(
self,
route: str,
response_model: Type[AnyResponseModel],
filter_model: BaseFilterModel,
params: Optional[Dict[str, Any]] = None,
) -> Page[AnyResponseModel]:
"""Retrieve a list of resources filtered by some criteria.
Args:
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
filter_model: The filter model to use for the list query.
params: Optional query parameters to pass to the endpoint.
Returns:
List of retrieved resources matching the filter criteria.
Raises:
ValueError: If the value returned by the server is not a list.
"""
# leave out filter params that are not supplied
params = params or {}
params.update(filter_model.dict(exclude_none=True))
body = self.get(f"{route}", params=params)
if not isinstance(body, dict):
raise ValueError(
f"Bad API Response. Expected list, got {type(body)}"
)
# The initial page of items will be of type BaseResponseModel
page_of_items: Page[AnyResponseModel] = Page.parse_obj(body)
# So these items will be parsed into their correct types like here
page_of_items.items = [
response_model.parse_obj(generic_item)
for generic_item in page_of_items.items
]
return page_of_items
def _list_resources(
self,
route: str,
response_model: Type[AnyResponseModel],
**filters: Any,
) -> List[AnyResponseModel]:
"""Retrieve a list of resources filtered by some criteria.
Args:
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
filters: Filter parameters to use in the query.
Returns:
List of retrieved resources matching the filter criteria.
Raises:
ValueError: If the value returned by the server is not a list.
"""
# leave out filter params that are not supplied
params = dict(filter(lambda x: x[1] is not None, filters.items()))
body = self.get(f"{route}", params=params)
if not isinstance(body, list):
raise ValueError(
f"Bad API Response. Expected list, got {type(body)}"
)
return [response_model.parse_obj(entry) for entry in body]
def _update_resource(
self,
resource_id: UUID,
resource_update: BaseModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> AnyResponseModel:
"""Update an existing resource.
Args:
resource_id: The id of the resource to update.
resource_update: The resource update.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
route: The resource REST API route to use.
params: Optional query parameters to pass to the endpoint.
Returns:
The updated resource.
"""
response_body = self.put(
f"{route}/{str(resource_id)}", body=resource_update, params=params
)
return response_model.parse_obj(response_body)
def _delete_resource(
self, resource_id: Union[str, UUID], route: str
) -> None:
"""Delete a resource.
Args:
resource_id: The ID of the resource to delete.
route: The resource REST API route to use.
"""
self.delete(f"{route}/{str(resource_id)}")
session: Session
property
readonly
Authenticate to the ZenML server.
Returns:
Type | Description |
---|---|
Session |
A requests session with the authentication token. |
CONFIG_TYPE (StoreConfiguration)
pydantic-model
REST ZenML store configuration.
Attributes:
Name | Type | Description |
---|---|---|
type |
StoreType |
The type of the store. |
secrets_store |
Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration] |
The configuration of the secrets store to use. This defaults to a REST secrets store that extends the REST ZenML store. |
username |
Optional[str] |
The username to use to connect to the Zen server. |
password |
Optional[str] |
The password to use to connect to the Zen server. |
verify_ssl |
Union[bool, str] |
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself. |
http_timeout |
int |
The timeout to use for all requests. |
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStoreConfiguration(StoreConfiguration):
"""REST ZenML store configuration.
Attributes:
type: The type of the store.
secrets_store: The configuration of the secrets store to use.
This defaults to a REST secrets store that extends the REST ZenML
store.
username: The username to use to connect to the Zen server.
password: The password to use to connect to the Zen server.
verify_ssl: Either a boolean, in which case it controls whether we
verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use or the CA bundle value itself.
http_timeout: The timeout to use for all requests.
"""
type: StoreType = StoreType.REST
secrets_store: Optional[SecretsStoreConfiguration] = None
username: Optional[str] = None
password: Optional[str] = None
api_token: Optional[str] = None
verify_ssl: Union[bool, str] = True
http_timeout: int = DEFAULT_HTTP_TIMEOUT
@validator("secrets_store")
def validate_secrets_store(
cls, secrets_store: Optional[SecretsStoreConfiguration]
) -> SecretsStoreConfiguration:
"""Ensures that the secrets store uses an associated REST secrets store.
Args:
secrets_store: The secrets store config to be validated.
Returns:
The validated secrets store config.
Raises:
ValueError: If the secrets store is not of type REST.
"""
if secrets_store is None:
secrets_store = RestSecretsStoreConfiguration()
elif secrets_store.type != SecretsStoreType.REST:
raise ValueError(
"The secrets store associated with a REST zen store must be "
f"of type REST, but is of type {secrets_store.type}."
)
return secrets_store
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
Config
Pydantic configuration class.
Source code in zenml/zen_stores/rest_zen_store.py
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
copy_configuration(config, config_path, load_config_path=None)
classmethod
Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.
The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to copy. |
required |
config_path |
str |
new path where the configuration copy will be loaded from. |
required |
load_config_path |
Optional[pathlib.PurePath] |
absolute path that will be used to load the copied
configuration. This can be set to a value different from
|
None |
Returns:
Type | Description |
---|---|
StoreConfiguration |
A new store configuration object that reflects the new configuration path. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
expand_certificates(self)
Expands the certificates in the verify_ssl field.
Source code in zenml/zen_stores/rest_zen_store.py
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
supports_url_scheme(url)
classmethod
Check if a URL scheme is supported by this store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the URL scheme is supported, False otherwise. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
validate_credentials(values)
classmethod
Validates the credentials provided in the values dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
A dictionary containing the values to be validated. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If neither api_token nor username is set. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values dictionary. |
Source code in zenml/zen_stores/rest_zen_store.py
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
validate_secrets_store(secrets_store)
classmethod
Ensures that the secrets store uses an associated REST secrets store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secrets_store |
Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration] |
The secrets store config to be validated. |
required |
Returns:
Type | Description |
---|---|
SecretsStoreConfiguration |
The validated secrets store config. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the secrets store is not of type REST. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("secrets_store")
def validate_secrets_store(
cls, secrets_store: Optional[SecretsStoreConfiguration]
) -> SecretsStoreConfiguration:
"""Ensures that the secrets store uses an associated REST secrets store.
Args:
secrets_store: The secrets store config to be validated.
Returns:
The validated secrets store config.
Raises:
ValueError: If the secrets store is not of type REST.
"""
if secrets_store is None:
secrets_store = RestSecretsStoreConfiguration()
elif secrets_store.type != SecretsStoreType.REST:
raise ValueError(
"The secrets store associated with a REST zen store must be "
f"of type REST, but is of type {secrets_store.type}."
)
return secrets_store
validate_url(url)
classmethod
Validates that the URL is a well-formed REST store URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to be validated. |
required |
Returns:
Type | Description |
---|---|
str |
The validated URL without trailing slashes. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the URL is not a well-formed REST store URL. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
validate_verify_ssl(verify_ssl)
classmethod
Validates that the verify_ssl either points to a file or is a bool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verify_ssl |
Union[bool, str] |
The verify_ssl value to be validated. |
required |
Returns:
Type | Description |
---|---|
Union[bool, str] |
The validated verify_ssl value. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
create_artifact(self, artifact)
Creates an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
ArtifactRequestModel |
The artifact to create. |
required |
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The created artifact. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_artifact(
self, artifact: ArtifactRequestModel
) -> ArtifactResponseModel:
"""Creates an artifact.
Args:
artifact: The artifact to create.
Returns:
The created artifact.
"""
return self._create_resource(
resource=artifact,
response_model=ArtifactResponseModel,
route=ARTIFACTS,
)
create_build(self, build)
Creates a new build in a workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build |
PipelineBuildRequestModel |
The build to create. |
required |
Returns:
Type | Description |
---|---|
PipelineBuildResponseModel |
The newly created build. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_build(
self,
build: PipelineBuildRequestModel,
) -> PipelineBuildResponseModel:
"""Creates a new build in a workspace.
Args:
build: The build to create.
Returns:
The newly created build.
"""
return self._create_workspace_scoped_resource(
resource=build,
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
)
create_code_repository(self, code_repository)
Creates a new code repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code_repository |
CodeRepositoryRequestModel |
Code repository to be created. |
required |
Returns:
Type | Description |
---|---|
CodeRepositoryResponseModel |
The newly created code repository. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_code_repository(
self, code_repository: CodeRepositoryRequestModel
) -> CodeRepositoryResponseModel:
"""Creates a new code repository.
Args:
code_repository: Code repository to be created.
Returns:
The newly created code repository.
"""
return self._create_workspace_scoped_resource(
resource=code_repository,
response_model=CodeRepositoryResponseModel,
route=CODE_REPOSITORIES,
)
create_deployment(self, deployment)
Creates a new deployment in a workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentRequestModel |
The deployment to create. |
required |
Returns:
Type | Description |
---|---|
PipelineDeploymentResponseModel |
The newly created deployment. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_deployment(
self,
deployment: PipelineDeploymentRequestModel,
) -> PipelineDeploymentResponseModel:
"""Creates a new deployment in a workspace.
Args:
deployment: The deployment to create.
Returns:
The newly created deployment.
"""
return self._create_workspace_scoped_resource(
resource=deployment,
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
)
create_flavor(self, flavor)
Creates a new stack component flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor |
FlavorRequestModel |
The stack component flavor to create. |
required |
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The newly created flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_FLAVOR)
def create_flavor(self, flavor: FlavorRequestModel) -> FlavorResponseModel:
"""Creates a new stack component flavor.
Args:
flavor: The stack component flavor to create.
Returns:
The newly created flavor.
"""
return self._create_resource(
resource=flavor,
route=FLAVORS,
response_model=FlavorResponseModel,
)
create_pipeline(self, pipeline)
Creates a new pipeline in a workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
PipelineRequestModel |
The pipeline to create. |
required |
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The newly created pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATE_PIPELINE)
def create_pipeline(
self, pipeline: PipelineRequestModel
) -> PipelineResponseModel:
"""Creates a new pipeline in a workspace.
Args:
pipeline: The pipeline to create.
Returns:
The newly created pipeline.
"""
return self._create_workspace_scoped_resource(
resource=pipeline,
route=PIPELINES,
response_model=PipelineResponseModel,
)
create_role(self, role)
Creates a new role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
RoleRequestModel |
The role model to create. |
required |
Returns:
Type | Description |
---|---|
RoleResponseModel |
The newly created role. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_ROLE)
def create_role(self, role: RoleRequestModel) -> RoleResponseModel:
"""Creates a new role.
Args:
role: The role model to create.
Returns:
The newly created role.
"""
return self._create_resource(
resource=role,
route=ROLES,
response_model=RoleResponseModel,
)
create_run(self, pipeline_run)
Creates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunRequestModel |
The pipeline run to create. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The created pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Creates a pipeline run.
Args:
pipeline_run: The pipeline run to create.
Returns:
The created pipeline run.
"""
return self._create_workspace_scoped_resource(
resource=pipeline_run,
response_model=PipelineRunResponseModel,
route=RUNS,
)
create_run_metadata(self, run_metadata)
Creates run metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_metadata |
RunMetadataRequestModel |
The run metadata to create. |
required |
Returns:
Type | Description |
---|---|
RunMetadataResponseModel |
The created run metadata. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run_metadata(
self, run_metadata: RunMetadataRequestModel
) -> RunMetadataResponseModel:
"""Creates run metadata.
Args:
run_metadata: The run metadata to create.
Returns:
The created run metadata.
"""
return self._create_workspace_scoped_resource(
resource=run_metadata,
response_model=RunMetadataResponseModel,
route=RUN_METADATA,
)
create_run_step(self, step_run)
Creates a step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run |
StepRunRequestModel |
The step run to create. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The created step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run_step(
self, step_run: StepRunRequestModel
) -> StepRunResponseModel:
"""Creates a step run.
Args:
step_run: The step run to create.
Returns:
The created step run.
"""
return self._create_resource(
resource=step_run,
response_model=StepRunResponseModel,
route=STEPS,
)
create_schedule(self, schedule)
Creates a new schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule |
ScheduleRequestModel |
The schedule to create. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The newly created schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_schedule(
self, schedule: ScheduleRequestModel
) -> ScheduleResponseModel:
"""Creates a new schedule.
Args:
schedule: The schedule to create.
Returns:
The newly created schedule.
"""
return self._create_workspace_scoped_resource(
resource=schedule,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
create_service_connector(self, service_connector)
Creates a new service connector.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_connector |
ServiceConnectorRequestModel |
Service connector to be created. |
required |
Returns:
Type | Description |
---|---|
ServiceConnectorResponseModel |
The newly created service connector. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_service_connector(
self, service_connector: ServiceConnectorRequestModel
) -> ServiceConnectorResponseModel:
"""Creates a new service connector.
Args:
service_connector: Service connector to be created.
Returns:
The newly created service connector.
"""
connector_model = self._create_workspace_scoped_resource(
resource=service_connector,
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
)
self._populate_connector_type(connector_model)
return connector_model
create_stack(self, stack)
Register a new stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
StackRequestModel |
The stack to register. |
required |
Returns:
Type | Description |
---|---|
StackResponseModel |
The registered stack. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.REGISTERED_STACK)
def create_stack(self, stack: StackRequestModel) -> StackResponseModel:
"""Register a new stack.
Args:
stack: The stack to register.
Returns:
The registered stack.
"""
return self._create_workspace_scoped_resource(
resource=stack,
route=STACKS,
response_model=StackResponseModel,
)
create_stack_component(self, component)
Create a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component |
ComponentRequestModel |
The stack component to create. |
required |
Returns:
Type | Description |
---|---|
ComponentResponseModel |
The created stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
def create_stack_component(
self,
component: ComponentRequestModel,
) -> ComponentResponseModel:
"""Create a stack component.
Args:
component: The stack component to create.
Returns:
The created stack component.
"""
return self._create_workspace_scoped_resource(
resource=component,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
create_team(self, team)
Creates a new team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team |
TeamRequestModel |
The team model to create. |
required |
Returns:
Type | Description |
---|---|
TeamResponseModel |
The newly created team. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_TEAM)
def create_team(self, team: TeamRequestModel) -> TeamResponseModel:
"""Creates a new team.
Args:
team: The team model to create.
Returns:
The newly created team.
"""
return self._create_resource(
resource=team,
route=TEAMS,
response_model=TeamResponseModel,
)
create_team_role_assignment(self, team_role_assignment)
Creates a new team role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment |
TeamRoleAssignmentRequestModel |
The role assignment model to create. |
required |
Returns:
Type | Description |
---|---|
TeamRoleAssignmentResponseModel |
The newly created role assignment. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_team_role_assignment(
self, team_role_assignment: TeamRoleAssignmentRequestModel
) -> TeamRoleAssignmentResponseModel:
"""Creates a new team role assignment.
Args:
team_role_assignment: The role assignment model to create.
Returns:
The newly created role assignment.
"""
return self._create_resource(
resource=team_role_assignment,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
create_user(self, user)
Creates a new user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
UserRequestModel |
User to be created. |
required |
Returns:
Type | Description |
---|---|
UserResponseModel |
The newly created user. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_USER)
def create_user(self, user: UserRequestModel) -> UserResponseModel:
"""Creates a new user.
Args:
user: User to be created.
Returns:
The newly created user.
"""
return self._create_resource(
resource=user,
route=USERS + "?assign_default_role=False",
response_model=UserResponseModel,
)
create_user_role_assignment(self, user_role_assignment)
Creates a new role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment |
UserRoleAssignmentRequestModel |
The role assignment to create. |
required |
Returns:
Type | Description |
---|---|
UserRoleAssignmentResponseModel |
The newly created workspace. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_user_role_assignment(
self, user_role_assignment: UserRoleAssignmentRequestModel
) -> UserRoleAssignmentResponseModel:
"""Creates a new role assignment.
Args:
user_role_assignment: The role assignment to create.
Returns:
The newly created workspace.
"""
return self._create_resource(
resource=user_role_assignment,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
create_workspace(self, workspace)
Creates a new workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace |
WorkspaceRequestModel |
The workspace to create. |
required |
Returns:
Type | Description |
---|---|
WorkspaceResponseModel |
The newly created workspace. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.CREATED_WORKSPACE)
def create_workspace(
self, workspace: WorkspaceRequestModel
) -> WorkspaceResponseModel:
"""Creates a new workspace.
Args:
workspace: The workspace to create.
Returns:
The newly created workspace.
"""
return self._create_resource(
resource=workspace,
route=WORKSPACES,
response_model=WorkspaceResponseModel,
)
delete(self, path, params=None, **kwargs)
Make a DELETE request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def delete(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a DELETE request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending DELETE request to {path}...")
return self._request(
"DELETE",
self.url + API + VERSION_1 + path,
params=params,
**kwargs,
)
delete_artifact(self, artifact_id)
Deletes an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_id |
UUID |
The ID of the artifact to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_artifact(self, artifact_id: UUID) -> None:
"""Deletes an artifact.
Args:
artifact_id: The ID of the artifact to delete.
"""
self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)
delete_build(self, build_id)
Deletes a build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build_id |
UUID |
The ID of the build to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_build(self, build_id: UUID) -> None:
"""Deletes a build.
Args:
build_id: The ID of the build to delete.
"""
self._delete_resource(
resource_id=build_id,
route=PIPELINE_BUILDS,
)
delete_code_repository(self, code_repository_id)
Deletes a code repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code_repository_id |
UUID |
The ID of the code repository to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_code_repository(self, code_repository_id: UUID) -> None:
"""Deletes a code repository.
Args:
code_repository_id: The ID of the code repository to delete.
"""
self._delete_resource(
resource_id=code_repository_id, route=CODE_REPOSITORIES
)
delete_deployment(self, deployment_id)
Deletes a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_id |
UUID |
The ID of the deployment to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_deployment(self, deployment_id: UUID) -> None:
"""Deletes a deployment.
Args:
deployment_id: The ID of the deployment to delete.
"""
self._delete_resource(
resource_id=deployment_id,
route=PIPELINE_DEPLOYMENTS,
)
delete_flavor(self, flavor_id)
Delete a stack component flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
The ID of the stack component flavor to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_FLAVOR)
def delete_flavor(self, flavor_id: UUID) -> None:
"""Delete a stack component flavor.
Args:
flavor_id: The ID of the stack component flavor to delete.
"""
self._delete_resource(
resource_id=flavor_id,
route=FLAVORS,
)
delete_pipeline(self, pipeline_id)
Deletes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
The ID of the pipeline to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETE_PIPELINE)
def delete_pipeline(self, pipeline_id: UUID) -> None:
"""Deletes a pipeline.
Args:
pipeline_id: The ID of the pipeline to delete.
"""
self._delete_resource(
resource_id=pipeline_id,
route=PIPELINES,
)
delete_role(self, role_name_or_id)
Deletes a role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_ROLE)
def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
"""Deletes a role.
Args:
role_name_or_id: Name or ID of the role to delete.
"""
self._delete_resource(
resource_id=role_name_or_id,
route=ROLES,
)
delete_run(self, run_id)
Deletes a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_run(self, run_id: UUID) -> None:
"""Deletes a pipeline run.
Args:
run_id: The ID of the pipeline run to delete.
"""
self._delete_resource(
resource_id=run_id,
route=RUNS,
)
delete_schedule(self, schedule_id)
Deletes a schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
The ID of the schedule to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_schedule(self, schedule_id: UUID) -> None:
"""Deletes a schedule.
Args:
schedule_id: The ID of the schedule to delete.
"""
self._delete_resource(
resource_id=schedule_id,
route=SCHEDULES,
)
delete_service_connector(self, service_connector_id)
Deletes a service connector.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_connector_id |
UUID |
The ID of the service connector to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_service_connector(self, service_connector_id: UUID) -> None:
"""Deletes a service connector.
Args:
service_connector_id: The ID of the service connector to delete.
"""
self._delete_resource(
resource_id=service_connector_id, route=SERVICE_CONNECTORS
)
delete_stack(self, stack_id)
Delete a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
The ID of the stack to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_STACK)
def delete_stack(self, stack_id: UUID) -> None:
"""Delete a stack.
Args:
stack_id: The ID of the stack to delete.
"""
self._delete_resource(
resource_id=stack_id,
route=STACKS,
)
delete_stack_component(self, component_id)
Delete a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_STACK_COMPONENT)
def delete_stack_component(self, component_id: UUID) -> None:
"""Delete a stack component.
Args:
component_id: The ID of the stack component to delete.
"""
self._delete_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
)
delete_team(self, team_name_or_id)
Deletes a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_TEAM)
def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
"""Deletes a team.
Args:
team_name_or_id: Name or ID of the team to delete.
"""
self._delete_resource(
resource_id=team_name_or_id,
route=TEAMS,
)
delete_team_role_assignment(self, team_role_assignment_id)
Delete a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_id |
UUID |
The ID of the specific role assignment |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_team_role_assignment(
self, team_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
team_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
)
delete_user(self, user_name_or_id)
Deletes a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_USER)
def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
"""Deletes a user.
Args:
user_name_or_id: The name or ID of the user to delete.
"""
self._delete_resource(
resource_id=user_name_or_id,
route=USERS,
)
delete_user_role_assignment(self, user_role_assignment_id)
Delete a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_id |
UUID |
The ID of the specific role assignment |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_user_role_assignment(
self, user_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
user_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
)
delete_workspace(self, workspace_name_or_id)
Deletes a workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the workspace to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.DELETED_WORKSPACE)
def delete_workspace(self, workspace_name_or_id: Union[str, UUID]) -> None:
"""Deletes a workspace.
Args:
workspace_name_or_id: Name or ID of the workspace to delete.
"""
self._delete_resource(
resource_id=workspace_name_or_id,
route=WORKSPACES,
)
get(self, path, params=None, **kwargs)
Make a GET request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def get(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a GET request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending GET request to {path}...")
return self._request(
"GET", self.url + API + VERSION_1 + path, params=params, **kwargs
)
get_artifact(self, artifact_id)
Gets an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_id |
UUID |
The ID of the artifact to get. |
required |
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The artifact. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
"""Gets an artifact.
Args:
artifact_id: The ID of the artifact to get.
Returns:
The artifact.
"""
return self._get_resource(
resource_id=artifact_id,
route=ARTIFACTS,
response_model=ArtifactResponseModel,
)
get_build(self, build_id)
Get a build with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build_id |
UUID |
ID of the build. |
required |
Returns:
Type | Description |
---|---|
PipelineBuildResponseModel |
The build. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_build(self, build_id: UUID) -> PipelineBuildResponseModel:
"""Get a build with a given ID.
Args:
build_id: ID of the build.
Returns:
The build.
"""
return self._get_resource(
resource_id=build_id,
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
)
get_code_repository(self, code_repository_id)
Gets a specific code repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code_repository_id |
UUID |
The ID of the code repository to get. |
required |
Returns:
Type | Description |
---|---|
CodeRepositoryResponseModel |
The requested code repository, if it was found. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_code_repository(
self, code_repository_id: UUID
) -> CodeRepositoryResponseModel:
"""Gets a specific code repository.
Args:
code_repository_id: The ID of the code repository to get.
Returns:
The requested code repository, if it was found.
"""
return self._get_resource(
resource_id=code_repository_id,
route=CODE_REPOSITORIES,
response_model=CodeRepositoryResponseModel,
)
get_deployment(self, deployment_id)
Get a deployment with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_id |
UUID |
ID of the deployment. |
required |
Returns:
Type | Description |
---|---|
PipelineDeploymentResponseModel |
The deployment. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_deployment(
self, deployment_id: UUID
) -> PipelineDeploymentResponseModel:
"""Get a deployment with a given ID.
Args:
deployment_id: ID of the deployment.
Returns:
The deployment.
"""
return self._get_resource(
resource_id=deployment_id,
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
)
get_flavor(self, flavor_id)
Get a stack component flavor by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
The ID of the stack component flavor to get. |
required |
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The stack component flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
"""Get a stack component flavor by ID.
Args:
flavor_id: The ID of the stack component flavor to get.
Returns:
The stack component flavor.
"""
return self._get_resource(
resource_id=flavor_id,
route=FLAVORS,
response_model=FlavorResponseModel,
)
get_or_create_run(self, pipeline_run)
Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned. Otherwise, a new run is created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunRequestModel |
The pipeline run to get or create. |
required |
Returns:
Type | Description |
---|---|
Tuple[zenml.models.pipeline_run_models.PipelineRunResponseModel, bool] |
The pipeline run, and a boolean indicating whether the run was created or not. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_or_create_run(
self, pipeline_run: PipelineRunRequestModel
) -> Tuple[PipelineRunResponseModel, bool]:
"""Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned.
Otherwise, a new run is created.
Args:
pipeline_run: The pipeline run to get or create.
Returns:
The pipeline run, and a boolean indicating whether the run was
created or not.
"""
return self._get_or_create_workspace_scoped_resource(
resource=pipeline_run,
route=RUNS,
response_model=PipelineRunResponseModel,
)
get_pipeline(self, pipeline_id)
Get a pipeline with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline. |
required |
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
"""Get a pipeline with a given ID.
Args:
pipeline_id: ID of the pipeline.
Returns:
The pipeline.
"""
return self._get_resource(
resource_id=pipeline_id,
route=PIPELINES,
response_model=PipelineResponseModel,
)
get_role(self, role_name_or_id)
Gets a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role to get. |
required |
Returns:
Type | Description |
---|---|
RoleResponseModel |
The requested role. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
"""Gets a specific role.
Args:
role_name_or_id: Name or ID of the role to get.
Returns:
The requested role.
"""
return self._get_resource(
resource_id=role_name_or_id,
route=ROLES,
response_model=RoleResponseModel,
)
get_run(self, run_name_or_id)
Gets a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name_or_id |
Union[uuid.UUID, str] |
The name or ID of the pipeline run to get. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run(
self, run_name_or_id: Union[UUID, str]
) -> PipelineRunResponseModel:
"""Gets a pipeline run.
Args:
run_name_or_id: The name or ID of the pipeline run to get.
Returns:
The pipeline run.
"""
return self._get_resource(
resource_id=run_name_or_id,
route=RUNS,
response_model=PipelineRunResponseModel,
)
get_run_step(self, step_run_id)
Get a step run by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id |
UUID |
The ID of the step run to get. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
"""Get a step run by ID.
Args:
step_run_id: The ID of the step run to get.
Returns:
The step run.
"""
return self._get_resource(
resource_id=step_run_id,
route=STEPS,
response_model=StepRunResponseModel,
)
get_schedule(self, schedule_id)
Get a schedule with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
ID of the schedule. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
"""Get a schedule with a given ID.
Args:
schedule_id: ID of the schedule.
Returns:
The schedule.
"""
return self._get_resource(
resource_id=schedule_id,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
get_service_connector(self, service_connector_id)
Gets a specific service connector.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_connector_id |
UUID |
The ID of the service connector to get. |
required |
Returns:
Type | Description |
---|---|
ServiceConnectorResponseModel |
The requested service connector, if it was found. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector(
self, service_connector_id: UUID
) -> ServiceConnectorResponseModel:
"""Gets a specific service connector.
Args:
service_connector_id: The ID of the service connector to get.
Returns:
The requested service connector, if it was found.
"""
connector_model = self._get_resource(
resource_id=service_connector_id,
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
params={"expand_secrets": False},
)
self._populate_connector_type(connector_model)
return connector_model
get_service_connector_client(self, service_connector_id, resource_type=None, resource_id=None)
Get a service connector client for a service connector and given resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_connector_id |
UUID |
The ID of the base service connector to use. |
required |
resource_type |
Optional[str] |
The type of resource to get a client for. |
None |
resource_id |
Optional[str] |
The ID of the resource to get a client for. |
None |
Returns:
Type | Description |
---|---|
ServiceConnectorResponseModel |
A service connector client that can be used to access the given resource. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector_client(
self,
service_connector_id: UUID,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
) -> ServiceConnectorResponseModel:
"""Get a service connector client for a service connector and given resource.
Args:
service_connector_id: The ID of the base service connector to use.
resource_type: The type of resource to get a client for.
resource_id: The ID of the resource to get a client for.
Returns:
A service connector client that can be used to access the given
resource.
"""
params = {}
if resource_type:
params["resource_type"] = resource_type
if resource_id:
params["resource_id"] = resource_id
response_body = self.get(
f"{SERVICE_CONNECTORS}/{str(service_connector_id)}{SERVICE_CONNECTOR_CLIENT}",
params=params,
)
connector = ServiceConnectorResponseModel.parse_obj(response_body)
self._populate_connector_type(connector)
return connector
get_service_connector_type(self, connector_type)
Returns the requested service connector type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_type |
str |
the service connector type identifier. |
required |
Returns:
Type | Description |
---|---|
ServiceConnectorTypeModel |
The requested service connector type. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_service_connector_type(
self,
connector_type: str,
) -> ServiceConnectorTypeModel:
"""Returns the requested service connector type.
Args:
connector_type: the service connector type identifier.
Returns:
The requested service connector type.
"""
# Use the local registry to get the service connector type, if it
# exists.
local_connector_type: Optional[ServiceConnectorTypeModel] = None
if service_connector_registry.is_registered(connector_type):
local_connector_type = (
service_connector_registry.get_service_connector_type(
connector_type
)
)
try:
response_body = self.get(
f"{SERVICE_CONNECTOR_TYPES}/{connector_type}",
)
remote_connector_type = ServiceConnectorTypeModel.parse_obj(
response_body
)
if local_connector_type:
# If locally available, return the local connector type but
# mark it as being remotely available.
local_connector_type.remote = True
return local_connector_type
# Mark the remote connector type as being only remotely available
remote_connector_type.local = False
remote_connector_type.remote = True
return remote_connector_type
except KeyError:
# If the service connector type is not found, check the local
# registry.
return service_connector_registry.get_service_connector_type(
connector_type
)
get_stack(self, stack_id)
Get a stack by its unique ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
The ID of the stack to get. |
required |
Returns:
Type | Description |
---|---|
StackResponseModel |
The stack with the given ID. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack(self, stack_id: UUID) -> StackResponseModel:
"""Get a stack by its unique ID.
Args:
stack_id: The ID of the stack to get.
Returns:
The stack with the given ID.
"""
return self._get_resource(
resource_id=stack_id,
route=STACKS,
response_model=StackResponseModel,
)
get_stack_component(self, component_id)
Get a stack component by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to get. |
required |
Returns:
Type | Description |
---|---|
ComponentResponseModel |
The stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack_component(
self, component_id: UUID
) -> ComponentResponseModel:
"""Get a stack component by ID.
Args:
component_id: The ID of the stack component to get.
Returns:
The stack component.
"""
return self._get_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
get_store_info(self)
Get information about the server.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the server. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_store_info(self) -> ServerModel:
"""Get information about the server.
Returns:
Information about the server.
"""
body = self.get(INFO)
return ServerModel.parse_obj(body)
get_team(self, team_name_or_id)
Gets a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team to get. |
required |
Returns:
Type | Description |
---|---|
TeamResponseModel |
The requested team. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
"""Gets a specific team.
Args:
team_name_or_id: Name or ID of the team to get.
Returns:
The requested team.
"""
return self._get_resource(
resource_id=team_name_or_id,
route=TEAMS,
response_model=TeamResponseModel,
)
get_team_role_assignment(self, team_role_assignment_id)
Gets a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_id |
UUID |
ID of the role assignment to get. |
required |
Returns:
Type | Description |
---|---|
TeamRoleAssignmentResponseModel |
The requested role assignment. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_team_role_assignment(
self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
"""Gets a specific role assignment.
Args:
team_role_assignment_id: ID of the role assignment to get.
Returns:
The requested role assignment.
"""
return self._get_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
get_user(self, user_name_or_id=None, include_private=False)
Gets a specific user, when no id is specified the active user is returned.
The include_private
parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to get. |
None |
include_private |
bool |
Whether to include private user information |
False |
Returns:
Type | Description |
---|---|
UserResponseModel |
The requested user, if it was found. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_user(
self,
user_name_or_id: Optional[Union[str, UUID]] = None,
include_private: bool = False,
) -> UserResponseModel:
"""Gets a specific user, when no id is specified the active user is returned.
The `include_private` parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Args:
user_name_or_id: The name or ID of the user to get.
include_private: Whether to include private user information
Returns:
The requested user, if it was found.
"""
if user_name_or_id:
return self._get_resource(
resource_id=user_name_or_id,
route=USERS,
response_model=UserResponseModel,
)
else:
body = self.get(CURRENT_USER)
return UserResponseModel.parse_obj(body)
get_user_role_assignment(self, user_role_assignment_id)
Get an existing role assignment by name or ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_id |
UUID |
Name or ID of the role assignment to get. |
required |
Returns:
Type | Description |
---|---|
UserRoleAssignmentResponseModel |
The requested workspace. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_user_role_assignment(
self, user_role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
"""Get an existing role assignment by name or ID.
Args:
user_role_assignment_id: Name or ID of the role assignment to get.
Returns:
The requested workspace.
"""
return self._get_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
get_workspace(self, workspace_name_or_id)
Get an existing workspace by name or ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_name_or_id |
Union[uuid.UUID, str] |
Name or ID of the workspace to get. |
required |
Returns:
Type | Description |
---|---|
WorkspaceResponseModel |
The requested workspace. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_workspace(
self, workspace_name_or_id: Union[UUID, str]
) -> WorkspaceResponseModel:
"""Get an existing workspace by name or ID.
Args:
workspace_name_or_id: Name or ID of the workspace to get.
Returns:
The requested workspace.
"""
return self._get_resource(
resource_id=workspace_name_or_id,
route=WORKSPACES,
response_model=WorkspaceResponseModel,
)
list_artifacts(self, artifact_filter_model)
List all artifacts matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_filter_model |
ArtifactFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ArtifactResponseModel] |
A list of all artifacts matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_artifacts(
self, artifact_filter_model: ArtifactFilterModel
) -> Page[ArtifactResponseModel]:
"""List all artifacts matching the given filter criteria.
Args:
artifact_filter_model: All filter parameters including pagination
params.
Returns:
A list of all artifacts matching the filter criteria.
"""
return self._list_paginated_resources(
route=ARTIFACTS,
response_model=ArtifactResponseModel,
filter_model=artifact_filter_model,
)
list_builds(self, build_filter_model)
List all builds matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build_filter_model |
PipelineBuildFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineBuildResponseModel] |
A page of all builds matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_builds(
self, build_filter_model: PipelineBuildFilterModel
) -> Page[PipelineBuildResponseModel]:
"""List all builds matching the given filter criteria.
Args:
build_filter_model: All filter parameters including pagination
params.
Returns:
A page of all builds matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINE_BUILDS,
response_model=PipelineBuildResponseModel,
filter_model=build_filter_model,
)
list_code_repositories(self, filter_model)
List all code repositories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filter_model |
CodeRepositoryFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[CodeRepositoryResponseModel] |
A page of all code repositories. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_code_repositories(
self, filter_model: CodeRepositoryFilterModel
) -> Page[CodeRepositoryResponseModel]:
"""List all code repositories.
Args:
filter_model: All filter parameters including pagination
params.
Returns:
A page of all code repositories.
"""
return self._list_paginated_resources(
route=CODE_REPOSITORIES,
response_model=CodeRepositoryResponseModel,
filter_model=filter_model,
)
list_deployments(self, deployment_filter_model)
List all deployments matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_filter_model |
PipelineDeploymentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineDeploymentResponseModel] |
A page of all deployments matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_deployments(
self, deployment_filter_model: PipelineDeploymentFilterModel
) -> Page[PipelineDeploymentResponseModel]:
"""List all deployments matching the given filter criteria.
Args:
deployment_filter_model: All filter parameters including pagination
params.
Returns:
A page of all deployments matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINE_DEPLOYMENTS,
response_model=PipelineDeploymentResponseModel,
filter_model=deployment_filter_model,
)
list_flavors(self, flavor_filter_model)
List all stack component flavors matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_filter_model |
FlavorFilterModel |
All filter parameters including pagination params |
required |
Returns:
Type | Description |
---|---|
Page[FlavorResponseModel] |
List of all the stack component flavors matching the given criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_flavors(
self, flavor_filter_model: FlavorFilterModel
) -> Page[FlavorResponseModel]:
"""List all stack component flavors matching the given filter criteria.
Args:
flavor_filter_model: All filter parameters including pagination
params
Returns:
List of all the stack component flavors matching the given criteria.
"""
return self._list_paginated_resources(
route=FLAVORS,
response_model=FlavorResponseModel,
filter_model=flavor_filter_model,
)
list_pipelines(self, pipeline_filter_model)
List all pipelines matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_filter_model |
PipelineFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineResponseModel] |
A list of all pipelines matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_pipelines(
self, pipeline_filter_model: PipelineFilterModel
) -> Page[PipelineResponseModel]:
"""List all pipelines matching the given filter criteria.
Args:
pipeline_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipelines matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINES,
response_model=PipelineResponseModel,
filter_model=pipeline_filter_model,
)
list_roles(self, role_filter_model)
List all roles matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_filter_model |
RoleFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[RoleResponseModel] |
A list of all roles matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_roles(
self, role_filter_model: RoleFilterModel
) -> Page[RoleResponseModel]:
"""List all roles matching the given filter criteria.
Args:
role_filter_model: All filter parameters including pagination
params.
Returns:
A list of all roles matching the filter criteria.
"""
return self._list_paginated_resources(
route=ROLES,
response_model=RoleResponseModel,
filter_model=role_filter_model,
)
list_run_metadata(self, run_metadata_filter_model)
List run metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_metadata_filter_model |
RunMetadataFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[RunMetadataResponseModel] |
The run metadata. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_run_metadata(
self,
run_metadata_filter_model: RunMetadataFilterModel,
) -> Page[RunMetadataResponseModel]:
"""List run metadata.
Args:
run_metadata_filter_model: All filter parameters including
pagination params.
Returns:
The run metadata.
"""
return self._list_paginated_resources(
route=RUN_METADATA,
response_model=RunMetadataResponseModel,
filter_model=run_metadata_filter_model,
)
list_run_steps(self, step_run_filter_model)
List all step runs matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_filter_model |
StepRunFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[StepRunResponseModel] |
A list of all step runs matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_run_steps(
self, step_run_filter_model: StepRunFilterModel
) -> Page[StepRunResponseModel]:
"""List all step runs matching the given filter criteria.
Args:
step_run_filter_model: All filter parameters including pagination
params.
Returns:
A list of all step runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=STEPS,
response_model=StepRunResponseModel,
filter_model=step_run_filter_model,
)
list_runs(self, runs_filter_model)
List all pipeline runs matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runs_filter_model |
PipelineRunFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineRunResponseModel] |
A list of all pipeline runs matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_runs(
self, runs_filter_model: PipelineRunFilterModel
) -> Page[PipelineRunResponseModel]:
"""List all pipeline runs matching the given filter criteria.
Args:
runs_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipeline runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=RUNS,
response_model=PipelineRunResponseModel,
filter_model=runs_filter_model,
)
list_schedules(self, schedule_filter_model)
List all schedules in the workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_filter_model |
ScheduleFilterModel |
All filter parameters including pagination params |
required |
Returns:
Type | Description |
---|---|
Page[ScheduleResponseModel] |
A list of schedules. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_schedules(
self, schedule_filter_model: ScheduleFilterModel
) -> Page[ScheduleResponseModel]:
"""List all schedules in the workspace.
Args:
schedule_filter_model: All filter parameters including pagination
params
Returns:
A list of schedules.
"""
return self._list_paginated_resources(
route=SCHEDULES,
response_model=ScheduleResponseModel,
filter_model=schedule_filter_model,
)
list_service_connector_resources(self, user_name_or_id, workspace_name_or_id, connector_type=None, resource_type=None, resource_id=None)
List resources that can be accessed by service connectors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to scope to. |
required |
workspace_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the workspace to scope to. |
required |
connector_type |
Optional[str] |
The type of service connector to scope to. |
None |
resource_type |
Optional[str] |
The type of resource to scope to. |
None |
resource_id |
Optional[str] |
The ID of the resource to scope to. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.service_connector_models.ServiceConnectorResourcesModel] |
The matching list of resources that available service connectors have access to. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connector_resources(
self,
user_name_or_id: Union[str, UUID],
workspace_name_or_id: Union[str, UUID],
connector_type: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
) -> List[ServiceConnectorResourcesModel]:
"""List resources that can be accessed by service connectors.
Args:
user_name_or_id: The name or ID of the user to scope to.
workspace_name_or_id: The name or ID of the workspace to scope to.
connector_type: The type of service connector to scope to.
resource_type: The type of resource to scope to.
resource_id: The ID of the resource to scope to.
Returns:
The matching list of resources that available service
connectors have access to.
"""
params = {}
if connector_type:
params["connector_type"] = connector_type
if resource_type:
params["resource_type"] = resource_type
if resource_id:
params["resource_id"] = resource_id
response_body = self.get(
f"{WORKSPACES}/{workspace_name_or_id}{SERVICE_CONNECTORS}{SERVICE_CONNECTOR_RESOURCES}",
params=params,
)
assert isinstance(response_body, list)
resource_list = [
ServiceConnectorResourcesModel.parse_obj(item)
for item in response_body
]
self._populate_connector_type(*resource_list)
# For service connectors with types that are only locally available,
# we need to retrieve the resource list locally
for idx, resources in enumerate(resource_list):
if isinstance(resources.connector_type, str):
# Skip connector types that are neither locally nor remotely
# available
continue
if resources.connector_type.remote:
# Skip connector types that are remotely available
continue
# Retrieve the resource list locally
assert resources.id is not None
connector = self.get_service_connector(resources.id)
connector_instance = (
service_connector_registry.instantiate_connector(
model=connector
)
)
try:
local_resources = connector_instance.verify(
resource_type=resource_type,
resource_id=resource_id,
)
except (ValueError, AuthorizationException) as e:
logger.error(
f'Failed to fetch {resource_type or "available"} '
f"resources from service connector {connector.name}/"
f"{connector.id}: {e}"
)
continue
resource_list[idx] = local_resources
return resource_list
list_service_connector_types(self, connector_type=None, resource_type=None, auth_method=None)
Get a list of service connector types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_type |
Optional[str] |
Filter by connector type. |
None |
resource_type |
Optional[str] |
Filter by resource type. |
None |
auth_method |
Optional[str] |
Filter by authentication method. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.service_connector_models.ServiceConnectorTypeModel] |
List of service connector types. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connector_types(
self,
connector_type: Optional[str] = None,
resource_type: Optional[str] = None,
auth_method: Optional[str] = None,
) -> List[ServiceConnectorTypeModel]:
"""Get a list of service connector types.
Args:
connector_type: Filter by connector type.
resource_type: Filter by resource type.
auth_method: Filter by authentication method.
Returns:
List of service connector types.
"""
params = {}
if connector_type:
params["connector_type"] = connector_type
if resource_type:
params["resource_type"] = resource_type
if auth_method:
params["auth_method"] = auth_method
response_body = self.get(
SERVICE_CONNECTOR_TYPES,
params=params,
)
assert isinstance(response_body, list)
remote_connector_types = [
ServiceConnectorTypeModel.parse_obj(item) for item in response_body
]
# Mark the remote connector types as being only remotely available
for c in remote_connector_types:
c.local = False
c.remote = True
local_connector_types = (
service_connector_registry.list_service_connector_types(
connector_type=connector_type,
resource_type=resource_type,
auth_method=auth_method,
)
)
# Add the connector types in the local registry to the list of
# connector types available remotely. Overwrite those that have
# the same connector type but mark them as being remotely available.
connector_types_map = {
connector_type.connector_type: connector_type
for connector_type in remote_connector_types
}
for connector in local_connector_types:
if connector.connector_type in connector_types_map:
connector.remote = True
connector_types_map[connector.connector_type] = connector
return list(connector_types_map.values())
list_service_connectors(self, filter_model)
List all service connectors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filter_model |
ServiceConnectorFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ServiceConnectorResponseModel] |
A page of all service connectors. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_service_connectors(
self, filter_model: ServiceConnectorFilterModel
) -> Page[ServiceConnectorResponseModel]:
"""List all service connectors.
Args:
filter_model: All filter parameters including pagination
params.
Returns:
A page of all service connectors.
"""
connector_models = self._list_paginated_resources(
route=SERVICE_CONNECTORS,
response_model=ServiceConnectorResponseModel,
filter_model=filter_model,
params={"expand_secrets": False},
)
self._populate_connector_type(*connector_models.items)
return connector_models
list_stack_components(self, component_filter_model)
List all stack components matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_filter_model |
ComponentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ComponentResponseModel] |
A list of all stack components matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_stack_components(
self, component_filter_model: ComponentFilterModel
) -> Page[ComponentResponseModel]:
"""List all stack components matching the given filter criteria.
Args:
component_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stack components matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
filter_model=component_filter_model,
)
list_stacks(self, stack_filter_model)
List all stacks matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_filter_model |
StackFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[StackResponseModel] |
A list of all stacks matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_stacks(
self, stack_filter_model: StackFilterModel
) -> Page[StackResponseModel]:
"""List all stacks matching the given filter criteria.
Args:
stack_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stacks matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACKS,
response_model=StackResponseModel,
filter_model=stack_filter_model,
)
list_team_role_assignments(self, team_role_assignment_filter_model)
List all roles assignments matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_filter_model |
TeamRoleAssignmentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[TeamRoleAssignmentResponseModel] |
A list of all roles assignments matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_team_role_assignments(
self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
) -> Page[TeamRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
team_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
filter_model=team_role_assignment_filter_model,
)
list_teams(self, team_filter_model)
List all teams matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_filter_model |
TeamFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[TeamResponseModel] |
A list of all teams matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_teams(
self, team_filter_model: TeamFilterModel
) -> Page[TeamResponseModel]:
"""List all teams matching the given filter criteria.
Args:
team_filter_model: All filter parameters including pagination
params.
Returns:
A list of all teams matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAMS,
response_model=TeamResponseModel,
filter_model=team_filter_model,
)
list_user_role_assignments(self, user_role_assignment_filter_model)
List all roles assignments matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_filter_model |
UserRoleAssignmentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[UserRoleAssignmentResponseModel] |
A list of all roles assignments matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_user_role_assignments(
self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
) -> Page[UserRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
user_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
filter_model=user_role_assignment_filter_model,
)
list_users(self, user_filter_model)
List all users.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_filter_model |
UserFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[UserResponseModel] |
A list of all users. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_users(
self, user_filter_model: UserFilterModel
) -> Page[UserResponseModel]:
"""List all users.
Args:
user_filter_model: All filter parameters including pagination
params.
Returns:
A list of all users.
"""
return self._list_paginated_resources(
route=USERS,
response_model=UserResponseModel,
filter_model=user_filter_model,
)
list_workspaces(self, workspace_filter_model)
List all workspace matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_filter_model |
WorkspaceFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[WorkspaceResponseModel] |
A list of all workspace matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_workspaces(
self, workspace_filter_model: WorkspaceFilterModel
) -> Page[WorkspaceResponseModel]:
"""List all workspace matching the given filter criteria.
Args:
workspace_filter_model: All filter parameters including pagination
params.
Returns:
A list of all workspace matching the filter criteria.
"""
return self._list_paginated_resources(
route=WORKSPACES,
response_model=WorkspaceResponseModel,
filter_model=workspace_filter_model,
)
post(self, path, body, params=None, **kwargs)
Make a POST request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
BaseModel |
The body to send. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def post(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a POST request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending POST request to {path}...")
return self._request(
"POST",
self.url + API + VERSION_1 + path,
data=body.json(),
params=params,
**kwargs,
)
put(self, path, body=None, params=None, **kwargs)
Make a PUT request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
Optional[pydantic.main.BaseModel] |
The body to send. |
None |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def put(
self,
path: str,
body: Optional[BaseModel] = None,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a PUT request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending PUT request to {path}...")
data = body.json(exclude_unset=True) if body else None
return self._request(
"PUT",
self.url + API + VERSION_1 + path,
data=data,
params=params,
**kwargs,
)
update_code_repository(self, code_repository_id, update)
Updates an existing code repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code_repository_id |
UUID |
The ID of the code repository to update. |
required |
update |
CodeRepositoryUpdateModel |
The update to be applied to the code repository. |
required |
Returns:
Type | Description |
---|---|
CodeRepositoryResponseModel |
The updated code repository. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_code_repository(
self, code_repository_id: UUID, update: CodeRepositoryUpdateModel
) -> CodeRepositoryResponseModel:
"""Updates an existing code repository.
Args:
code_repository_id: The ID of the code repository to update.
update: The update to be applied to the code repository.
Returns:
The updated code repository.
"""
return self._update_resource(
resource_id=code_repository_id,
resource_update=update,
response_model=CodeRepositoryResponseModel,
route=CODE_REPOSITORIES,
)
update_flavor(self, flavor_id, flavor_update)
Updates an existing user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
The id of the flavor to update. |
required |
flavor_update |
FlavorUpdateModel |
The update to be applied to the flavor. |
required |
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The updated flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_flavor(
self, flavor_id: UUID, flavor_update: FlavorUpdateModel
) -> FlavorResponseModel:
"""Updates an existing user.
Args:
flavor_id: The id of the flavor to update.
flavor_update: The update to be applied to the flavor.
Returns:
The updated flavor.
"""
return self._update_resource(
resource_id=flavor_id,
resource_update=flavor_update,
route=FLAVORS,
response_model=FlavorResponseModel,
)
update_pipeline(self, pipeline_id, pipeline_update)
Updates a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
The ID of the pipeline to be updated. |
required |
pipeline_update |
PipelineUpdateModel |
The update to be applied. |
required |
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The updated pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATE_PIPELINE)
def update_pipeline(
self, pipeline_id: UUID, pipeline_update: PipelineUpdateModel
) -> PipelineResponseModel:
"""Updates a pipeline.
Args:
pipeline_id: The ID of the pipeline to be updated.
pipeline_update: The update to be applied.
Returns:
The updated pipeline.
"""
return self._update_resource(
resource_id=pipeline_id,
resource_update=pipeline_update,
route=PIPELINES,
response_model=PipelineResponseModel,
)
update_role(self, role_id, role_update)
Update an existing role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id |
UUID |
The ID of the role to be updated. |
required |
role_update |
RoleUpdateModel |
The update to be applied to the role. |
required |
Returns:
Type | Description |
---|---|
RoleResponseModel |
The updated role. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_ROLE)
def update_role(
self, role_id: UUID, role_update: RoleUpdateModel
) -> RoleResponseModel:
"""Update an existing role.
Args:
role_id: The ID of the role to be updated.
role_update: The update to be applied to the role.
Returns:
The updated role.
"""
return self._update_resource(
resource_id=role_id,
resource_update=role_update,
route=ROLES,
response_model=RoleResponseModel,
)
update_run(self, run_id, run_update)
Updates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run to update. |
required |
run_update |
PipelineRunUpdateModel |
The update to be applied to the pipeline run. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The updated pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_run(
self, run_id: UUID, run_update: PipelineRunUpdateModel
) -> PipelineRunResponseModel:
"""Updates a pipeline run.
Args:
run_id: The ID of the pipeline run to update.
run_update: The update to be applied to the pipeline run.
Returns:
The updated pipeline run.
"""
return self._update_resource(
resource_id=run_id,
resource_update=run_update,
response_model=PipelineRunResponseModel,
route=RUNS,
)
update_run_step(self, step_run_id, step_run_update)
Updates a step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id |
UUID |
The ID of the step to update. |
required |
step_run_update |
StepRunUpdateModel |
The update to be applied to the step. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The updated step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_run_step(
self,
step_run_id: UUID,
step_run_update: StepRunUpdateModel,
) -> StepRunResponseModel:
"""Updates a step run.
Args:
step_run_id: The ID of the step to update.
step_run_update: The update to be applied to the step.
Returns:
The updated step run.
"""
return self._update_resource(
resource_id=step_run_id,
resource_update=step_run_update,
response_model=StepRunResponseModel,
route=STEPS,
)
update_schedule(self, schedule_id, schedule_update)
Updates a schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
The ID of the schedule to be updated. |
required |
schedule_update |
ScheduleUpdateModel |
The update to be applied. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The updated schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_schedule(
self,
schedule_id: UUID,
schedule_update: ScheduleUpdateModel,
) -> ScheduleResponseModel:
"""Updates a schedule.
Args:
schedule_id: The ID of the schedule to be updated.
schedule_update: The update to be applied.
Returns:
The updated schedule.
"""
return self._update_resource(
resource_id=schedule_id,
resource_update=schedule_update,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
update_service_connector(self, service_connector_id, update)
Updates an existing service connector.
The update model contains the fields to be updated. If a field value is set to None in the model, the field is not updated, but there are special rules concerning some fields:
- the
configuration
andsecrets
fields together represent a full valid configuration update, not just a partial update. If either is set (i.e. not None) in the update, their values are merged together and will replace the existing configuration and secrets values. - the
resource_id
field value is also a full replacement value: if set toNone
, the resource ID is removed from the service connector. - the
expiration_seconds
field value is also a full replacement value: if set toNone
, the expiration is removed from the service connector. - the
secret_id
field value in the update is ignored, given that secrets are managed internally by the ZenML store. - the
labels
field is also a full labels update: if set (i.e. notNone
), all existing labels are removed and replaced by the new labels in the update.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_connector_id |
UUID |
The ID of the service connector to update. |
required |
update |
ServiceConnectorUpdateModel |
The update to be applied to the service connector. |
required |
Returns:
Type | Description |
---|---|
ServiceConnectorResponseModel |
The updated service connector. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_service_connector(
self, service_connector_id: UUID, update: ServiceConnectorUpdateModel
) -> ServiceConnectorResponseModel:
"""Updates an existing service connector.
The update model contains the fields to be updated. If a field value is
set to None in the model, the field is not updated, but there are
special rules concerning some fields:
* the `configuration` and `secrets` fields together represent a full
valid configuration update, not just a partial update. If either is
set (i.e. not None) in the update, their values are merged together and
will replace the existing configuration and secrets values.
* the `resource_id` field value is also a full replacement value: if set
to `None`, the resource ID is removed from the service connector.
* the `expiration_seconds` field value is also a full replacement value:
if set to `None`, the expiration is removed from the service connector.
* the `secret_id` field value in the update is ignored, given that
secrets are managed internally by the ZenML store.
* the `labels` field is also a full labels update: if set (i.e. not
`None`), all existing labels are removed and replaced by the new labels
in the update.
Args:
service_connector_id: The ID of the service connector to update.
update: The update to be applied to the service connector.
Returns:
The updated service connector.
"""
connector_model = self._update_resource(
resource_id=service_connector_id,
resource_update=update,
response_model=ServiceConnectorResponseModel,
route=SERVICE_CONNECTORS,
)
self._populate_connector_type(connector_model)
return connector_model
update_stack(self, stack_id, stack_update)
Update a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
The ID of the stack update. |
required |
stack_update |
StackUpdateModel |
The update request on the stack. |
required |
Returns:
Type | Description |
---|---|
StackResponseModel |
The updated stack. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_STACK)
def update_stack(
self, stack_id: UUID, stack_update: StackUpdateModel
) -> StackResponseModel:
"""Update a stack.
Args:
stack_id: The ID of the stack update.
stack_update: The update request on the stack.
Returns:
The updated stack.
"""
return self._update_resource(
resource_id=stack_id,
resource_update=stack_update,
route=STACKS,
response_model=StackResponseModel,
)
update_stack_component(self, component_id, component_update)
Update an existing stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to update. |
required |
component_update |
ComponentUpdateModel |
The update to be applied to the stack component. |
required |
Returns:
Type | Description |
---|---|
ComponentResponseModel |
The updated stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
@track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
def update_stack_component(
self,
component_id: UUID,
component_update: ComponentUpdateModel,
) -> ComponentResponseModel:
"""Update an existing stack component.
Args:
component_id: The ID of the stack component to update.
component_update: The update to be applied to the stack component.
Returns:
The updated stack component.
"""
return self._update_resource(
resource_id=component_id,
resource_update=component_update,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
update_team(self, team_id, team_update)
Upda