Models
zenml.models
special
Pydantic models for the various concepts in ZenML.
artifact_models
Models representing artifacts.
ArtifactBaseModel (BaseModel)
pydantic-model
Base model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactBaseModel(BaseModel):
    """Fields shared by all artifact models.

    `materializer` and `data_type` are `Source` values; both are passed
    through the validator built by `convert_source_validator`.
    """

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="Name of the output in the parent step.",
    )
    artifact_store_id: Optional[UUID]
    type: ArtifactType
    uri: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="URI of the artifact.",
    )
    materializer: Source = Field(
        title="Materializer class to use for this artifact."
    )
    data_type: Source = Field(title="Data type of the artifact.")
    visualizations: Optional[List[VisualizationModel]] = Field(
        title="Visualizations of the artifact.",
        default=None,
    )

    # One shared validator registered for both source-typed fields.
    _convert_source = convert_source_validator("materializer", "data_type")
ArtifactFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactFilterModel(WorkspaceScopedFilterModel):
    """Filter model for querying artifacts.

    All filter attributes are optional and default to `None`, except
    `only_unused`, which defaults to `False`.
    """

    # `only_unused` describes a property of the artifact's relationships and
    # is not a database column, so it is excluded from the regular filter
    # fields and has to be handled explicitly.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "only_unused",
    ]

    name: Optional[str] = Field(
        description="Name of the artifact",
        default=None,
    )
    uri: Optional[str] = Field(
        description="Uri of the artifact",
        default=None,
    )
    materializer: Optional[str] = Field(
        description="Materializer used to produce the artifact",
        default=None,
    )
    type: Optional[str] = Field(
        description="Type of the artifact",
        default=None,
    )
    data_type: Optional[str] = Field(
        description="Datatype of the artifact",
        default=None,
    )
    artifact_store_id: Optional[Union[UUID, str]] = Field(
        description="Artifact store for this artifact",
        default=None,
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        description="Workspace for this artifact",
        default=None,
    )
    user_id: Optional[Union[UUID, str]] = Field(
        description="User that produced this artifact",
        default=None,
    )
    only_unused: Optional[bool] = Field(
        description="Filter only for unused artifacts",
        default=False,
    )
artifact_store_id: Union[uuid.UUID, str]
pydantic-field
Artifact store for this artifact
data_type: str
pydantic-field
Datatype of the artifact
materializer: str
pydantic-field
Materializer used to produce the artifact
name: str
pydantic-field
Name of the artifact
only_unused: bool
pydantic-field
Filter only for unused artifacts
type: str
pydantic-field
Type of the artifact
uri: str
pydantic-field
Uri of the artifact
user_id: Union[uuid.UUID, str]
pydantic-field
User that produced this artifact
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this artifact
ArtifactRequestModel (ArtifactBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactRequestModel(ArtifactBaseModel, WorkspaceScopedRequestModel):
    """Request model for artifacts.

    Adds no fields of its own; it only combines its two base classes.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
ArtifactResponseModel (ArtifactBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
    """Response model for artifacts.

    Extends the shared artifact fields with the id of the producing step
    run and a mapping of run metadata keyed by string.
    """

    producer_step_run_id: Optional[UUID]
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        title="Metadata of the artifact.",
        default={},
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
base_models
Base domain model definitions.
BaseRequestModel (BaseZenModel)
pydantic-model
Base request model.
Used as a base class for all request models.
Source code in zenml/models/base_models.py
class BaseRequestModel(BaseZenModel):
    """Common base class of all request models.

    Defines no fields itself.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
BaseResponseModel (BaseZenModel)
pydantic-model
Base domain model.
Used as a base class for all domain models that have the following common characteristics:
- are uniquely identified by a UUID
- have a creation timestamp and a last modified timestamp
Source code in zenml/models/base_models.py
class BaseResponseModel(BaseZenModel):
    """Base domain model.

    Used as a base class for all domain models that have the following common
    characteristics:

      * are uniquely identified by a UUID
      * have a creation timestamp and a last modified timestamp

    Equality and hashing are both keyed on the concrete type together with
    the `id`, so two objects that compare equal always have the same hash.
    """

    id: UUID = Field(title="The unique resource id.")
    created: datetime = Field(title="Time when this resource was created.")
    updated: datetime = Field(
        title="Time when this resource was last updated."
    )

    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Returns:
            Hash of the concrete type combined with the UUID.
        """
        return hash((type(self), self.id))

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is of the same type and has the same UUID.
        """
        # `__hash__` includes `type(self)`, so equality has to require the
        # very same type as well; otherwise instances of two different
        # subclasses sharing an id would be equal but hash differently.
        if type(other) is not type(self):
            return False
        return self.id == other.id

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The analytics metadata of the parent class, extended with this
            model's id under the `entity_id` key.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata
__eq__(self, other)
special
Implementation of equality magic method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is of the same type and has the same UUID. |
Source code in zenml/models/base_models.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is of the same type and has the same UUID.
    """
    # The hash of a response model is built from `type(self)` and the id, so
    # equality must require the same concrete type too; a plain isinstance
    # check against the base class would let instances of different
    # subclasses compare equal while hashing differently.
    if type(other) is not type(self):
        return False
    return self.id == other.id
__hash__(self)
special
Implementation of hash magic method.
Returns:
Type | Description |
---|---|
int |
Hash of the UUID. |
Source code in zenml/models/base_models.py
def __hash__(self) -> int:
    """Hash the model by its concrete type and its id.

    Returns:
        Hash of the `(type, id)` pair.
    """
    identity = (type(self), self.id)
    return hash(identity)
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for base response models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for base response models.

    Returns:
        The parent class's analytics metadata with this model's id added
        under the `entity_id` key.
    """
    analytics = super().get_analytics_metadata()
    analytics["entity_id"] = self.id
    return analytics
BaseZenModel (AnalyticsTrackedModelMixin)
pydantic-model
Base model class for all ZenML models.
This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events and proper encoding of SecretStr values.
Source code in zenml/models/base_models.py
class BaseZenModel(AnalyticsTrackedModelMixin):
    """Root class of all ZenML models.

    Builds on the analytics tracking mixin and configures pydantic so that
    SecretStr values are encoded as their plain secret value and unknown
    extra attributes are accepted.
    """

    class Config:
        """Pydantic configuration class."""

        # The REST client and server need the actual secret value in the
        # JSON payload to be able to unpack SecretStr fields correctly.
        json_encoders = {
            SecretStr: (
                lambda secret: (
                    secret.get_secret_value() if secret is not None else None
                )
            )
        }

        # Extras are allowed on every model for forwards and backwards
        # compatibility: fields introduced by newer ZenML servers may show
        # up in older ZenML clients and vice versa.
        extra = "allow"
Config
Pydantic configuration class.
Source code in zenml/models/base_models.py
class Config:
    """Pydantic configuration class."""

    # The REST client and server need the actual secret value in the JSON
    # payload to be able to unpack SecretStr fields correctly.
    json_encoders = {
        SecretStr: (
            lambda secret: (
                secret.get_secret_value() if secret is not None else None
            )
        )
    }

    # Extras are allowed on every model for forwards and backwards
    # compatibility: fields introduced by newer ZenML servers may show up in
    # older ZenML clients and vice versa.
    extra = "allow"
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
ShareableRequestModel (WorkspaceScopedRequestModel)
pydantic-model
Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are shareable.
Source code in zenml/models/base_models.py
class ShareableRequestModel(WorkspaceScopedRequestModel):
    """Request base class for workspace-scoped resources that can be shared.

    Adds the `is_shared` flag, which defaults to `False`.
    """

    is_shared: bool = Field(
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        ),
        default=False,
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for shareable models.

        Returns:
            The parent class's analytics metadata with the `is_shared` flag
            added.
        """
        analytics = super().get_analytics_metadata()
        analytics["is_shared"] = self.is_shared
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for shareable models.

    Returns:
        The parent class's analytics metadata with the `is_shared` flag
        added.
    """
    analytics = super().get_analytics_metadata()
    analytics["is_shared"] = self.is_shared
    return analytics
ShareableResponseModel (WorkspaceScopedResponseModel)
pydantic-model
Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are shareable.
Source code in zenml/models/base_models.py
class ShareableResponseModel(WorkspaceScopedResponseModel):
    """Response base class for workspace-scoped resources that can be shared.

    Adds the `is_shared` flag, declared here without a default value.
    """

    is_shared: bool = Field(
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        ),
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for shareable models.

        Returns:
            The parent class's analytics metadata with the `is_shared` flag
            added.
        """
        analytics = super().get_analytics_metadata()
        analytics["is_shared"] = self.is_shared
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for shareable models.

    Returns:
        The parent class's analytics metadata with the `is_shared` flag
        added.
    """
    analytics = super().get_analytics_metadata()
    analytics["is_shared"] = self.is_shared
    return analytics
UserScopedRequestModel (BaseRequestModel)
pydantic-model
Base user-owned request model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/base_models.py
class UserScopedRequestModel(BaseRequestModel):
    """Request base class for resources that are "owned" by a user.

    The owner is referenced by its UUID in the `user` field.
    """

    user: UUID = Field(title="The id of the user that created this resource.")

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for user scoped models.

        Returns:
            The parent class's analytics metadata with the owning user's id
            added under the `user_id` key.
        """
        analytics = super().get_analytics_metadata()
        analytics["user_id"] = self.user
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for user scoped models.

    Returns:
        The parent class's analytics metadata with the value of `user`
        added under the `user_id` key.
    """
    analytics = super().get_analytics_metadata()
    analytics["user_id"] = self.user
    return analytics
UserScopedResponseModel (BaseResponseModel)
pydantic-model
Base user-owned domain model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/base_models.py
class UserScopedResponseModel(BaseResponseModel):
    """Response base class for resources that are "owned" by a user.

    The `user` field holds the owner's response model, or `None`.
    """

    user: Union["UserResponseModel", None] = Field(
        nullable=True,
        title="The user that created this resource.",
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for user scoped models.

        Returns:
            The parent class's analytics metadata; the owner's id is added
            under the `user_id` key only when a user is set.
        """
        analytics = super().get_analytics_metadata()
        owner = self.user
        if owner is not None:
            analytics["user_id"] = owner.id
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for user scoped models.

    Returns:
        The parent class's analytics metadata; the owner's id is added under
        the `user_id` key only when `user` is not `None`.
    """
    analytics = super().get_analytics_metadata()
    owner = self.user
    if owner is not None:
        analytics["user_id"] = owner.id
    return analytics
WorkspaceScopedRequestModel (UserScopedRequestModel)
pydantic-model
Base workspace-scoped request domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/base_models.py
class WorkspaceScopedRequestModel(UserScopedRequestModel):
    """Request base class for resources that belong to a workspace.

    The workspace is referenced by its UUID in the `workspace` field.
    """

    workspace: UUID = Field(
        title="The workspace to which this resource belongs."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for workspace scoped models.

        Returns:
            The parent class's analytics metadata with the workspace id added
            under the `workspace_id` key.
        """
        analytics = super().get_analytics_metadata()
        analytics["workspace_id"] = self.workspace
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for workspace scoped models.

    Returns:
        The parent class's analytics metadata with the value of `workspace`
        added under the `workspace_id` key.
    """
    analytics = super().get_analytics_metadata()
    analytics["workspace_id"] = self.workspace
    return analytics
WorkspaceScopedResponseModel (UserScopedResponseModel)
pydantic-model
Base workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/base_models.py
class WorkspaceScopedResponseModel(UserScopedResponseModel):
    """Response base class for resources that belong to a workspace.

    The `workspace` field holds the workspace's response model.
    """

    workspace: "WorkspaceResponseModel" = Field(
        title="The workspace of this resource."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata for workspace scoped models.

        Returns:
            The parent class's analytics metadata with the workspace's id
            added under the `workspace_id` key.
        """
        analytics = super().get_analytics_metadata()
        analytics["workspace_id"] = self.workspace.id
        return analytics
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata for workspace scoped models.

    Returns:
        The parent class's analytics metadata with the workspace's id added
        under the `workspace_id` key.
    """
    analytics = super().get_analytics_metadata()
    analytics["workspace_id"] = self.workspace.id
    return analytics
update_model(_cls)
Base update model.
This is used as a decorator on top of request models to convert them into update models where the fields are optional and can be set to None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_cls |
Type[~T] |
The class to decorate |
required |
Returns:
Type | Description |
---|---|
Type[~T] |
The decorated class. |
Source code in zenml/models/base_models.py
def update_model(_cls: Type[T]) -> Type[T]:
    """Turn a request model class into an update model class.

    Intended as a class decorator: every field registered in
    `_cls.__fields__` is marked as not required and as accepting `None`.
    The class is modified in place.

    Args:
        _cls: The class to decorate

    Returns:
        The decorated class (the same object that was passed in).
    """
    # Only the field objects are needed, not their names.
    for model_field in _cls.__fields__.values():
        model_field.required = False
        model_field.allow_none = True
    return _cls
code_repository_models
Models representing code repositories.
CodeReferenceBaseModel (BaseModel)
pydantic-model
Base model for code references.
Source code in zenml/models/code_repository_models.py
class CodeReferenceBaseModel(BaseModel):
    """Fields shared by all code reference models: commit and subdirectory."""

    commit: str = Field(
        description="The commit of the code reference.",
    )
    subdirectory: str = Field(
        description="The subdirectory of the code reference.",
    )
commit: str
pydantic-field
required
The commit of the code reference.
subdirectory: str
pydantic-field
required
The subdirectory of the code reference.
CodeReferenceRequestModel (CodeReferenceBaseModel, BaseRequestModel)
pydantic-model
Code reference request model.
Source code in zenml/models/code_repository_models.py
class CodeReferenceRequestModel(CodeReferenceBaseModel, BaseRequestModel):
    """Request model for code references.

    The code repository is referenced by its UUID.
    """

    code_repository: UUID = Field(
        description="The repository of the code reference.",
    )
code_repository: UUID
pydantic-field
required
The repository of the code reference.
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
CodeReferenceResponseModel (CodeReferenceBaseModel, BaseResponseModel)
pydantic-model
Code reference response model.
Source code in zenml/models/code_repository_models.py
class CodeReferenceResponseModel(CodeReferenceBaseModel, BaseResponseModel):
    """Response model for code references.

    The code repository is embedded as a full response model.
    """

    code_repository: CodeRepositoryResponseModel = Field(
        description="The repository of the code reference.",
    )
code_repository: CodeRepositoryResponseModel
pydantic-field
required
The repository of the code reference.
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryBaseModel (BaseModel)
pydantic-model
Base model for code repositories.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryBaseModel(BaseModel):
    """Fields shared by all code repository models.

    `logo_url` and `description` are optional; the remaining fields are
    declared without a default value.
    """

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The name of the code repository.",
    )
    config: Dict[str, Any] = Field(
        description="Configuration for the code repository.",
    )
    source: Source = Field(
        description="The code repository source.",
    )
    logo_url: Optional[str] = Field(
        description="Optional URL of a logo (png, jpg or svg) for the code repository.",
    )
    description: Optional[str] = Field(
        max_length=TEXT_FIELD_MAX_LENGTH,
        description="Code repository description.",
    )
config: Dict[str, Any]
pydantic-field
required
Configuration for the code repository.
description: ConstrainedStrValue
pydantic-field
Code repository description.
logo_url: str
pydantic-field
Optional URL of a logo (png, jpg or svg) for the code repository.
source: Source
pydantic-field
required
The code repository source.
CodeRepositoryFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all code repositories.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryFilterModel(WorkspaceScopedFilterModel):
    """Filter model for querying code repositories.

    Supports filtering by name, workspace and user.
    """

    name: Optional[str] = Field(description="Name of the code repository.")
    workspace_id: Union[UUID, str, None] = Field(
        description="Workspace of the code repository.",
    )
    user_id: Union[UUID, str, None] = Field(
        description="User that created the code repository.",
    )
name: str
pydantic-field
Name of the code repository.
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the code repository.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the code repository.
CodeRepositoryRequestModel (CodeRepositoryBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Code repository request model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryRequestModel(CodeRepositoryBaseModel, WorkspaceScopedRequestModel):
    """Request model for code repositories.

    Adds no fields of its own; it only combines its two base classes.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryResponseModel (CodeRepositoryBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Code repository response model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryResponseModel(CodeRepositoryBaseModel, WorkspaceScopedResponseModel):
    """Response model for code repositories.

    Adds no fields of its own; it only combines its two base classes.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryUpdateModel (CodeRepositoryRequestModel)
pydantic-model
Code repository update model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryUpdateModel(CodeRepositoryRequestModel):
    """Update model for code repositories.

    Declares the same fields as the request model it derives from.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
component_models
Models representing stack components.
ComponentBaseModel (BaseModel)
pydantic-model
Base model for stack components.
Source code in zenml/models/component_models.py
class ComponentBaseModel(BaseModel):
    """Fields shared by all stack component models.

    `labels` is optional and defaults to `None`; the remaining fields are
    declared without a default value.
    """

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The name of the stack component.",
    )
    type: StackComponentType = Field(title="The type of the stack component.")
    flavor: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The flavor of the stack component.",
    )
    configuration: Dict[str, Any] = Field(
        title="The stack component configuration."
    )
    labels: Optional[Dict[str, Any]] = Field(
        title="The stack component labels.",
        default=None,
    )
ComponentFilterModel (ShareableWorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all ComponentModels.
The component model needs additional scoping. As such, the `scope_type` field can be set (via `set_scope_type()`) to the type of component the results should be restricted to. The `generate_filter()` method of the base class is overridden to include this scoping.
Source code in zenml/models/component_models.py
class ComponentFilterModel(ShareableWorkspaceScopedFilterModel):
    """Filter model for querying stack components.

    In addition to the regular filter fields, a query can be scoped to a
    single component type: `set_scope_type()` stores the type in the
    `scope_type` field, and the overridden `generate_filter()` adds a
    matching condition on the table's `type` column.
    """

    # `scope_type` is neither used as a regular filter nor offered in the CLI.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_type",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_type",
    ]

    scope_type: Optional[str] = Field(
        description="The type to scope this query to.",
        default=None,
    )
    is_shared: Optional[Union[bool, str]] = Field(
        description="If the stack is shared or private",
        default=None,
    )
    name: Optional[str] = Field(
        description="Name of the stack component",
        default=None,
    )
    flavor: Optional[str] = Field(
        description="Flavor of the stack component",
        default=None,
    )
    type: Optional[str] = Field(
        description="Type of the stack component",
        default=None,
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        description="Workspace of the stack component",
        default=None,
    )
    user_id: Optional[Union[UUID, str]] = Field(
        description="User of the stack",
        default=None,
    )

    def set_scope_type(self, component_type: str) -> None:
        """Restrict the query to a single type of component.

        Args:
            component_type: The type of component to scope the query to.
        """
        setattr(self, "scope_type", component_type)

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        When a scope type is set, the filter built by the parent class is
        combined with a condition on the table's `type` column.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_

        query_filter = super().generate_filter(table)
        if not self.scope_type:
            # No scoping requested: the parent's filter is used unchanged.
            return query_filter
        matches_scope = getattr(table, "type") == self.scope_type
        return and_(query_filter, matches_scope)
flavor: str
pydantic-field
Flavor of the stack component
is_shared: Union[bool, str]
pydantic-field
If the stack is shared or private
name: str
pydantic-field
Name of the stack component
scope_type: str
pydantic-field
The type to scope this query to.
type: str
pydantic-field
Type of the stack component
user_id: Union[uuid.UUID, str]
pydantic-field
User of the stack
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the stack component
generate_filter(self, table)
Generate the filter for the query.
Stack components can be scoped by type to narrow the search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Source code in zenml/models/component_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    When a scope type is set, the filter built by the parent class is
    combined with a condition on the table's `type` column.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_

    query_filter = super().generate_filter(table)
    if not self.scope_type:
        # No scoping requested: the parent's filter is used unchanged.
        return query_filter
    matches_scope = getattr(table, "type") == self.scope_type
    return and_(query_filter, matches_scope)
set_scope_type(self, component_type)
Set the type of component on which to perform the filtering to scope the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
str |
The type of component to scope the query to. |
required |
Source code in zenml/models/component_models.py
def set_scope_type(self, component_type: str) -> None:
    """Restrict the query to a single type of component.

    Args:
        component_type: The type of component to scope the query to.
    """
    setattr(self, "scope_type", component_type)
ComponentRequestModel (ComponentBaseModel, ShareableRequestModel)
pydantic-model
Request model for stack components.
Source code in zenml/models/component_models.py
class ComponentRequestModel(ComponentBaseModel, ShareableRequestModel):
    """Request model for stack components.

    Rejects component names that are secret references.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    @validator("name")
    def name_cant_be_a_secret_reference(cls, name: str) -> str:
        """Reject names that are secret references.

        Args:
            name: The name to validate.

        Returns:
            The unchanged name if it is not a secret reference.

        Raises:
            ValueError: If the name is a secret reference.
        """
        if not secret_utils.is_secret_reference(name):
            return name
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
name_cant_be_a_secret_reference(name)
classmethod
Validator to ensure that the given name is not a secret reference.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name to validate. |
required |
Returns:
Type | Description |
---|---|
str |
The name if it is not a secret reference. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the name is a secret reference. |
Source code in zenml/models/component_models.py
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
    """Reject names that are secret references.

    Args:
        name: The name to validate.

    Returns:
        The unchanged name if it is not a secret reference.

    Raises:
        ValueError: If the name is a secret reference.
    """
    if not secret_utils.is_secret_reference(name):
        return name
    raise ValueError(
        "Passing the `name` attribute of a stack component as a "
        "secret reference is not allowed."
    )
ComponentResponseModel (ComponentBaseModel, ShareableResponseModel)
pydantic-model
Response model for stack components.
Source code in zenml/models/component_models.py
class ComponentResponseModel(ComponentBaseModel, ShareableResponseModel):
    """Response model for stack components.

    Declares `type` and `flavor` as its analytics fields.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
ComponentUpdateModel (ComponentRequestModel)
pydantic-model
Update model for stack components.
Source code in zenml/models/component_models.py
class ComponentUpdateModel(ComponentRequestModel):
    """Update model for stack components.

    Declares the same fields as the request model it derives from.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
constants
Constants used by ZenML domain models.
filter_models
Base filter model definitions.
BaseFilterModel (BaseModel)
pydantic-model
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
    name="contains:default",
    workspace="default",
    count_steps="gte:5",
    sort_by="created",
    page=2,
    size=50,
)
Source code in zenml/models/filter_models.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default",
        count_steps="gte:5",
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        # interpreted as string, however within the validator they are still
        # integers, which don't have a .split() method
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )
        column = v
        split_value = v.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                # Drop the unsupported operand and keep only the column.
                v = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        elif column in cls.__fields__:
            return v
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        # The filters are built only so that illegal ones raise; the
        # resulting list is intentionally discarded.
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.__fields__}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the `sort_by` value into a sorting column and operand.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            column_filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(column_filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        The field type is probed in a fixed order (datetime, UUID, int,
        bool, str); the first match decides which filter is built.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return (
            issubclass(datetime, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is datetime
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        return (
            issubclass(UUID, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is UUID
        )

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's a int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        return (
            issubclass(int, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is int
        )

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return (
            issubclass(bool, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is bool
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        )

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ == str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e

        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Any operator other than `equals` is replaced by `equals` after a
        warning has been logged.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            # Adjacent string literals are joined without a separator, so
            # each fragment carries its own trailing space.
            logger.warning(
                "Boolean filters do not support any "
                "operation except for equals. Defaulting "
                "to an `equals` comparison."
            )
        # NOTE(review): bool() treats every non-empty string (including
        # "false") as True - confirm callers only pass parsed booleans here.
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        # The leading False/True operand is passed ahead of the column
        # conditions, so the call also receives an argument when there are
        # no filters at all.
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query
created: Union[datetime.datetime, str]
pydantic-field
Created
id: Union[uuid.UUID, str]
pydantic-field
Id for this resource
list_of_filters: List[Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[Filter] |
A list of Filter models. |
logical_operator: LogicalOperators
pydantic-field
Which logical operator to use between all filters ['and', 'or']
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
page: ConstrainedIntValue
pydantic-field
Page number
size: ConstrainedIntValue
pydantic-field
Page size
sort_by: str
pydantic-field
Which column to sort by.
sorting_params: Tuple[str, SorterOps]
property
readonly
Converts the `sort_by` value into the column to sort by and the sorting operand.
Returns:
Type | Description |
---|---|
Tuple[str, SorterOps] |
A tuple of the column to sort by and the sorting operand. |
updated: Union[datetime.datetime, str]
pydantic-field
Updated
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query to which to apply the filter. |
required |
table |
Type['AnySchema'] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Restrict a query with the conditions produced by this filter model.

    Args:
        query: The query that should be narrowed down.
        table: The table the query selects from; forwarded to
            `generate_filter`.

    Returns:
        The result of `query.where(...)` with the generated condition, or
        the query itself when `generate_filter` yields `None`.
    """
    condition = self.generate_filter(table=table)

    # Nothing to apply: hand the query back untouched.
    if condition is None:
        return query

    return query.where(condition)
filter_ops(values)
classmethod
Parse incoming filters to ensure all filters are legal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values of the class. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values of the class. |
Source code in zenml/models/filter_models.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Build filters from the raw input so that illegal ones surface early.

    Args:
        values: The raw values handed to the model.

    Returns:
        The same values, unchanged.
    """
    # Only the side effect matters here: any error raised while building
    # the filters propagates, and the built list itself is thrown away.
    _ = cls._generate_filter_list(values)
    return values
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[('BinaryExpression[Any]', 'BooleanClauseList[Any]')] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/filter_models.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Combine all column filters into one query expression.

    Args:
        table: The table that is being queried.

    Returns:
        The column conditions joined with `or_` or `and_`, depending on
        `self.logical_operator`.

    Raises:
        RuntimeError: If the logical operator is neither OR nor AND.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    conditions = [
        column_filter.generate_query_conditions(table=table)
        for column_filter in self.list_of_filters
    ]

    combinator = self.logical_operator
    # The leading False/True operand is passed ahead of the conditions.
    if combinator == LogicalOperators.OR:
        return or_(False, *conditions)
    if combinator == LogicalOperators.AND:
        return and_(True, *conditions)
    raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Tell whether the field `k` is declared with a bool type.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field type is `bool` itself or lists `bool` among its
        type arguments, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if field_type is bool:
        return True
    # Covers e.g. Union[bool, str], whose type arguments include bool.
    return issubclass(bool, get_args(field_type))
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Tell whether the field `k` is declared with a datetime type.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field type is `datetime` itself or lists `datetime`
        among its type arguments, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if field_type is datetime:
        return True
    # Covers e.g. Union[datetime, str].
    return issubclass(datetime, get_args(field_type))
is_int_field(k)
classmethod
Checks if it's a int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a int field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Tell whether the field `k` is declared with an int type.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field type is `int` itself or lists `int` among its
        type arguments, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if field_type is int:
        return True
    # Covers e.g. Union[int, str].
    return issubclass(int, get_args(field_type))
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Tell whether `k` is the string-typed `sort_by` field.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field is string-typed and its name is "sort_by",
        False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # The type is inspected before the name is compared, so an unknown
    # key still fails on the field lookup above.
    is_string_typed = issubclass(str, get_args(field_type)) or field_type == str
    return is_string_typed and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Tell whether the field `k` is declared with a str type.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field type is `str` itself or lists `str` among its
        type arguments, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if field_type is str:
        return True
    # Covers e.g. Union[UUID, str].
    return issubclass(str, get_args(field_type))
is_uuid_field(k)
classmethod
Checks if it's a uuid field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a uuid field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Tell whether the field `k` is declared with a UUID type.

    Args:
        k: Name of the field to inspect.

    Returns:
        True if the field type is `UUID` itself or lists `UUID` among its
        type arguments, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if field_type is UUID:
        return True
    # Covers e.g. Union[UUID, str].
    return issubclass(UUID, get_args(field_type))
validate_sort_by(v)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
str |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/filter_models.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Check that `sort_by` names a sortable column, optionally with an operand.

    The value is either a bare column name or has the form
    "operand:column". An operand that is not one of `SorterOps.values()`
    is dropped after a warning has been logged.

    Args:
        v: The raw sort_by value.

    Returns:
        The validated sort_by value.

    Raises:
        ValidationError: If the value is not a string.
        ValueError: If the column is excluded from sorting or is not a
            field of this resource.
    """
    # Non-string input (e.g. an int) reaches this pre-validator unconverted
    # and would not support the string operations used below.
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )

    operand, separator, remainder = v.partition(":")
    # Without a colon the whole value is the column name.
    column = remainder if separator else v

    if separator and operand not in SorterOps.values():
        logger.warning(
            "Invalid operand used for column sorting. "
            "Only the following operands are supported `%s`. "
            "Defaulting to 'asc' on column `%s`.",
            SorterOps.values(),
            column,
        )
        v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    if column not in cls.__fields__:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
    return v
BoolFilter (Filter)
pydantic-model
Filter for all Boolean fields.
Source code in zenml/models/filter_models.py
class BoolFilter(Filter):
    """Filter for all Boolean fields."""

    # Equality is the only operation allowed on boolean columns.
    ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Build the equality condition for a boolean column.

        Args:
            column: The boolean column of an SQLModel table on which to
                filter.

        Returns:
            The result of comparing the column with `self.value`.
        """
        expected = self.value
        return column == expected
generate_query_conditions_from_column(self, column)
Generate query conditions for a boolean column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The boolean column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Build the equality condition for a boolean column.

    Args:
        column: The boolean column of an SQLModel table on which to filter.

    Returns:
        The result of comparing the column with `self.value`.
    """
    expected = self.value
    return column == expected
Filter (BaseModel, ABC)
pydantic-model
Filter for all fields.
A Filter is a combination of a column, a value that the user uses to filter on this column and an operation to use. The easiest example would be `user equals aria` with column=`user`, value=`aria` and the operation=`equals`.
All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.
Source code in zenml/models/filter_models.py
class Filter(BaseModel, ABC):
    """Filter for all fields.

    A Filter bundles a column, the value the user filters this column by
    and the operation used for the comparison. The easiest example would be
    `user equals aria` with column=`user`, value=`aria` and the
    operation=`equals`.

    Each subclass supports its own set of operations, declared in the
    ALLOWED_OPS class variable.
    """

    # Operations accepted by `validate_operation`; empty on the base class.
    ALLOWED_OPS: ClassVar[List[str]] = []

    operation: GenericFilterOps
    column: str
    value: Any

    @validator("operation", pre=True)
    def validate_operation(cls, op: str) -> str:
        """Accept only operations listed in `ALLOWED_OPS`.

        Args:
            op: The operation of this filter.

        Returns:
            The operation if it is valid.

        Raises:
            ValueError: If the operation is not valid for this field type.
        """
        if op in cls.ALLOWED_OPS:
            return op

        raise ValueError(
            f"This datatype can not be filtered using this operation: "
            f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
        )

    def generate_query_conditions(
        self,
        table: Type[SQLModel],
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the query conditions for the database.

        Looks up the attribute named `self.column` on the table and hands
        it to `generate_query_conditions_from_column`.

        Args:
            table: The SQLModel table to use for the query creation

        Returns:
            Whatever `generate_query_conditions_from_column` produces for
            the looked-up column.
        """
        return self.generate_query_conditions_from_column(  # type:ignore[no-any-return]
            getattr(table, self.column)
        )

    @abstractmethod
    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions given the corresponding database column.

        Subclasses override this to define how each supported operation in
        `self.ALLOWED_OPS` filters the given column by `self.value`.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
generate_query_conditions(self, table)
Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The SQLModel table to use for the query creation |
required |
Returns:
Type | Description |
---|---|
Union[('BinaryExpression[Any]', 'BooleanClauseList[Any]')] |
A list of conditions that will be combined using the `and` operation |
Source code in zenml/models/filter_models.py
def generate_query_conditions(
    self,
    table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the query conditions for the database.

    Looks up the attribute named `self.column` on the table and hands it
    to `generate_query_conditions_from_column`.

    Args:
        table: The SQLModel table to use for the query creation

    Returns:
        Whatever `generate_query_conditions_from_column` produces for the
        looked-up column.
    """
    return self.generate_query_conditions_from_column(  # type:ignore[no-any-return]
        getattr(table, self.column)
    )
generate_query_conditions_from_column(self, column)
Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each supported operation in `self.ALLOWED_OPS` can be used to filter the given column by `self.value`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions given the corresponding database column.

    Subclasses override this to define how each supported operation in
    `self.ALLOWED_OPS` filters the given column by `self.value`. The base
    implementation has no body beyond this docstring.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
validate_operation(op)
classmethod
Validate that the operation is a valid op for the field type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
op |
str |
The operation of this filter. |
required |
Returns:
Type | Description |
---|---|
str |
The operation if it is valid. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the operation is not valid for this field type. |
Source code in zenml/models/filter_models.py
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
    """Accept only operations listed in `ALLOWED_OPS`.

    Args:
        op: The operation of this filter.

    Returns:
        The operation if it is valid.

    Raises:
        ValueError: If the operation is not valid for this field type.
    """
    if op in cls.ALLOWED_OPS:
        return op

    raise ValueError(
        f"This datatype can not be filtered using this operation: "
        f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
    )
NumericFilter (Filter)
pydantic-model
Filter for all numeric fields.
Source code in zenml/models/filter_models.py
class NumericFilter(Filter):
    """Filter for all numeric fields."""

    # Declared as float or datetime, so the same comparisons also serve
    # datetime values.
    value: Union[float, datetime]

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.GT,
        GenericFilterOps.GTE,
        GenericFilterOps.LT,
        GenericFilterOps.LTE,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a numeric or datetime column.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            The comparison of the column with `self.value` that matches
            `self.operation`; equality when no ordering operation matches.
        """
        op = self.operation
        threshold = self.value

        if op == GenericFilterOps.GTE:
            condition = column >= threshold
        elif op == GenericFilterOps.GT:
            condition = column > threshold
        elif op == GenericFilterOps.LTE:
            condition = column <= threshold
        elif op == GenericFilterOps.LT:
            condition = column < threshold
        else:
            condition = column == threshold
        return condition
generate_query_conditions_from_column(self, column)
Generate query conditions for a numeric or datetime column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The numeric or datetime column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a numeric or datetime column.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        The comparison of the column with `self.value` that matches
        `self.operation`; equality when no ordering operation matches.
    """
    op = self.operation
    threshold = self.value

    if op == GenericFilterOps.GTE:
        condition = column >= threshold
    elif op == GenericFilterOps.GT:
        condition = column > threshold
    elif op == GenericFilterOps.LTE:
        condition = column <= threshold
    elif op == GenericFilterOps.LT:
        condition = column < threshold
    else:
        condition = column == threshold
    return condition
ShareableWorkspaceScopedFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced scoping with workspace and user scoped shareable things.
Source code in zenml/models/filter_models.py
class ShareableWorkspaceScopedFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced scoping with workspace and user scoped shareable things."""

    # `scope_user` is handled explicitly in `apply_filter`, so it is kept
    # out of the generic column filters and out of the CLI options.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_user",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_user",
    ]
    scope_user: Optional[UUID] = Field(
        default=None,
        description="The user to scope this query to.",
    )

    def set_scope_user(self, user_id: UUID) -> None:
        """Record the user whose view the query results are scoped to.

        Args:
            user_id: The user ID to scope the response to.
        """
        setattr(self, "scope_user", user_id)

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Apply the inherited filters plus the optional user scope.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The filtered query. When `scope_user` is set, rows are further
            limited to those owned by that user or marked as shared.
        """
        from sqlmodel import or_

        scoped_query = super().apply_filter(query=query, table=table)

        # No scope requested: the inherited filtering is all there is.
        if not self.scope_user:
            return scoped_query

        owned_by_user = getattr(table, "user_id") == self.scope_user
        shared_with_all = getattr(table, "is_shared").is_(True)
        return scoped_query.where(or_(owned_by_user, shared_with_all))
scope_user: UUID
pydantic-field
The user to scope this query to.
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query to which to apply the filter. |
required |
table |
Type['AnySchema'] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Apply the inherited filters plus the optional user scope.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The filtered query. When `scope_user` is set, rows are further
        limited to those owned by that user or marked as shared.
    """
    from sqlmodel import or_

    scoped_query = super().apply_filter(query=query, table=table)

    # No scope requested: the inherited filtering is all there is.
    if not self.scope_user:
        return scoped_query

    owned_by_user = getattr(table, "user_id") == self.scope_user
    shared_with_all = getattr(table, "is_shared").is_(True)
    return scoped_query.where(or_(owned_by_user, shared_with_all))
set_scope_user(self, user_id)
Set the user that is performing the filtering to scope the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID to scope the response to. |
required |
Source code in zenml/models/filter_models.py
def set_scope_user(self, user_id: UUID) -> None:
    """Record which user the filter results should be scoped to.

    The stored ID is read back by `apply_filter` when building the query.

    Args:
        user_id: The user ID to scope the response to.
    """
    self.scope_user = user_id
StrFilter (Filter)
pydantic-model
Filter for all string fields.
Source code in zenml/models/filter_models.py
class StrFilter(Filter):
    """Filter for all string fields."""

    # Operations that may be requested on a string column.
    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.STARTSWITH,
        GenericFilterOps.CONTAINS,
        GenericFilterOps.ENDSWITH,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Build the query condition for a string column.

        Args:
            column: The string column of an SQLModel table on which to
                filter.

        Returns:
            The condition matching this filter's operation and value; any
            operation other than contains/startswith/endswith falls back
            to an equality comparison.
        """
        operation = self.operation
        if operation == GenericFilterOps.CONTAINS:
            condition = column.like(f"%{self.value}%")
        elif operation == GenericFilterOps.STARTSWITH:
            condition = column.startswith(f"{self.value}")
        elif operation == GenericFilterOps.ENDSWITH:
            condition = column.endswith(f"{self.value}")
        else:
            condition = column == self.value
        return condition
generate_query_conditions_from_column(self, column)
Generate query conditions for a string column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The string column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Build the query condition for a string column.

    Args:
        column: The string column of an SQLModel table on which to filter.

    Returns:
        The condition matching this filter's operation and value; any
        operation other than contains/startswith/endswith falls back to an
        equality comparison.
    """
    operation = self.operation
    if operation == GenericFilterOps.CONTAINS:
        condition = column.like(f"%{self.value}%")
    elif operation == GenericFilterOps.STARTSWITH:
        condition = column.startswith(f"{self.value}")
    elif operation == GenericFilterOps.ENDSWITH:
        condition = column.endswith(f"{self.value}")
    else:
        condition = column == self.value
    return condition
UUIDFilter (StrFilter)
pydantic-model
Filter for all uuid fields which are mostly treated like strings.
Source code in zenml/models/filter_models.py
class UUIDFilter(StrFilter):
    """Filter for all uuid fields which are mostly treated like strings."""

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Build the query condition for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to
                filter.

        Returns:
            A direct comparison for equality checks; for every other
            operation, the string condition built on the column cast to a
            string type.
        """
        import sqlalchemy
        from sqlalchemy_utils.functions import cast_if

        if self.operation != GenericFilterOps.EQUALS:
            # Everything but equality is delegated to the string filter
            # logic, operating on the column cast to a string.
            text_column = cast_if(column, sqlalchemy.String)
            return super().generate_query_conditions_from_column(
                column=text_column
            )

        # Equality compares against the column directly, without a cast.
        return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a UUID column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The UUID column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Build the query condition for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A direct comparison for equality checks; for every other
        operation, the string condition built on the column cast to a
        string type.
    """
    import sqlalchemy
    from sqlalchemy_utils.functions import cast_if

    if self.operation != GenericFilterOps.EQUALS:
        # Everything but equality is delegated to the string filter logic,
        # operating on the column cast to a string.
        text_column = cast_if(column, sqlalchemy.String)
        return super().generate_query_conditions_from_column(
            column=text_column
        )

    # Equality compares against the column directly, without a cast.
    return column == self.value
WorkspaceScopedFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced scoping with workspace.
Source code in zenml/models/filter_models.py
class WorkspaceScopedFilterModel(BaseFilterModel):
    """Model to enable advanced scoping with workspace."""

    # `scope_workspace` is handled explicitly in `apply_filter`, so it is
    # excluded from the generic column filters and from the CLI options.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    scope_workspace: Optional[UUID] = Field(
        default=None,
        description="The workspace to scope this query to.",
    )

    def set_scope_workspace(self, workspace_id: UUID) -> None:
        """Record which workspace the filter results should be scoped to.

        Args:
            workspace_id: The workspace to scope this response to.
        """
        self.scope_workspace = workspace_id

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Apply the inherited filters plus the optional workspace scope.

        If a scope workspace is set, the query is narrowed to rows whose
        `workspace_id` equals it or is NULL.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        filtered_query = super().apply_filter(query=query, table=table)
        if not self.scope_workspace:
            return filtered_query

        in_workspace = getattr(table, "workspace_id") == self.scope_workspace
        without_workspace = getattr(table, "workspace_id").is_(None)
        return filtered_query.where(or_(in_workspace, without_workspace))
scope_workspace: UUID
pydantic-field
The workspace to scope this query to.
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query to which to apply the filter. |
required |
table |
Type['AnySchema'] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Apply the inherited filters plus the optional workspace scope.

    If a scope workspace is set, the query is narrowed to rows whose
    `workspace_id` equals it or is NULL.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    filtered_query = super().apply_filter(query=query, table=table)
    if not self.scope_workspace:
        return filtered_query

    in_workspace = getattr(table, "workspace_id") == self.scope_workspace
    without_workspace = getattr(table, "workspace_id").is_(None)
    return filtered_query.where(or_(in_workspace, without_workspace))
set_scope_workspace(self, workspace_id)
Set the workspace to scope this response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
UUID |
The workspace to scope this response to. |
required |
Source code in zenml/models/filter_models.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
    """Record which workspace the filter results should be scoped to.

    The stored ID is read back by `apply_filter` when building the query.

    Args:
        workspace_id: The workspace to scope this response to.
    """
    self.scope_workspace = workspace_id
flavor_models
Models representing stack component flavors.
FlavorBaseModel (BaseModel)
pydantic-model
Base model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorBaseModel(BaseModel):
    """Base model for stack component flavors.

    Declares the fields shared by the flavor request and response models:
    the identifying name/type, the JSON config schema, the source module
    path and optional metadata (integration name and logo/docs URLs).
    """

    name: str = Field(
        title="The name of the Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    type: StackComponentType = Field(title="The type of the Flavor.")
    config_schema: Dict[str, Any] = Field(
        title="The JSON schema of this flavor's corresponding configuration.",
    )
    source: str = Field(
        title="The path to the module which contains this Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    integration: Optional[str] = Field(
        title="The name of the integration that the Flavor belongs to.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # The title is split over two literals; the trailing space keeps the
    # words from running together after concatenation.
    logo_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to a png, "
        "svg or jpg can be attached.",
    )
    docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to docs, within docs.zenml.io.",
    )
    sdk_docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to SDK docs, "
        "within apidocs.zenml.io.",
    )
    # Flavors are treated as user-created unless stated otherwise.
    is_custom: bool = Field(
        title="Whether or not this flavor is a custom, user created flavor.",
        default=True,
    )
FlavorFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Flavors.
Source code in zenml/models/flavor_models.py
class FlavorFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Flavors.

    Every field is optional; a field left at `None` is not used as a
    filter.
    """

    name: Optional[str] = Field(
        default=None,
        description="Name of the flavor",
    )
    type: Optional[str] = Field(
        default=None,
        description="Stack Component Type of the stack flavor",
    )
    integration: Optional[str] = Field(
        default=None,
        description="Integration associated with the flavor",
    )
    # These two descriptions previously said "stack"; the resource being
    # filtered here is the flavor.
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the flavor"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the flavor"
    )
integration: str
pydantic-field
Integration associated with the flavor
name: str
pydantic-field
Name of the flavor
type: str
pydantic-field
Stack Component Type of the stack flavor
user_id: Union[uuid.UUID, str]
pydantic-field
User of the flavor
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the flavor
FlavorRequestModel (FlavorBaseModel, BaseRequestModel)
pydantic-model
Request model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorRequestModel(FlavorBaseModel, BaseRequestModel):
    """Request model for stack component flavors.

    Adds optional `user` and `workspace` IDs on top of the fields inherited
    from `FlavorBaseModel`.
    """

    # presumably the field names reported to analytics -- confirm against
    # the base request model
    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "type",
        "integration",
    ]
    # Both references are plain IDs here and default to None.
    user: Optional[UUID] = Field(
        default=None, title="The id of the user that created this resource."
    )
    workspace: Optional[UUID] = Field(
        default=None, title="The workspace to which this resource belongs."
    )
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
FlavorResponseModel (FlavorBaseModel, BaseResponseModel)
pydantic-model
Response model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorResponseModel(FlavorBaseModel, BaseResponseModel):
    """Response model for stack component flavors.

    Unlike the request model, `user` and `workspace` are declared as
    response models rather than plain IDs; both may be absent.
    """

    # presumably the field names reported to analytics -- confirm against
    # the base response model
    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "id",
        "type",
        "integration",
    ]
    user: Union["UserResponseModel", None] = Field(
        title="The user that created this resource.", nullable=True
    )
    # NOTE(review): the title still says "project" although the field is
    # named `workspace` -- confirm whether the wording should be updated.
    workspace: Optional["WorkspaceResponseModel"] = Field(
        title="The project of this resource."
    )
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
FlavorUpdateModel (FlavorRequestModel)
pydantic-model
Update model for flavors.
Source code in zenml/models/flavor_models.py
class FlavorUpdateModel(FlavorRequestModel):
    """Update model for flavors.

    Inherits every field from `FlavorRequestModel` without adding or
    overriding any.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
hub_plugin_models
Models representing ZenML Hub plugins.
HubPluginBaseModel (BaseModel)
pydantic-model
Base model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginBaseModel(BaseModel):
    """Base model for a ZenML Hub plugin.

    Only `name` and `repository_url` are declared as non-optional; all
    other fields are annotated as `Optional`.
    """

    name: str
    description: Optional[str]
    version: Optional[str]
    release_notes: Optional[str]
    # Location of the plugin source: repository URL plus optional
    # subdirectory, branch and commit.
    repository_url: str
    repository_subdirectory: Optional[str]
    repository_branch: Optional[str]
    repository_commit: Optional[str]
    tags: Optional[List[str]]
    logo_url: Optional[str]
HubPluginRequestModel (HubPluginBaseModel)
pydantic-model
Request model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginRequestModel(HubPluginBaseModel):
    """Request model for a ZenML Hub plugin.

    Adds no fields of its own; it carries exactly the fields of
    `HubPluginBaseModel`.
    """
HubPluginResponseModel (HubPluginBaseModel)
pydantic-model
Response model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginResponseModel(HubPluginBaseModel):
    """Response model for a ZenML Hub plugin.

    Extends the base model with the plugin's ID, status, author, optional
    packaging details and creation/update timestamps.
    """

    id: UUID
    status: PluginStatus
    author: str
    # Re-declared as a plain `str`; the base model annotates it as Optional.
    version: str
    index_url: Optional[str]
    package_name: Optional[str]
    requirements: Optional[List[str]]
    build_logs: Optional[str]
    created: datetime
    updated: datetime
HubUserResponseModel (BaseModel)
pydantic-model
Model for a ZenML Hub user.
Source code in zenml/models/hub_plugin_models.py
class HubUserResponseModel(BaseModel):
    """Model for a ZenML Hub user.

    `id` and `email` are declared as non-optional; `username` is optional.
    """

    id: UUID
    email: str
    username: Optional[str]
PluginStatus (StrEnum)
Enum that represents the status of a plugin.
- PENDING: Plugin is being built
- FAILED: Plugin build failed
- AVAILABLE: Plugin is available for installation
- YANKED: Plugin was yanked and is no longer available
Source code in zenml/models/hub_plugin_models.py
class PluginStatus(StrEnum):
    """Enum that represents the status of a plugin.

    - PENDING: Plugin is being built
    - FAILED: Plugin build failed
    - AVAILABLE: Plugin is available for installation
    - YANKED: Plugin was yanked and is no longer available
    """

    # Each member's value is the lowercase form of its name.
    PENDING = "pending"
    FAILED = "failed"
    AVAILABLE = "available"
    YANKED = "yanked"
page_model
Model implementation for easy pagination for Lists of ZenML Domain Models.
The code contained within this file has been inspired by the fastapi-pagination library: https://github.com/uriyyo/fastapi-pagination
Page (GenericModel, Generic)
pydantic-model
Return Model for List Models to accommodate pagination.
Source code in zenml/models/page_model.py
class Page(GenericModel, Generic[B]):
    """Return Model for List Models to accommodate pagination.

    Besides the pagination metadata, the page exposes its items through
    `len()`, indexing and the `in` operator.
    """

    index: PositiveInt
    max_size: PositiveInt
    total_pages: NonNegativeInt
    total: NonNegativeInt
    items: Sequence[B]

    __params_type__ = BaseFilterModel

    @property
    def size(self) -> int:
        """Return the item count of the page.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __len__(self) -> int:
        """Return the item count of the page.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __getitem__(self, index: int) -> B:
        """Return the item at the given index.

        Args:
            index: The index to get the item from.

        Returns:
            The item at the given index.
        """
        return self.items[index]

    def __contains__(self, item: B) -> bool:
        """Returns whether the page contains a specific item.

        Args:
            item: The item to check for.

        Returns:
            Whether the item is in the page.
        """
        return item in self.items

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST API server to unpack SecretStr
        # values correctly before sending them to the client. Falsy values
        # are encoded as None instead of being unwrapped.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
size: int
property
readonly
Return the item count of the page.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
Config
Pydantic configuration class.
Source code in zenml/models/page_model.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST API server to unpack SecretStr
    # values correctly before sending them to the client. Falsy values are
    # encoded as None instead of being unwrapped.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }
__params_type__ (BaseModel)
pydantic-model
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
name="contains:default",
workspace="default",
count_steps="gte:5",
sort_by="created",
page=2,
size=50
)
Source code in zenml/models/page_model.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default",
        count_steps="gte:5",
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]
    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # Sorting: either a bare column name or "<operand>:<column>".
    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    # How the individual column filters are combined.
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    # Pagination: page numbers start at 1, page size is capped.
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    # Filterable fields shared by all resources. Each also accepts a `str`
    # so that an "<operator>:<value>" expression can be passed.
    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Check that `sort_by` names a sortable column of this resource.

    The value is either a bare column name or `<operand>:<column>`. An
    unknown operand is dropped with a warning, leaving only the column.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    # interpreted as string, however within the validator they are still
    # integers, which don't have string methods
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )

    operand, separator, remainder = v.partition(":")
    column = remainder if separator else v

    if separator and operand not in SorterOps.values():
        logger.warning(
            "Invalid operand used for column sorting. "
            "Only the following operands are supported `%s`. "
            "Defaulting to 'asc' on column `%s`.",
            SorterOps.values(),
            column,
        )
        # Drop the unsupported operand and keep only the column name.
        v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    if column not in cls.__fields__:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
    return v
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Check that every incoming filter can be turned into a Filter.

    The filter list is built only for its validation side effect (errors
    raised while building it surface here); the list itself is discarded.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
@property
def list_of_filters(self) -> List[Filter]:
    """Build Filter models from the current field values.

    Returns:
        A list of Filter models.
    """
    current_values = {}
    for field_name in self.__fields__:
        current_values[field_name] = getattr(self, field_name)
    return self._generate_filter_list(current_values)
@property
def sorting_params(self) -> Tuple[str, SorterOps]:
    """Split `sort_by` into the sort column and the sorting operand.

    Returns:
        A tuple of the column to sort by and the sorting operand.
    """
    operand, separator, column = self.sort_by.partition(":")
    if not separator:
        # No explicit operand given: sort ascending on the bare column.
        return self.sort_by, SorterOps.ASCENDING
    return column, SorterOps(operand)
@classmethod
def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
    """Turn a (column, value) dictionary into Filter objects.

    Excluded fields and `None` values produce no filter.

    Args:
        values: A dictionary of column names and values to filter on.

    Returns:
        A list of filters.
    """
    filters: List[Filter] = []
    for column, raw_value in values.items():
        if column in cls.FILTER_EXCLUDE_FIELDS or raw_value is None:
            continue

        # Separate an optional "<operator>:" prefix from the actual value.
        filter_value, operator = cls._resolve_operator(raw_value)
        filters.append(
            cls._define_filter(
                column=column, value=filter_value, operator=operator
            )
        )
    return filters
@staticmethod
def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
    """Split a user-provided value into filter value and operator.

    A string of the form "operator:value" whose prefix is a known operator
    yields that operator and the remaining value. Anything else is
    returned unchanged with `GenericFilterOps.EQUALS`.

    Args:
        value: The user-provided value.

    Returns:
        A tuple of the filter value and the operator.
    """
    if not isinstance(value, str):
        return value, GenericFilterOps.EQUALS

    prefix, separator, remainder = value.partition(":")
    if separator and prefix in GenericFilterOps.values():
        return remainder, GenericFilterOps(prefix)

    # No (known) operator prefix: keep the whole string as the value.
    return value, GenericFilterOps.EQUALS
@classmethod
def _define_filter(
    cls, column: str, value: Any, operator: GenericFilterOps
) -> Filter:
    """Define a filter for a given column.

    The field's declared type is tested in a fixed order (datetime, UUID,
    int, bool, str) and the first match decides which filter is built.
    Fields matching none of the checks fall back to a string filter on
    `str(value)`.

    Args:
        column: The column to filter on.
        value: The value by which to filter.
        operator: The operator to use for filtering.

    Returns:
        A Filter object.
    """
    # NOTE(review): a field annotated e.g. `Union[UUID, str]` presumably
    # satisfies both the UUID and the str check, which would be why the
    # more specific checks come before the str check -- confirm.

    # Create datetime filters
    if cls.is_datetime_field(column):
        return cls._define_datetime_filter(
            column=column,
            value=value,
            operator=operator,
        )

    # Create UUID filters
    if cls.is_uuid_field(column):
        return cls._define_uuid_filter(
            column=column,
            value=value,
            operator=operator,
        )

    # Create int filters
    if cls.is_int_field(column):
        return NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=int(value),
        )

    # Create bool filters
    if cls.is_bool_field(column):
        return cls._define_bool_filter(
            column=column,
            value=value,
            operator=operator,
        )

    # Create str filters
    if cls.is_str_field(column):
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )

    # Handle unsupported datatypes
    logger.warning(
        f"The Datatype {cls.__fields__[column].type_} might not be "
        "supported for filtering. Defaulting to a string filter."
    )
    return StrFilter(
        operation=GenericFilterOps(operator),
        column=column,
        value=str(value),
    )
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) datetime.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(datetime, get_args(field_type)):
        return True
    return field_type is datetime
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) UUID.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(UUID, get_args(field_type)):
        return True
    return field_type is UUID
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) int.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(int, get_args(field_type)):
        return True
    return field_type is int
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) bool.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(bool, get_args(field_type)):
        return True
    return field_type is bool
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) str.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(str, get_args(field_type)):
        return True
    return field_type is str
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Tell whether a key is the string-typed `sort_by` field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    is_string = issubclass(str, get_args(field_type)) or field_type == str
    return is_string and k == "sort_by"
@staticmethod
def _define_datetime_filter(
    column: str, value: Any, operator: GenericFilterOps
) -> NumericFilter:
    """Build a numeric filter comparing a column against a datetime.

    `datetime` instances are used as-is; any other value is parsed with
    `FILTERING_DATETIME_FORMAT`.

    Args:
        column: The column to filter on.
        value: The datetime value by which to filter.
        operator: The operator to use for filtering.

    Returns:
        A Filter object.

    Raises:
        ValueError: If the value is not a valid datetime.
    """
    if isinstance(value, datetime):
        parsed_value = value
    else:
        try:
            parsed_value = datetime.strptime(
                value, FILTERING_DATETIME_FORMAT
            )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e

    return NumericFilter(
        operation=GenericFilterOps(operator),
        column=column,
        value=parsed_value,
    )
@staticmethod
def _define_uuid_filter(
    column: str, value: Any, operator: GenericFilterOps
) -> UUIDFilter:
    """Build a UUID filter for a given column.

    Args:
        column: The column to filter on.
        value: The UUID value by which to filter.
        operator: The operator to use for filtering.

    Returns:
        A Filter object.

    Raises:
        ValueError: If the value is not a valid UUID.
    """
    # Only equality checks require a complete, well-formed UUID.
    must_be_full_uuid = operator == GenericFilterOps.EQUALS and not isinstance(
        value, UUID
    )
    if must_be_full_uuid:
        try:
            UUID(value)
        except ValueError as e:
            raise ValueError(
                "Invalid value passed as UUID query parameter."
            ) from e

    # The filter always carries the string form of the value.
    text_value = str(value)
    return UUIDFilter(
        operation=GenericFilterOps(operator),
        column=column,
        value=text_value,
    )
@staticmethod
def _define_bool_filter(
    column: str, value: Any, operator: GenericFilterOps
) -> BoolFilter:
    """Define a bool filter for a given column.

    Boolean columns only support equality; any other operator is replaced
    by `equals` after logging a warning. String values are interpreted by
    their text, so the usual spellings of "false" yield `False`; all other
    values go through `bool()`.

    Args:
        column: The column to filter on.
        value: The bool value by which to filter.
        operator: The operator to use for filtering.

    Returns:
        A Filter object.
    """
    if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
        # Trailing spaces keep the adjacent literals from running together.
        logger.warning(
            "Boolean filters do not support any "
            "operation except for equals. Defaulting "
            "to an `equals` comparison."
        )

    if isinstance(value, str):
        # `bool("false")` is True because any non-empty string is truthy,
        # so textual false values must be recognised explicitly.
        false_tokens = {"", "0", "f", "false", "n", "no", "off"}
        bool_value = value.strip().lower() not in false_tokens
    else:
        bool_value = bool(value)

    return BoolFilter(
        operation=GenericFilterOps.EQUALS,
        column=column,
        value=bool_value,
    )
@property
def offset(self) -> int:
    """Compute how many rows precede the requested page.

    Returns:
        The offset for the query.
    """
    pages_to_skip = self.page - 1
    return self.size * pages_to_skip
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Combine all column filters into a single filter expression.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    conditions = [
        column_filter.generate_query_conditions(table=table)
        for column_filter in self.list_of_filters
    ]

    combinator = self.logical_operator
    if combinator == LogicalOperators.OR:
        # `False` is the neutral element of OR.
        return or_(False, *conditions)
    if combinator == LogicalOperators.AND:
        # `True` is the neutral element of AND.
        return and_(True, *conditions)
    raise RuntimeError("No valid logical operator was supplied.")
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Add this model's filter expression to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    conditions = self.generate_filter(table=table)
    if conditions is None:
        return query
    return query.where(conditions)
created: Union[datetime.datetime, str]
pydantic-field
Created
id: Union[uuid.UUID, str]
pydantic-field
Id for this resource
list_of_filters: List[Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[Filter] |
A list of Filter models. |
logical_operator: LogicalOperators
pydantic-field
Which logical operator to use between all filters ['and', 'or']
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
page: ConstrainedIntValue
pydantic-field
Page number
size: ConstrainedIntValue
pydantic-field
Page size
sort_by: str
pydantic-field
Which column to sort by.
sorting_params: Tuple[str, SorterOps]
property
readonly
Splits the `sort_by` value into the column to sort by and the sorting operand.
Returns:
Type | Description |
---|---|
Tuple[str, SorterOps] |
A tuple of the column to sort by and the sorting operand. |
updated: Union[datetime.datetime, str]
pydantic-field
Updated
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query to which to apply the filter. |
required |
table |
Type['AnySchema'] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[('Select[AnySchema]', 'SelectOfScalar[AnySchema]')] |
The query with filter applied. |
Source code in zenml/models/page_model.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Add this model's filter expression to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    conditions = self.generate_filter(table=table)
    if conditions is None:
        return query
    return query.where(conditions)
filter_ops(values)
classmethod
Parse incoming filters to ensure all filters are legal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values of the class. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values of the class. |
Source code in zenml/models/page_model.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Check that every incoming filter can be turned into a Filter.

    The filter list is built only for its validation side effect (errors
    raised while building it surface here); the list itself is discarded.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[('BinaryExpression[Any]', 'BooleanClauseList[Any]')] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/page_model.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Combine all column filters into a single filter expression.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    conditions = [
        column_filter.generate_query_conditions(table=table)
        for column_filter in self.list_of_filters
    ]

    combinator = self.logical_operator
    if combinator == LogicalOperators.OR:
        # `False` is the neutral element of OR.
        return or_(False, *conditions)
    if combinator == LogicalOperators.AND:
        # `True` is the neutral element of AND.
        return and_(True, *conditions)
    raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) bool.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(bool, get_args(field_type)):
        return True
    return field_type is bool
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Tell whether a field is declared as (or as a union with) datetime.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    if issubclass(datetime, get_args(field_type)):
        return True
    return field_type is datetime
is_int_field(k)
classmethod
Checks if it's a int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a int field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Check whether the model field named ``k`` is an int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is an int field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Union members are exposed through `get_args`; a plain annotation
    # yields an empty tuple and is compared by identity instead.
    if issubclass(int, get_args(field_type)):
        return True
    return field_type is int
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Check whether ``k`` is the string-typed ``sort_by`` field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # The type is inspected first, so an unknown key raises a KeyError
    # even when it is not named "sort_by".
    holds_str = issubclass(str, get_args(field_type)) or field_type == str
    return holds_str and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Check whether the model field named ``k`` is a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Union members are exposed through `get_args`; a plain annotation
    # yields an empty tuple and is compared by identity instead.
    if issubclass(str, get_args(field_type)):
        return True
    return field_type is str
is_uuid_field(k)
classmethod
Checks if it's a uuid field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a uuid field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Check whether the model field named ``k`` is a UUID field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Union members are exposed through `get_args`; a plain annotation
    # yields an empty tuple and is compared by identity instead.
    if issubclass(UUID, get_args(field_type)):
        return True
    return field_type is UUID
validate_sort_by(v)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
str |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/page_model.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Validate the ``sort_by`` value and the column it refers to.

    The value is either a bare column name or ``<operand>:<column>``. An
    unsupported operand is dropped with a warning, leaving only the column.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # This validator runs with `pre=True`, so non-string input (e.g. an
    # int) arrives uncoerced and would not support the string handling
    # below.
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )

    # Only the first colon separates the operand from the column name.
    operand, separator, remainder = v.partition(":")
    column = remainder if separator else v

    if separator and operand not in SorterOps.values():
        logger.warning(
            "Invalid operand used for column sorting. "
            "Only the following operands are supported `%s`. "
            "Defaulting to 'asc' on column `%s`.",
            SorterOps.values(),
            column,
        )
        v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    if column not in cls.__fields__:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
    return v
__contains__(self, item)
special
Returns whether the page contains a specific item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
B |
The item to check for. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the item is in the page. |
Source code in zenml/models/page_model.py
def __contains__(self, item: B) -> bool:
    """Tell whether ``item`` is one of the items held by this page.

    Args:
        item: The item to check for.

    Returns:
        Whether the item is in the page.
    """
    is_present = item in self.items
    return is_present
__getitem__(self, index)
special
Return the item at the given index.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index |
int |
The index to get the item from. |
required |
Returns:
Type | Description |
---|---|
B |
The item at the given index. |
Source code in zenml/models/page_model.py
def __getitem__(self, index: int) -> B:
    """Look up a single item of this page by its position.

    Args:
        index: The index to get the item from.

    Returns:
        The item at the given index.
    """
    selected = self.items[index]
    return selected
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
__len__(self)
special
Return the item count of the page.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
Source code in zenml/models/page_model.py
def __len__(self) -> int:
    """Count the items held by this page.

    Returns:
        The number of items in the page.
    """
    item_count = len(self.items)
    return item_count
pipeline_build_models
Models representing pipeline builds.
BuildItem (BaseModel)
pydantic-model
Pipeline build item.
Attributes:
Name | Type | Description |
---|---|---|
image |
str |
The image name or digest. |
dockerfile |
Optional[str] |
The contents of the Dockerfile used to build the image. |
requirements |
Optional[str] |
The pip requirements installed in the image. This is a string consisting of multiple concatenated requirements.txt files. |
settings_checksum |
Optional[str] |
Checksum of the settings used for the build. |
contains_code |
bool |
Whether the image contains user files. |
requires_code_download |
bool |
Whether the image needs to download files. |
Source code in zenml/models/pipeline_build_models.py
class BuildItem(BaseModel):
    """A single image entry of a pipeline build.

    Attributes:
        image: The image name or digest.
        dockerfile: The contents of the Dockerfile used to build the image.
        requirements: The pip requirements installed in the image. This is a
            string consisting of multiple concatenated requirements.txt
            files.
        settings_checksum: Checksum of the settings used for the build.
        contains_code: Whether the image contains user files.
        requires_code_download: Whether the image needs to download files.
    """

    # Identity of the image.
    image: str = Field(title="The image name or digest.")

    # Optional details about how the image was built.
    dockerfile: Optional[str] = Field(
        title="The dockerfile used to build the image.",
    )
    requirements: Optional[str] = Field(
        title="The pip requirements installed in the image.",
    )
    settings_checksum: Optional[str] = Field(
        title="The checksum of the build settings.",
    )

    # Code handling flags; an item defaults to containing the user files
    # and not needing a download.
    contains_code: bool = Field(
        default=True,
        title="Whether the image contains user files.",
    )
    requires_code_download: bool = Field(
        default=False,
        title="Whether the image needs to download files.",
    )
PipelineBuildBaseModel (YAMLSerializationMixin)
pydantic-model
Base model for pipeline builds.
Attributes:
Name | Type | Description |
---|---|---|
images |
Dict[str, zenml.models.pipeline_build_models.BuildItem] |
Docker images of this build. |
is_local |
bool |
Whether the images are stored locally or in a container registry. |
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildBaseModel(pydantic_utils.YAMLSerializationMixin):
    """Fields and image lookup helpers shared by pipeline build models.

    Attributes:
        images: Docker images of this build.
        is_local: Whether the images are stored locally or in a container
            registry.
        contains_code: Whether any image of the build contains user code.
        zenml_version: The version of ZenML used for this build.
        python_version: The Python version used for this build.
        checksum: The build checksum.
    """

    images: Dict[str, BuildItem] = Field(
        default={},
        title="The images of this build.",
    )
    is_local: bool = Field(
        title="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: bool = Field(
        title="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        title="The version of ZenML used for this build.",
    )
    python_version: Optional[str] = Field(
        title="The Python version used for this build.",
    )
    checksum: Optional[str] = Field(title="The build checksum.")

    @property
    def requires_code_download(self) -> bool:
        """Whether the build requires code download.

        Returns:
            True as soon as one image of the build needs to download
            files, False otherwise.
        """
        for build_item in self.images.values():
            if build_item.requires_code_download:
                return True
        return False

    @staticmethod
    def get_image_key(component_key: str, step: Optional[str] = None) -> str:
        """Get the image key.

        Args:
            component_key: The component key.
            step: The pipeline step for which the image was built.

        Returns:
            ``"<step>.<component_key>"`` when a step is given, otherwise
            the component key itself.
        """
        return f"{step}.{component_key}" if step else component_key

    def get_image(self, component_key: str, step: Optional[str] = None) -> str:
        """Get the image built for a specific key.

        Args:
            component_key: The key for which to get the image.
            step: The pipeline step for which to get the image. If no image
                exists for this step, will fallback to the pipeline image
                for the same key.

        Returns:
            The image name or digest.
        """
        build_item = self._get_item(component_key=component_key, step=step)
        return build_item.image

    def get_settings_checksum(
        self, component_key: str, step: Optional[str] = None
    ) -> Optional[str]:
        """Get the settings checksum for a specific key.

        Args:
            component_key: The key for which to get the checksum.
            step: The pipeline step for which to get the checksum. If no
                image exists for this step, will fallback to the pipeline
                image for the same key.

        Returns:
            The settings checksum.
        """
        build_item = self._get_item(component_key=component_key, step=step)
        return build_item.settings_checksum

    def _get_item(
        self, component_key: str, step: Optional[str] = None
    ) -> BuildItem:
        """Get the item for a specific key.

        Args:
            component_key: The key for which to get the item.
            step: The pipeline step for which to get the item. If no item
                exists for this step, will fallback to the item for
                the same key.

        Raises:
            KeyError: If no item exists for the given key.

        Returns:
            The build item.
        """
        # Prefer the step-specific entry when one exists.
        if step:
            step_key = self.get_image_key(
                component_key=component_key, step=step
            )
            if step_key in self.images:
                return self.images[step_key]

        # Otherwise fall back to the entry stored under the plain key.
        try:
            return self.images[component_key]
        except KeyError:
            raise KeyError(
                f"Unable to find image for key {component_key}. Available keys: "
                f"{set(self.images)}."
            )
requires_code_download: bool
property
readonly
Whether the build requires code download.
Returns:
Type | Description |
---|---|
bool |
Whether the build requires code download. |
get_image(self, component_key, step=None)
Get the image built for a specific key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The key for which to get the image. |
required |
step |
Optional[str] |
The pipeline step for which to get the image. If no image exists for this step, will fallback to the pipeline image for the same key. |
None |
Returns:
Type | Description |
---|---|
str |
The image name or digest. |
Source code in zenml/models/pipeline_build_models.py
def get_image(self, component_key: str, step: Optional[str] = None) -> str:
    """Get the image built for a specific key.

    Args:
        component_key: The key for which to get the image.
        step: The pipeline step for which to get the image. If no image
            exists for this step, will fallback to the pipeline image for
            the same key.

    Returns:
        The image name or digest.
    """
    build_item = self._get_item(component_key=component_key, step=step)
    return build_item.image
get_image_key(component_key, step=None)
staticmethod
Get the image key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The component key. |
required |
step |
Optional[str] |
The pipeline step for which the image was built. |
None |
Returns:
Type | Description |
---|---|
str |
The image key. |
Source code in zenml/models/pipeline_build_models.py
@staticmethod
def get_image_key(component_key: str, step: Optional[str] = None) -> str:
    """Get the image key.

    Args:
        component_key: The component key.
        step: The pipeline step for which the image was built.

    Returns:
        ``"<step>.<component_key>"`` when a step is given, otherwise the
        component key itself.
    """
    return f"{step}.{component_key}" if step else component_key
get_settings_checksum(self, component_key, step=None)
Get the settings checksum for a specific key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The key for which to get the checksum. |
required |
step |
Optional[str] |
The pipeline step for which to get the checksum. If no image exists for this step, will fallback to the pipeline image for the same key. |
None |
Returns:
Type | Description |
---|---|
Optional[str] |
The settings checksum. |
Source code in zenml/models/pipeline_build_models.py
def get_settings_checksum(
    self, component_key: str, step: Optional[str] = None
) -> Optional[str]:
    """Get the settings checksum for a specific key.

    Args:
        component_key: The key for which to get the checksum.
        step: The pipeline step for which to get the checksum. If no
            image exists for this step, will fallback to the pipeline image
            for the same key.

    Returns:
        The settings checksum.
    """
    build_item = self._get_item(component_key=component_key, step=step)
    return build_item.settings_checksum
PipelineBuildFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all pipeline builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline builds.

    Each attribute below is a filter criterion for pipeline builds. The
    reference fields accept either a UUID or a string.
    """

    # References to related resources.
    workspace_id: Union[UUID, str, None] = Field(
        description="Workspace for this pipeline build."
    )
    user_id: Union[UUID, str, None] = Field(
        description="User that produced this pipeline build."
    )
    pipeline_id: Union[UUID, str, None] = Field(
        description="Pipeline associated with the pipeline build.",
    )
    # The description used to read "Stack used for the Pipeline Run", which
    # was copied from the pipeline run filter and described the wrong
    # resource.
    stack_id: Union[UUID, str, None] = Field(
        description="Stack used for the pipeline build."
    )

    # Properties of the build itself.
    is_local: Optional[bool] = Field(
        description="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: Optional[bool] = Field(
        description="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        description="The version of ZenML used for this build."
    )
    python_version: Optional[str] = Field(
        description="The Python version used for this build."
    )
    checksum: Optional[str] = Field(description="The build checksum.")
checksum: str
pydantic-field
The build checksum.
contains_code: bool
pydantic-field
Whether any image of the build contains user code.
is_local: bool
pydantic-field
Whether the build images are stored in a container registry or locally.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the pipeline build.
python_version: str
pydantic-field
The Python version used for this build.
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack used for the pipeline build.
user_id: Union[uuid.UUID, str]
pydantic-field
User that produced this pipeline build.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this pipeline build.
zenml_version: str
pydantic-field
The version of ZenML used for this build.
PipelineBuildRequestModel (PipelineBuildBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for pipeline builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildRequestModel(
    PipelineBuildBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipeline builds.

    The stack and the pipeline are both referenced by UUID and are both
    optional.
    """

    stack: Optional[UUID] = Field(
        title="The stack that was used for this build.",
    )
    pipeline: Optional[UUID] = Field(
        title="The pipeline that was used for this build.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineBuildResponseModel (PipelineBuildBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for pipeline builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildResponseModel(
    PipelineBuildBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline builds.

    The pipeline and the stack are carried as full response models rather
    than as UUIDs; both are optional.
    """

    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline that was used for this build.",
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack that was used for this build.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
pipeline_deployment_models
Models representing pipeline deployments.
PipelineDeploymentBaseModel (BaseModel)
pydantic-model
Base model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentBaseModel(BaseModel):
    """Fields and helper properties shared by pipeline deployment models."""

    run_name_template: str = Field(
        title="The run name template for runs created using this deployment.",
    )
    pipeline_configuration: PipelineConfiguration = Field(
        title="The pipeline configuration for this deployment.",
    )
    step_configurations: Dict[str, Step] = Field(
        default={},
        title="The step configurations for this deployment.",
    )
    client_environment: Dict[str, str] = Field(
        default={},
        title="The client environment for this deployment.",
    )

    @property
    def requires_included_files(self) -> bool:
        """Whether the deployment requires included files.

        Returns:
            True if the docker settings of at least one step use the
            ``INCLUDE`` source file mode, False otherwise.
        """
        for step in self.step_configurations.values():
            if step.config.docker_settings.source_files == SourceFileMode.INCLUDE:
                return True
        return False

    @property
    def requires_code_download(self) -> bool:
        """Whether the deployment requires downloading some code files.

        Returns:
            True if the docker settings of at least one step use the
            ``DOWNLOAD`` source file mode, False otherwise.
        """
        for step in self.step_configurations.values():
            if step.config.docker_settings.source_files == SourceFileMode.DOWNLOAD:
                return True
        return False
requires_code_download: bool
property
readonly
Whether the deployment requires downloading some code files.
Returns:
Type | Description |
---|---|
bool |
Whether the deployment requires downloading some code files. |
requires_included_files: bool
property
readonly
Whether the deployment requires included files.
Returns:
Type | Description |
---|---|
bool |
Whether the deployment requires included files. |
PipelineDeploymentFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline deployments.

    Every criterion is optional, defaults to ``None`` and accepts either a
    UUID or a string.
    """

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Workspace for this deployment.",
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="User that created this deployment.",
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Pipeline associated with the deployment.",
    )
    stack_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Stack associated with the deployment.",
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Build associated with the deployment.",
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Schedule associated with the deployment.",
    )
build_id: Union[uuid.UUID, str]
pydantic-field
Build associated with the deployment.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the deployment.
schedule_id: Union[uuid.UUID, str]
pydantic-field
Schedule associated with the deployment.
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack associated with the deployment.
user_id: Union[uuid.UUID, str]
pydantic-field
User that created this deployment.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this deployment.
PipelineDeploymentRequestModel (PipelineDeploymentBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentRequestModel(
    PipelineDeploymentBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipeline deployments.

    The stack is a mandatory UUID; the pipeline, build, schedule and code
    reference are optional.
    """

    stack: UUID = Field(title="The stack associated with the deployment.")
    pipeline: Optional[UUID] = Field(
        title="The pipeline associated with the deployment.",
    )
    build: Optional[UUID] = Field(
        title="The build associated with the deployment.",
    )
    schedule: Optional[UUID] = Field(
        title="The schedule associated with the deployment.",
    )
    code_reference: Optional["CodeReferenceRequestModel"] = Field(
        title="The code reference associated with the deployment.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineDeploymentResponseModel (PipelineDeploymentBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentResponseModel(
    PipelineDeploymentBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline deployments.

    Related resources are carried as full response models rather than as
    UUIDs; all of them are optional.
    """

    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline associated with the deployment.",
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack associated with the deployment.",
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        title="The pipeline build associated with the deployment.",
    )
    schedule: Optional["ScheduleResponseModel"] = Field(
        title="The schedule associated with the deployment.",
    )
    code_reference: Optional["CodeReferenceResponseModel"] = Field(
        title="The code reference associated with the deployment.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
pipeline_models
Models representing pipelines.
PipelineBaseModel (BaseModel)
pydantic-model
Base model for pipelines.
Source code in zenml/models/pipeline_models.py
class PipelineBaseModel(BaseModel):
    """Fields shared by all pipeline models.

    The string fields are length-limited: ``name``, ``version`` and
    ``version_hash`` by ``STR_FIELD_MAX_LENGTH``, ``docstring`` by
    ``TEXT_FIELD_MAX_LENGTH``.
    """

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The name of the pipeline.",
    )
    version: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The version of the pipeline.",
    )
    version_hash: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The version hash of the pipeline.",
    )
    docstring: Optional[str] = Field(
        max_length=TEXT_FIELD_MAX_LENGTH,
        title="The docstring of the pipeline.",
    )
    spec: PipelineSpec
PipelineFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Pipelines.
Source code in zenml/models/pipeline_models.py
class PipelineFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Pipelines.

    Every criterion is optional and defaults to ``None``.
    """

    # Criteria on the pipeline's own attributes.
    name: Optional[str] = Field(
        description="Name of the Pipeline",
        default=None,
    )
    version: Optional[str] = Field(
        description="Version of the Pipeline",
        default=None,
    )
    version_hash: Optional[str] = Field(
        description="Version hash of the Pipeline",
        default=None,
    )
    docstring: Optional[str] = Field(
        description="Docstring of the Pipeline",
        default=None,
    )

    # Criteria on the owning workspace and user (UUID or string).
    workspace_id: Optional[Union[UUID, str]] = Field(
        description="Workspace of the Pipeline",
        default=None,
    )
    user_id: Optional[Union[UUID, str]] = Field(
        description="User of the Pipeline",
        default=None,
    )
docstring: str
pydantic-field
Docstring of the Pipeline
name: str
pydantic-field
Name of the Pipeline
user_id: Union[uuid.UUID, str]
pydantic-field
User of the Pipeline
version: str
pydantic-field
Version of the Pipeline
version_hash: str
pydantic-field
Version hash of the Pipeline
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Pipeline
PipelineRequestModel (PipelineBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Pipeline request model.
Source code in zenml/models/pipeline_models.py
class PipelineRequestModel(PipelineBaseModel, WorkspaceScopedRequestModel):
    """Pipeline request model.

    Declares no fields of its own; everything is inherited from
    ``PipelineBaseModel`` and ``WorkspaceScopedRequestModel``.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineResponseModel (PipelineBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline response model with user, workspace, runs, and status hydrated.
Source code in zenml/models/pipeline_models.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
    """Pipeline response model with user, workspace, runs, and status hydrated.

    Both ``runs`` and ``status`` are optional and default to ``None``.
    """

    runs: Optional[List["PipelineRunResponseModel"]] = Field(
        title="A list of the last x Pipeline Runs.",
        default=None,
    )
    status: Optional[List[ExecutionStatus]] = Field(
        title="The status of the last x Pipeline Runs.",
        default=None,
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineUpdateModel (PipelineRequestModel)
pydantic-model
Pipeline update model.
Source code in zenml/models/pipeline_models.py
class PipelineUpdateModel(PipelineRequestModel):
    """Pipeline update model.

    Declares no fields of its own; everything is inherited from
    ``PipelineRequestModel``.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
pipeline_run_models
Models representing pipeline runs.
PipelineRunBaseModel (BaseModel)
pydantic-model
Base model for pipeline runs.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunBaseModel(BaseModel):
    """Fields shared by all pipeline run models."""

    # Identification of the run.
    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The name of the pipeline run.",
    )
    orchestrator_run_id: Optional[str] = Field(
        default=None,
        max_length=STR_FIELD_MAX_LENGTH,
        title="The orchestrator run ID.",
    )
    schedule_id: Optional[UUID]

    # Execution details.
    enable_cache: Optional[bool]
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    status: ExecutionStatus
    pipeline_configuration: PipelineConfiguration
    num_steps: Optional[int]

    # Version information; the client version defaults to
    # `current_zenml_version`.
    client_version: Optional[str] = Field(
        default=current_zenml_version,
        max_length=STR_FIELD_MAX_LENGTH,
        title="Client version.",
    )
    server_version: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="Server version.",
    )

    # Environment snapshots of both sides of the run.
    client_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the client that initiated this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )
    orchestrator_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the orchestrator that executed this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )

    # `git_sha` is registered with the deprecation helper below.
    git_sha: Optional[str] = None
    _deprecation_validator = deprecation_utils.deprecate_pydantic_attributes(
        "git_sha"
    )
PipelineRunFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Pipeline Runs.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Pipeline Runs.

    Every criterion is optional and defaults to ``None``. ``unlisted`` and
    ``code_repository_id`` are listed in ``FILTER_EXCLUDE_FIELDS`` and are
    turned into query conditions by `generate_filter` in this class.
    """

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "unlisted",
        "code_repository_id",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run",
    )
    orchestrator_run_id: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run within the orchestrator",
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline associated with the Pipeline Run"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Pipeline Run"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the Pipeline Run"
    )
    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack used for the Pipeline Run"
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Schedule that triggered the Pipeline Run"
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Build used for the Pipeline Run"
    )
    deployment_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Deployment used for the Pipeline Run"
    )
    code_repository_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Code repository used for the Pipeline Run"
    )
    # The description used to read "Name of the Pipeline Run", copied from
    # the `name` field above; this field filters on the run status.
    status: Optional[str] = Field(
        default=None,
        description="Status of the Pipeline Run",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time for this run"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time for this run"
    )
    num_steps: Optional[int] = Field(
        default=None,
        description="Amount of steps in the Pipeline Run",
    )
    unlisted: Optional[bool] = None

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Extends the inherited filter with conditions for ``unlisted`` and
        ``code_repository_id``, combined using the model's logical operator.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        base_filter = super().generate_filter(table)

        # Extra conditions are joined with the same operator as the rest:
        # `or_` for LogicalOperators.OR, `and_` for anything else.
        operator = (
            or_ if self.logical_operator == LogicalOperators.OR else and_
        )

        if self.unlisted is not None:
            # `unlisted` is expressed through the table's `pipeline_id`
            # column being NULL (True) or not NULL (False).
            if self.unlisted is True:
                unlisted_filter = getattr(table, "pipeline_id").is_(None)
            else:
                unlisted_filter = getattr(table, "pipeline_id").is_not(None)

            base_filter = operator(base_filter, unlisted_filter)

        if self.code_repository_id:
            from zenml.zen_stores.schemas import (
                CodeReferenceSchema,
                PipelineDeploymentSchema,
                PipelineRunSchema,
            )

            # Links run -> deployment -> code reference and matches the
            # code repository id on the code reference.
            code_repo_filter = and_(  # type: ignore[type-var]
                PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
                PipelineDeploymentSchema.code_reference_id
                == CodeReferenceSchema.id,
                CodeReferenceSchema.code_repository_id
                == self.code_repository_id,
            )

            base_filter = operator(base_filter, code_repo_filter)

        return base_filter
build_id: Union[uuid.UUID, str]
pydantic-field
Build used for the Pipeline Run
code_repository_id: Union[uuid.UUID, str]
pydantic-field
Code repository used for the Pipeline Run
deployment_id: Union[uuid.UUID, str]
pydantic-field
Deployment used for the Pipeline Run
end_time: Union[datetime.datetime, str]
pydantic-field
End time for this run
name: str
pydantic-field
Name of the Pipeline Run
num_steps: int
pydantic-field
Amount of steps in the Pipeline Run
orchestrator_run_id: str
pydantic-field
Name of the Pipeline Run within the orchestrator
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the Pipeline Run
schedule_id: Union[uuid.UUID, str]
pydantic-field
Schedule that triggered the Pipeline Run
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack used for the Pipeline Run
start_time: Union[datetime.datetime, str]
pydantic-field
Start time for this run
status: str
pydantic-field
Status of the Pipeline Run
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the Pipeline Run
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Pipeline Run
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Source code in zenml/models/pipeline_run_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Build the query filter, including the run-specific criteria.

    Extends the inherited filter with conditions for ``unlisted`` and
    ``code_repository_id``, combined using the model's logical operator.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    query_filter = super().generate_filter(table)

    # Extra conditions are joined with the same operator as the rest:
    # `or_` for LogicalOperators.OR, `and_` for anything else.
    if self.logical_operator == LogicalOperators.OR:
        combine = or_
    else:
        combine = and_

    if self.unlisted is not None:
        # `unlisted` is expressed through the table's `pipeline_id` column
        # being NULL (True) or not NULL (False).
        pipeline_column = getattr(table, "pipeline_id")
        unlisted_condition = (
            pipeline_column.is_(None)
            if self.unlisted is True
            else pipeline_column.is_not(None)
        )
        query_filter = combine(query_filter, unlisted_condition)

    if self.code_repository_id:
        from zenml.zen_stores.schemas import (
            CodeReferenceSchema,
            PipelineDeploymentSchema,
            PipelineRunSchema,
        )

        # Links run -> deployment -> code reference and matches the code
        # repository id on the code reference.
        repository_condition = and_(  # type: ignore[type-var]
            PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
            PipelineDeploymentSchema.code_reference_id
            == CodeReferenceSchema.id,
            CodeReferenceSchema.code_repository_id
            == self.code_repository_id,
        )
        query_filter = combine(query_filter, repository_condition)

    return query_filter
PipelineRunRequestModel (PipelineRunBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Pipeline run model with user, workspace, pipeline, and stack as UUIDs.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunRequestModel(
    PipelineRunBaseModel, WorkspaceScopedRequestModel
):
    """Pipeline run model with user, workspace, pipeline, and stack as UUIDs.

    Related entities are referenced by their IDs only. Apart from the run's
    own `id`, every reference declared here is optional.
    """

    # ID of the pipeline run itself; the only reference without Optional.
    id: UUID
    stack: Optional[UUID]  # Might become None if the stack is deleted.
    pipeline: Optional[UUID]  # Unlisted runs have this as None.
    # Optional references to the build and deployment of this run.
    build: Optional[UUID]
    deployment: Optional[UUID]
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineRunResponseModel (PipelineRunBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline run model with user, workspace, pipeline, and stack hydrated.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunResponseModel(
    PipelineRunBaseModel, WorkspaceScopedResponseModel
):
    """Pipeline run model with user, workspace, pipeline, and stack hydrated.

    Related entities are exposed as full response models instead of IDs.
    The related model types are given as string annotations (forward
    references), and every field declared here has a default.
    """

    pipeline: Optional["PipelineResponseModel"] = Field(
        default=None, title="The pipeline this run belongs to."
    )
    stack: Optional["StackResponseModel"] = Field(
        default=None, title="The stack that was used for this run."
    )
    # NOTE(review): mutable `{}` default. Pydantic is expected to copy field
    # defaults per instance, so this should not be shared state - confirm,
    # or consider `default_factory=dict`.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this pipeline run.",
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        default=None, title="The pipeline build that was used for this run."
    )
    deployment: Optional["PipelineDeploymentResponseModel"] = Field(
        default=None, title="The deployment that was used for this run."
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
PipelineRunUpdateModel (BaseModel)
pydantic-model
Pipeline run update model.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunUpdateModel(BaseModel):
    """Pipeline run update model.

    Only the status and the end time are declared here; both default to
    None.
    """

    status: Optional[ExecutionStatus] = None
    end_time: Optional[datetime] = None
role_models
Models representing roles that can be assigned to users or teams.
RoleBaseModel (BaseModel)
pydantic-model
Base model for roles.
Source code in zenml/models/role_models.py
class RoleBaseModel(BaseModel):
    """Base model for roles.

    A role consists of a name and a set of permissions.
    """

    # Length-limited by STR_FIELD_MAX_LENGTH.
    name: str = Field(
        title="The unique name of the role.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # Required: no default is declared for the permission set.
    permissions: Set[PermissionType]
RoleFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Roles.
Source code in zenml/models/role_models.py
class RoleFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Roles."""

    # The only role-specific filter criterion; unset (None) by default.
    name: Optional[str] = Field(
        default=None,
        description="Name of the role",
    )
name: str
pydantic-field
Name of the role
RoleRequestModel (RoleBaseModel, BaseRequestModel)
pydantic-model
Request model for roles.
Source code in zenml/models/role_models.py
class RoleRequestModel(RoleBaseModel, BaseRequestModel):
    """Request model for roles.

    Declares no fields of its own; everything is inherited from
    `RoleBaseModel` and `BaseRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
RoleResponseModel (RoleBaseModel, BaseResponseModel)
pydantic-model
Response model for roles.
Source code in zenml/models/role_models.py
class RoleResponseModel(RoleBaseModel, BaseResponseModel):
    """Response model for roles.

    Declares no fields of its own; everything is inherited from
    `RoleBaseModel` and `BaseResponseModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
RoleUpdateModel (RoleRequestModel)
pydantic-model
Update model for roles.
Source code in zenml/models/role_models.py
class RoleUpdateModel(RoleRequestModel):
    """Update model for roles.

    Declares no fields of its own; the shape is inherited unchanged from
    `RoleRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
run_metadata_models
Models representing run metadata.
RunMetadataBaseModel (BaseModel)
pydantic-model
Base model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataBaseModel(BaseModel):
    """Base model for run metadata.

    A metadata entry is a typed key/value pair that can reference a
    pipeline run, a step run, an artifact and/or a stack component. All
    four references are optional.
    """

    pipeline_run_id: Optional[UUID] = Field(
        title="The ID of the pipeline run that this metadata belongs to.",
    )
    # Optional references to the other entities the entry may refer to.
    step_run_id: Optional[UUID]
    artifact_id: Optional[UUID]
    stack_component_id: Optional[UUID]
    key: str = Field(
        title="The key of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # NOTE(review): `MetadataType` and `MetadataTypeEnum` are defined
    # elsewhere. If they are not plain strings, `max_length` on the two
    # fields below may have no effect - confirm.
    value: MetadataType = Field(
        title="The value of the metadata.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    type: MetadataTypeEnum = Field(
        title="The type of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
RunMetadataFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of run metadata.

    Every criterion is optional and defaults to None.
    """

    # Entity references; each accepts either a UUID or a string.
    pipeline_run_id: Optional[Union[str, UUID]] = None
    step_run_id: Optional[Union[str, UUID]] = None
    artifact_id: Optional[Union[str, UUID]] = None
    stack_component_id: Optional[Union[str, UUID]] = None
    # Criteria on the metadata entry itself.
    key: Optional[str] = None
    type: Optional[Union[str, MetadataTypeEnum]] = None
RunMetadataRequestModel (RunMetadataBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataRequestModel(
    RunMetadataBaseModel, WorkspaceScopedRequestModel
):
    """Request model for run metadata.

    Declares no fields of its own; everything is inherited from
    `RunMetadataBaseModel` and `WorkspaceScopedRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
RunMetadataResponseModel (RunMetadataBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataResponseModel(
    RunMetadataBaseModel, WorkspaceScopedResponseModel
):
    """Response model for run metadata.

    Declares no fields of its own; everything is inherited from
    `RunMetadataBaseModel` and `WorkspaceScopedResponseModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
schedule_model
Model definition for pipeline run schedules.
ScheduleBaseModel (Schedule, BaseModel)
pydantic-model
Domain model for schedules.
Source code in zenml/models/schedule_model.py
class ScheduleBaseModel(Schedule, BaseModel):
    """Domain model for schedules.

    Extends `Schedule` with a name, an active flag and optional references
    to an orchestrator and a pipeline.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["id"]

    # Both required: no defaults are declared.
    name: str
    active: bool
    # Optional references by ID.
    orchestrator_id: Optional[UUID]
    pipeline_id: Optional[UUID]
ScheduleFilterModel (ShareableWorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Schedules.
Source code in zenml/models/schedule_model.py
class ScheduleFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Schedules.

    Every criterion is optional and defaults to None.
    """

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace scope of the schedule."
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the schedule"
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline that the schedule is attached to."
    )
    orchestrator_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Orchestrator that the schedule is attached to.",
    )
    active: Optional[bool] = Field(
        default=None,
        description="If the schedule is active",
    )
    cron_expression: Optional[str] = Field(
        default=None,
        description="The cron expression, describing the schedule",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time"
    )
    # A single `Optional` is enough: `Optional[Optional[X]]` collapses to
    # `Optional[X]`.
    interval_second: Optional[float] = Field(
        default=None,
        description="The repetition interval in seconds",
    )
    catchup: Optional[bool] = Field(
        default=None,
        description="Whether or not the schedule is set to catchup past missed "
        "events",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the schedule",
    )
active: bool
pydantic-field
If the schedule is active
catchup: bool
pydantic-field
Whether or not the schedule is set to catchup past missed events
cron_expression: str
pydantic-field
The cron expression, describing the schedule
end_time: Union[datetime.datetime, str]
pydantic-field
End time
interval_second: float
pydantic-field
The repetition interval in seconds
name: str
pydantic-field
Name of the schedule
orchestrator_id: Union[uuid.UUID, str]
pydantic-field
Orchestrator that the schedule is attached to.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline that the schedule is attached to.
start_time: Union[datetime.datetime, str]
pydantic-field
Start time
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the schedule
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace scope of the schedule.
ScheduleRequestModel (ScheduleBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Schedule request model.
Source code in zenml/models/schedule_model.py
class ScheduleRequestModel(ScheduleBaseModel, WorkspaceScopedRequestModel):
    """Schedule request model.

    Declares no fields of its own; everything is inherited from
    `ScheduleBaseModel` and `WorkspaceScopedRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
ScheduleResponseModel (ScheduleBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Schedule response model with workspace and user hydrated.
Source code in zenml/models/schedule_model.py
class ScheduleResponseModel(ScheduleBaseModel, WorkspaceScopedResponseModel):
    """Schedule response model with workspace and user hydrated.

    Declares no fields of its own; everything is inherited from
    `ScheduleBaseModel` and `WorkspaceScopedResponseModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
ScheduleUpdateModel (BaseModel)
pydantic-model
Schedule update model.
Source code in zenml/models/schedule_model.py
class ScheduleUpdateModel(BaseModel):
    """Schedule update model.

    Every field is optional and defaults to None.
    """

    name: Optional[str] = None
    active: Optional[bool] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    # Typed as a timedelta here, not as a plain number.
    interval_second: Optional[timedelta] = None
    catchup: Optional[bool] = None
secret_models
Models representing secrets.
SecretBaseModel (BaseModel)
pydantic-model
Base model for secrets.
Source code in zenml/models/secret_models.py
class SecretBaseModel(BaseModel):
    """Base model for secrets."""

    name: str = Field(
        title="The name of the secret.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    scope: SecretScope = Field(
        default=SecretScope.WORKSPACE,
        title="The scope of the secret.",
    )
    values: Dict[str, Optional[SecretStr]] = Field(
        default_factory=dict,
        title="The values stored in this secret.",
    )

    @property
    def secret_values(self) -> Dict[str, str]:
        """Return the stored values in plain, un-obfuscated form.

        Values come back as plain strings instead of SecretStr. Keys whose
        value is None are left out, which lets the update model use None
        to signal that a secret value should be deleted.

        Returns:
            A dictionary containing the secret's values.
        """
        plain_values: Dict[str, str] = {}
        for key, secret in self.values.items():
            if secret is None:
                continue
            plain_values[key] = secret.get_secret_value()
        return plain_values

    def add_secret(self, key: str, value: str) -> None:
        """Store a value under the given key, wrapped as a SecretStr.

        Args:
            key: The key of the secret value.
            value: The secret value.
        """
        wrapped_value = SecretStr(value)
        self.values[key] = wrapped_value

    def remove_secret(self, key: str) -> None:
        """Drop the entry stored under the given key.

        Args:
            key: The key of the secret value.
        """
        self.values.pop(key)

    def remove_secrets(self) -> None:
        """Blank out every secret value while keeping all of the keys."""
        self.values = dict.fromkeys(self.values)
secret_values: Dict[str, str]
property
readonly
A dictionary with all un-obfuscated values stored in this secret.
The values are returned as strings, not SecretStr. If a value is None, it is not included in the returned dictionary. This is to enable the use of None values in the update model to indicate that a secret value should be deleted.
Returns:
Type | Description |
---|---|
Dict[str, str] |
A dictionary containing the secret's values. |
add_secret(self, key, value)
Adds a secret value to the secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key of the secret value. |
required |
value |
str |
The secret value. |
required |
Source code in zenml/models/secret_models.py
def add_secret(self, key: str, value: str) -> None:
    """Store a value under the given key, wrapped as a SecretStr.

    Args:
        key: The key of the secret value.
        value: The secret value.
    """
    wrapped_value = SecretStr(value)
    self.values[key] = wrapped_value
remove_secret(self, key)
Removes a secret value from the secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key of the secret value. |
required |
Source code in zenml/models/secret_models.py
def remove_secret(self, key: str) -> None:
    """Drop the entry stored under the given key.

    Args:
        key: The key of the secret value.
    """
    self.values.pop(key)
remove_secrets(self)
Removes all secret values from the secret but keep the keys.
Source code in zenml/models/secret_models.py
def remove_secrets(self) -> None:
    """Blank out every secret value while keeping all of the keys."""
    self.values = dict.fromkeys(self.values)
SecretFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Secrets.
Source code in zenml/models/secret_models.py
class SecretFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Secrets.

    Besides declaring the filter criteria, this model can evaluate them
    directly against `SecretResponseModel` objects (`secret_matches`) and
    order a list of such objects (`sort_secrets`). Both compare values as
    strings.
    """

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "values",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the secret",
    )
    scope: Optional[Union[SecretScope, str]] = Field(
        default=None,
        description="Scope in which to filter secrets",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Secret"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        None, description="User that created the Secret"
    )

    @staticmethod
    def _get_filtering_value(value: Optional[Any]) -> str:
        """Convert the value to a string that can be used for lexicographical filtering and sorting.

        Args:
            value: The value to convert.

        Returns:
            The value converted to string format that can be used for
            lexicographical sorting and filtering.
        """
        if value is None:
            return ""
        if isinstance(value, datetime):
            # Fixed-width timestamp so that string order follows time order.
            return value.strftime("%Y-%m-%d %H:%M:%S")
        return str(value)

    @staticmethod
    def _get_column_value(
        secret: SecretResponseModel, column: str
    ) -> Optional[Any]:
        """Resolve a filter or sort column name to the secret's value.

        `workspace_id` and `user_id` are read from the `workspace` and
        `user` objects of the secret; every other column is read as an
        attribute of the same name.

        Args:
            secret: The secret to read the value from.
            column: The name of the filter or sort column.

        Returns:
            The value of the secret for the given column.
        """
        if column == "workspace_id":
            return secret.workspace.id
        if column == "user_id":
            return secret.user.id if secret.user else None
        return getattr(secret, column)

    def secret_matches(self, secret: SecretResponseModel) -> bool:
        """Checks if a secret matches the filter criteria.

        Args:
            secret: The secret to check.

        Returns:
            True if the secret matches the filter criteria, False otherwise.
        """
        for column_filter in self.list_of_filters:
            column_value = self._get_column_value(
                secret, column_filter.column
            )

            # Convert the values to strings for lexicographical comparison.
            str_column_value = self._get_filtering_value(column_value)
            str_filter_value = self._get_filtering_value(column_filter.value)

            # Compare the lexicographical values according to the operation.
            operation = column_filter.operation
            if operation == GenericFilterOps.EQUALS:
                result = str_column_value == str_filter_value
            elif operation == GenericFilterOps.CONTAINS:
                result = str_filter_value in str_column_value
            elif operation == GenericFilterOps.STARTSWITH:
                result = str_column_value.startswith(str_filter_value)
            elif operation == GenericFilterOps.ENDSWITH:
                result = str_column_value.endswith(str_filter_value)
            elif operation == GenericFilterOps.GT:
                result = str_column_value > str_filter_value
            elif operation == GenericFilterOps.GTE:
                result = str_column_value >= str_filter_value
            elif operation == GenericFilterOps.LT:
                result = str_column_value < str_filter_value
            elif operation == GenericFilterOps.LTE:
                result = str_column_value <= str_filter_value

            # Exit early if the result is False for AND and True for OR
            if self.logical_operator == LogicalOperators.AND:
                if not result:
                    return False
            else:
                if result:
                    return True

        # If we get here, all filters have been checked and the result is
        # True for AND and False for OR
        if self.logical_operator == LogicalOperators.AND:
            return True
        else:
            return False

    def sort_secrets(
        self, secrets: List[SecretResponseModel]
    ) -> List[SecretResponseModel]:
        """Sorts a list of secrets according to the filter criteria.

        The sort column is resolved the same way as in `secret_matches`,
        so `workspace_id` and `user_id` are supported as sort columns too.

        Args:
            secrets: The list of secrets to sort.

        Returns:
            The sorted list of secrets.
        """
        column, sort_op = self.sorting_params
        sorted_secrets = sorted(
            secrets,
            key=lambda secret: self._get_filtering_value(
                self._get_column_value(secret, column)
            ),
            reverse=sort_op == SorterOps.DESCENDING,
        )
        return sorted_secrets
name: str
pydantic-field
Name of the secret
scope: Union[zenml.enums.SecretScope, str]
pydantic-field
Scope in which to filter secrets
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the Secret
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Secret
secret_matches(self, secret)
Checks if a secret matches the filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
SecretResponseModel |
The secret to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the secret matches the filter criteria, False otherwise. |
Source code in zenml/models/secret_models.py
def secret_matches(self, secret: SecretResponseModel) -> bool:
    """Evaluate this filter model against a single secret.

    Each filter is compared as strings. With the AND operator the first
    failing filter decides the outcome; with any other operator the first
    passing filter does.

    Args:
        secret: The secret to check.

    Returns:
        True if the secret matches the filter criteria, False otherwise.
    """
    requires_all = self.logical_operator == LogicalOperators.AND

    for criterion in self.list_of_filters:
        # Two columns live on related objects, not on the secret itself.
        actual: Optional[Any] = None
        if criterion.column == "workspace_id":
            actual = secret.workspace.id
        elif criterion.column == "user_id":
            actual = secret.user.id if secret.user else None
        else:
            actual = getattr(secret, criterion.column)

        # Both sides are compared in their string form.
        actual_text = self._get_filtering_value(actual)
        expected_text = self._get_filtering_value(criterion.value)

        operation = criterion.operation
        if operation == GenericFilterOps.EQUALS:
            result = actual_text == expected_text
        elif operation == GenericFilterOps.CONTAINS:
            result = expected_text in actual_text
        elif operation == GenericFilterOps.STARTSWITH:
            result = actual_text.startswith(expected_text)
        elif operation == GenericFilterOps.ENDSWITH:
            result = actual_text.endswith(expected_text)
        elif operation == GenericFilterOps.GT:
            result = actual_text > expected_text
        elif operation == GenericFilterOps.GTE:
            result = actual_text >= expected_text
        elif operation == GenericFilterOps.LT:
            result = actual_text < expected_text
        elif operation == GenericFilterOps.LTE:
            result = actual_text <= expected_text

        # Short-circuit: one miss settles AND, one hit settles OR.
        if requires_all and not result:
            return False
        if not requires_all and result:
            return True

    # Nothing short-circuited: AND has passed every filter, OR has passed
    # none of them.
    return True if requires_all else False
sort_secrets(self, secrets)
Sorts a list of secrets according to the filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secrets |
List[zenml.models.secret_models.SecretResponseModel] |
The list of secrets to sort. |
required |
Returns:
Type | Description |
---|---|
List[zenml.models.secret_models.SecretResponseModel] |
The sorted list of secrets. |
Source code in zenml/models/secret_models.py
def sort_secrets(
    self, secrets: List[SecretResponseModel]
) -> List[SecretResponseModel]:
    """Sorts a list of secrets according to the filter criteria.

    Sort keys are compared in their string form. The `workspace_id` and
    `user_id` columns are read from the `workspace` and `user` objects of
    each secret; every other column is read as an attribute of the same
    name.

    Args:
        secrets: The list of secrets to sort.

    Returns:
        The sorted list of secrets.
    """
    column, sort_op = self.sorting_params

    def sort_key(secret: SecretResponseModel) -> str:
        # Resolve the column to the value it stands for on the secret.
        if column == "workspace_id":
            column_value = secret.workspace.id
        elif column == "user_id":
            column_value = secret.user.id if secret.user else None
        else:
            column_value = getattr(secret, column)
        return self._get_filtering_value(column_value)

    sorted_secrets = sorted(
        secrets,
        key=sort_key,
        reverse=sort_op == SorterOps.DESCENDING,
    )
    return sorted_secrets
SecretRequestModel (SecretBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Secret request model.
Source code in zenml/models/secret_models.py
class SecretRequestModel(SecretBaseModel, WorkspaceScopedRequestModel):
    """Secret request model.

    Declares no fields of its own; only `ANALYTICS_FIELDS` is set here.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
SecretResponseModel (SecretBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Secret response model with user and workspace hydrated.
Source code in zenml/models/secret_models.py
class SecretResponseModel(SecretBaseModel, WorkspaceScopedResponseModel):
    """Secret response model with user and workspace hydrated.

    Declares no fields of its own; only `ANALYTICS_FIELDS` is set here.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
SecretUpdateModel (SecretRequestModel)
pydantic-model
Secret update model.
Source code in zenml/models/secret_models.py
class SecretUpdateModel(SecretRequestModel):
    """Secret update model.

    Redeclares the inherited `scope` field as optional with a None
    default.
    """

    # The type is widened relative to the inherited field, hence the
    # type-checker suppression.
    scope: Optional[SecretScope] = Field(  # type: ignore[assignment]
        default=None, title="The scope of the secret."
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
server_models
Model definitions for ZenML servers.
ServerDatabaseType (StrEnum)
Enum for server database types.
Source code in zenml/models/server_models.py
class ServerDatabaseType(StrEnum):
    """Enum for server database types.

    Each member's value is its lowercase name.
    """

    SQLITE = "sqlite"
    MYSQL = "mysql"
    OTHER = "other"
ServerDeploymentType (StrEnum)
Enum for server deployment types.
Source code in zenml/models/server_models.py
class ServerDeploymentType(StrEnum):
    """Enum for server deployment types.

    Each member's value is its lowercase name.
    """

    LOCAL = "local"
    DOCKER = "docker"
    KUBERNETES = "kubernetes"
    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    ALPHA = "alpha"
    OTHER = "other"
    HF_SPACES = "hf_spaces"
ServerModel (BaseModel)
pydantic-model
Domain model for ZenML servers.
Source code in zenml/models/server_models.py
class ServerModel(BaseModel):
    """Domain model for ZenML servers."""

    id: UUID = Field(
        default_factory=uuid4,
        title="The unique server id.",
    )
    version: str = Field(
        title="The ZenML version that the server is running.",
    )
    deployment_type: ServerDeploymentType = Field(
        default=ServerDeploymentType.OTHER,
        title="The ZenML server deployment type.",
    )
    database_type: ServerDatabaseType = Field(
        default=ServerDatabaseType.OTHER,
        title="The database type that the server is using.",
    )
    secrets_store_type: SecretsStoreType = Field(
        default=SecretsStoreType.NONE,
        title="The type of secrets store that the server is using.",
    )

    def is_local(self) -> bool:
        """Tell whether this server is a local one.

        Returns:
            True if the server is running locally, False otherwise.
        """
        from zenml.config.global_config import GlobalConfiguration

        # A local server shares its ID with the local client (user), so
        # comparing the two IDs is enough.
        local_client_id = GlobalConfiguration().user_id
        return self.id == local_client_id
is_local(self)
Return whether the server is running locally.
Returns:
Type | Description |
---|---|
bool |
True if the server is running locally, False otherwise. |
Source code in zenml/models/server_models.py
def is_local(self) -> bool:
    """Tell whether this server is a local one.

    Returns:
        True if the server is running locally, False otherwise.
    """
    from zenml.config.global_config import GlobalConfiguration

    # A local server shares its ID with the local client (user), so
    # comparing the two IDs is enough.
    local_client_id = GlobalConfiguration().user_id
    return self.id == local_client_id
stack_models
Models representing stacks.
StackBaseModel (BaseModel)
pydantic-model
Base model for stacks.
Source code in zenml/models/stack_models.py
class StackBaseModel(BaseModel):
    """Base model for stacks.

    Holds the name and description of a stack. Both are length-limited by
    STR_FIELD_MAX_LENGTH.
    """

    # Required: no default is declared.
    name: str = Field(
        title="The name of the stack.", max_length=STR_FIELD_MAX_LENGTH
    )
    # Defaults to an empty string.
    description: str = Field(
        default="",
        title="The description of the stack",
        max_length=STR_FIELD_MAX_LENGTH,
    )
StackFilterModel (ShareableWorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all StackModels.
The Stack Model needs additional scoping. As such, the `_scope_user` field can be set to the user that is doing the filtering. The `generate_filter()` method of the base class is overridden to include the scoping.
Source code in zenml/models/stack_models.py
class StackFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all StackModels.

    The Stack Model needs additional scoping. As such the `_scope_user` field
    can be set to the user that is doing the filtering. The
    `generate_filter()` method of the baseclass is overwritten to include the
    scoping.
    """

    # NOTE(review): no `generate_filter()` override or `_scope_user` field
    # is declared in this class body; the scoping described above
    # presumably lives in a base class - confirm.

    # `component_id` refers to a relationship through a link-table
    # rather than a field in the db, hence it needs to be handled
    # explicitly
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "component_id",  # This is a relationship, not a field
    ]

    # Every criterion below is optional and defaults to None.
    is_shared: Optional[Union[bool, str]] = Field(
        default=None, description="If the stack is shared or private"
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the stack",
    )
    description: Optional[str] = Field(
        default=None, description="Description of the stack"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
    component_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Component in the stack"
    )
component_id: Union[uuid.UUID, str]
pydantic-field
Component in the stack
description: str
pydantic-field
Description of the stack
is_shared: Union[bool, str]
pydantic-field
If the stack is shared or private
name: str
pydantic-field
Name of the stack
user_id: Union[uuid.UUID, str]
pydantic-field
User of the stack
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the stack
StackRequestModel (StackBaseModel, ShareableRequestModel)
pydantic-model
Stack model with components, user and workspace as UUIDs.
Source code in zenml/models/stack_models.py
class StackRequestModel(StackBaseModel, ShareableRequestModel):
    """Stack model with components, user and workspace as UUIDs."""

    # The two literals are concatenated implicitly, so the first one needs
    # its trailing space to keep the words apart.
    components: Optional[Dict[StackComponentType, List[UUID]]] = Field(
        default=None,
        title="A mapping of stack component types to the actual "
        "instances of components of this type.",
    )

    @property
    def is_valid(self) -> bool:
        """Check if the stack is valid.

        A stack without components is never valid. Otherwise it is valid
        when it has both an artifact store and an orchestrator entry.

        Returns:
            True if the stack is valid, False otherwise.
        """
        if not self.components:
            return False
        return (
            StackComponentType.ARTIFACT_STORE in self.components
            and StackComponentType.ORCHESTRATOR in self.components
        )
is_valid: bool
property
readonly
Check if the stack is valid.
Returns:
Type | Description |
---|---|
bool |
True if the stack is valid, False otherwise. |
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
StackResponseModel (StackBaseModel, ShareableResponseModel)
pydantic-model
Stack model with Components, User and Workspace fully hydrated.
Source code in zenml/models/stack_models.py
class StackResponseModel(StackBaseModel, ShareableResponseModel):
    """Stack model with Components, User and Workspace fully hydrated."""

    # The two literals are concatenated implicitly, so the first one needs
    # its trailing space to keep the words apart.
    components: Dict[StackComponentType, List[ComponentResponseModel]] = Field(
        title="A mapping of stack component types to the actual "
        "instances of components of this type."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Add the stack components to the stack analytics metadata.

        For every component type, the flavor of the first listed component
        is recorded under that type.

        Returns:
            Dict of analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata.update({ct: c[0].flavor for ct, c in self.components.items()})
        return metadata

    @property
    def is_valid(self) -> bool:
        """Check if the stack is valid.

        A stack is valid when it has both an artifact store and an
        orchestrator entry.

        Returns:
            True if the stack is valid, False otherwise.
        """
        return (
            StackComponentType.ARTIFACT_STORE in self.components
            and StackComponentType.ORCHESTRATOR in self.components
        )

    def to_yaml(self) -> Dict[str, Any]:
        """Create yaml representation of the Stack Model.

        Only the first component of each type is included, reduced to its
        name, type, flavor and configuration.

        Returns:
            The yaml representation of the Stack Model.
        """
        component_data = {}
        for component_type, components_list in self.components.items():
            component = components_list[0]
            # Round-trip through JSON to get a plain dict of the selected
            # fields.
            component_dict = json.loads(
                component.json(
                    include={"name", "type", "flavor", "configuration"}
                )
            )
            component_data[component_type.value] = component_dict

        # Assemble the stack name and the component dict for YAML output.
        yaml_data = {
            "stack_name": self.name,
            "components": component_data,
        }

        return yaml_data
is_valid: bool
property
readonly
Check if the stack is valid.
Returns:
Type | Description |
---|---|
bool |
True if the stack is valid, False otherwise. |
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Add the stack components to the stack analytics metadata.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Dict of analytics metadata. |
Source code in zenml/models/stack_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Extend the inherited analytics metadata with component flavors.

    For every component type, the flavor of the first listed component is
    recorded under that type.

    Returns:
        Dict of analytics metadata.
    """
    analytics = super().get_analytics_metadata()

    # Collect all flavors first so the metadata is updated in one step.
    flavors_by_type = {}
    for component_type, components in self.components.items():
        flavors_by_type[component_type] = components[0].flavor

    analytics.update(flavors_by_type)
    return analytics
to_yaml(self)
Create yaml representation of the Stack Model.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The yaml representation of the Stack Model. |
Source code in zenml/models/stack_models.py
def to_yaml(self) -> Dict[str, Any]:
    """Build a YAML-ready dictionary describing this stack.

    Only the first component of each type is included, reduced to its
    name, type, flavor and configuration.

    Returns:
        The yaml representation of the Stack Model.
    """
    serialized_components = {}
    for kind, members in self.components.items():
        first_component = members[0]
        # Round-trip through JSON to get a plain dict of the selected
        # fields.
        raw_json = first_component.json(
            include={"name", "type", "flavor", "configuration"}
        )
        serialized_components[kind.value] = json.loads(raw_json)

    return {
        "stack_name": self.name,
        "components": serialized_components,
    }
StackUpdateModel (StackRequestModel)
pydantic-model
The update model for stacks.
Source code in zenml/models/stack_models.py
class StackUpdateModel(StackRequestModel):
    """The update model for stacks.

    Declares no fields of its own; the shape is inherited unchanged from
    `StackRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
step_run_models
Models representing steps of pipeline runs.
StepRunBaseModel (BaseModel)
pydantic-model
Base model for step runs.
Source code in zenml/models/step_run_models.py
class StepRunBaseModel(BaseModel):
    """Base model for step runs.

    Describes one step of a pipeline run: its configuration, status,
    timing, and its links to the pipeline run and to other step runs.
    """

    name: str = Field(
        title="The name of the pipeline run step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    step: Step
    # ID of the pipeline run that this step run belongs to (required).
    pipeline_run_id: UUID
    original_step_run_id: Optional[UUID] = None
    status: ExecutionStatus
    # NOTE(review): mutable `[]` default. Pydantic is expected to copy
    # field defaults per instance, so this should not be shared state -
    # confirm.
    parent_step_ids: List[UUID] = []
    cache_key: Optional[str] = Field(
        title="The cache key of the step run.",
        default=None,
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # The two text fields below use the larger TEXT_FIELD_MAX_LENGTH limit.
    docstring: Optional[str] = Field(
        title="The docstring of the step function or class.",
        default=None,
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    source_code: Optional[str] = Field(
        title="The source code of the step function or class.",
        default=None,
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
StepRunFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all step runs.
Source code in zenml/models/step_run_models.py
class StepRunFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all step runs."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the step run",
    )
    entrypoint_name: Optional[str] = Field(
        default=None,
        description="Entrypoint name of the step run",
    )
    code_hash: Optional[str] = Field(
        default=None,
        description="Code hash for this step run",
    )
    cache_key: Optional[str] = Field(
        default=None,
        description="Cache key for this step run",
    )
    status: Optional[str] = Field(
        default=None,
        description="Status of the Step Run",
    )
    # time and ID filters also accept plain strings besides the native type
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time for this run"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time for this run"
    )
    pipeline_run_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline run of this step run"
    )
    original_step_run_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Original id for this step run"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that produced this step run"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of this step run"
    )
    num_outputs: Optional[int] = Field(
        default=None,
        description="Amount of outputs for this Step Run",
    )
cache_key: str
pydantic-field
Cache key for this step run
code_hash: str
pydantic-field
Code hash for this step run
end_time: Union[datetime.datetime, str]
pydantic-field
End time for this run
entrypoint_name: str
pydantic-field
Entrypoint name of the step run
name: str
pydantic-field
Name of the step run
num_outputs: int
pydantic-field
Amount of outputs for this Step Run
original_step_run_id: Union[uuid.UUID, str]
pydantic-field
Original id for this step run
pipeline_run_id: Union[uuid.UUID, str]
pydantic-field
Pipeline run of this step run
start_time: Union[datetime.datetime, str]
pydantic-field
Start time for this run
status: str
pydantic-field
Status of the Step Run
user_id: Union[uuid.UUID, str]
pydantic-field
User that produced this step run
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of this step run
StepRunRequestModel (StepRunBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for step runs.
Source code in zenml/models/step_run_models.py
class StepRunRequestModel(StepRunBaseModel, WorkspaceScopedRequestModel):
    """Model used to request the creation of a step run."""

    # both mappings go from a string key to an artifact UUID
    input_artifacts: Dict[str, UUID] = {}
    output_artifacts: Dict[str, UUID] = {}
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
StepRunResponseModel (StepRunBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for step runs.
Source code in zenml/models/step_run_models.py
class StepRunResponseModel(StepRunBaseModel, WorkspaceScopedResponseModel):
    """Model returned for step runs."""

    # unlike the request model, artifacts are full response models here
    input_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    output_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        title="Metadata associated with this step run.",
        default={},
    )
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
StepRunUpdateModel (BaseModel)
pydantic-model
Update model for step runs.
Source code in zenml/models/step_run_models.py
class StepRunUpdateModel(BaseModel):
    """Model used to update step runs.

    Only the output artifacts, the status and the end time can be set
    through this model.
    """

    output_artifacts: Dict[str, UUID] = {}
    status: Optional[ExecutionStatus] = None
    end_time: Optional[datetime] = None
team_models
Models representing teams.
TeamBaseModel (BaseModel)
pydantic-model
Base model for teams.
Source code in zenml/models/team_models.py
class TeamBaseModel(BaseModel):
    """Fields shared by the team models."""

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The unique name of the team.",
    )
TeamFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Teams.
Source code in zenml/models/team_models.py
class TeamFilterModel(BaseFilterModel):
    """Filter model for teams; the only filter declared here is the name."""

    name: Optional[str] = Field(
        description="Name of the team",
        default=None,
    )
name: str
pydantic-field
Name of the team
TeamRequestModel (TeamBaseModel, BaseRequestModel)
pydantic-model
Request model for teams.
Source code in zenml/models/team_models.py
class TeamRequestModel(TeamBaseModel, BaseRequestModel):
    """Model used to request the creation of a team."""

    # users are referenced by UUID; the field may be left unset
    users: Optional[List[UUID]] = Field(
        title="The list of users within this team.", default=None
    )
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
TeamResponseModel (TeamBaseModel, BaseResponseModel)
pydantic-model
Response model for teams.
Source code in zenml/models/team_models.py
class TeamResponseModel(TeamBaseModel, BaseResponseModel):
    """Model returned for teams, including the users that belong to it."""

    users: List["UserResponseModel"] = Field(
        title="The list of users within this team."
    )

    @property
    def user_ids(self) -> List[UUID]:
        """Returns a list of the IDs of the users that are part of this team.

        Returns:
            A list of user IDs; empty if the team has no users.
        """
        return [member.id for member in self.users] if self.users else []

    @property
    def user_names(self) -> List[str]:
        """Returns a list of the names of the users that are part of this team.

        Returns:
            A list of user names; empty if the team has no users.
        """
        return [member.name for member in self.users] if self.users else []
user_ids: List[uuid.UUID]
property
readonly
Returns a list of user IDs that are part of this team.
Returns:
Type | Description |
---|---|
List[uuid.UUID] |
A list of user IDs. |
user_names: List[str]
property
readonly
Returns a list of the names of the users that are part of this team.
Returns:
Type | Description |
---|---|
List[str] |
A list of names of users. |
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
TeamUpdateModel (TeamRequestModel)
pydantic-model
Update model for teams.
Source code in zenml/models/team_models.py
class TeamUpdateModel(TeamRequestModel):
    """Model used to update teams.

    No fields are declared in this class body; it builds on
    `TeamRequestModel`.
    """
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
team_role_assignment_models
Models representing role assignments.
TeamRoleAssignmentBaseModel (BaseModel)
pydantic-model
Base model for role assignments.
Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentBaseModel(BaseModel):
    """Common base class of the team role assignment models.

    It declares no fields of its own.
    """
TeamRoleAssignmentFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Role Assignments.
Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentFilterModel(BaseFilterModel):
    """Filter model for team role assignments."""

    # every filter accepts either a UUID or a plain string
    workspace_id: Optional[Union[UUID, str]] = Field(
        description="Workspace of the RoleAssignment", default=None
    )
    team_id: Optional[Union[UUID, str]] = Field(
        description="Team in the RoleAssignment", default=None
    )
    role_id: Optional[Union[UUID, str]] = Field(
        description="Role in the RoleAssignment", default=None
    )
role_id: Union[uuid.UUID, str]
pydantic-field
Role in the RoleAssignment
team_id: Union[uuid.UUID, str]
pydantic-field
Team in the RoleAssignment
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the RoleAssignment
TeamRoleAssignmentRequestModel (TeamRoleAssignmentBaseModel, BaseRequestModel)
pydantic-model
Request model for role assignments using UUIDs for all entities.
Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentRequestModel(
    TeamRoleAssignmentBaseModel, BaseRequestModel
):
    """Request model for role assignments using UUIDs for all entities."""

    # optional: a missing workspace leaves the assignment unscoped
    workspace: Optional[UUID] = Field(
        default=None, title="The workspace that the role is limited to."
    )
    # this model assigns roles to teams, so the title must name the team
    team: UUID = Field(
        title="The team that the role is assigned to.",
    )
    role: UUID = Field(title="The role.")
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
TeamRoleAssignmentResponseModel (TeamRoleAssignmentBaseModel, BaseResponseModel)
pydantic-model
Response model for role assignments with all entities hydrated.
Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentResponseModel(
    TeamRoleAssignmentBaseModel, BaseResponseModel
):
    """Model returned for team role assignments.

    Workspace, team and role are embedded as response models rather than
    UUIDs; each of them is optional.
    """

    workspace: Optional["WorkspaceResponseModel"] = Field(
        default=None, title="The workspace scope of this role assignment."
    )
    team: Optional["TeamResponseModel"] = Field(
        default=None, title="The team the role is assigned to."
    )
    role: Optional["RoleResponseModel"] = Field(
        default=None, title="The assigned role."
    )
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
user_models
Models representing users.
JWTToken (BaseModel)
pydantic-model
Pydantic object representing a JWT token.
Attributes:
Name | Type | Description |
---|---|---|
token_type |
JWTTokenType |
The type of token. |
user_id |
UUID |
The id of the authenticated User |
permissions |
List[str] |
The permissions scope of the authenticated user |
Source code in zenml/models/user_models.py
class JWTToken(BaseModel):
    """Pydantic object representing a JWT token.

    Attributes:
        token_type: The type of token.
        user_id: The id of the authenticated User
        permissions: The permissions scope of the authenticated user
    """

    JWT_ALGORITHM: ClassVar[str] = "HS256"

    token_type: JWTTokenType
    user_id: UUID
    permissions: List[str]

    @classmethod
    def decode(cls, token_type: JWTTokenType, token: str) -> "JWTToken":
        """Decodes a JWT access token.

        Decodes a JWT access token and returns a `JWTToken` object with the
        information retrieved from its `sub` and `permissions` claims.

        Args:
            token_type: The type of token.
            token: The encoded JWT token.

        Returns:
            The decoded JWT access token.

        Raises:
            AuthorizationException: If the token is invalid.
        """
        # import here to keep these dependencies out of the client
        from jose import JWTError, jwt

        try:
            payload = jwt.decode(
                token,
                GlobalConfiguration().jwt_secret_key,
                algorithms=[cls.JWT_ALGORITHM],
            )
        except JWTError as e:
            raise AuthorizationException(f"Invalid JWT token: {e}") from e

        subject: Optional[str] = payload.get("sub")
        if subject is None:
            raise AuthorizationException(
                "Invalid JWT token: the subject claim is missing"
            )

        permissions: Optional[List[str]] = payload.get("permissions")
        if permissions is None:
            raise AuthorizationException(
                "Invalid JWT token: the permissions scope is missing"
            )

        try:
            return cls(
                token_type=token_type,
                user_id=UUID(subject),
                permissions=set(permissions),
            )
        except (TypeError, ValueError) as e:
            # TypeError covers claims of an unexpected JSON type, e.g. a
            # non-iterable `permissions` value handed to `set()`; without it
            # such tokens would escape as a raw TypeError instead of the
            # documented AuthorizationException
            raise AuthorizationException(
                f"Invalid JWT token: could not decode subject claim: {e}"
            ) from e

    def encode(self, expire_minutes: Optional[int] = None) -> str:
        """Creates a JWT access token.

        Generates and returns a JWT access token whose `sub` claim holds the
        user ID and whose `permissions` claim holds the permissions of this
        object.

        Args:
            expire_minutes: Number of minutes the token should be valid. If
                not provided (or falsy), the token will not be set to expire.

        Returns:
            The generated access token.
        """
        # import here to keep these dependencies out of the client
        from jose import jwt

        claims: Dict[str, Any] = {
            "sub": str(self.user_id),
            "permissions": list(self.permissions),
        }

        if expire_minutes:
            expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
            claims["exp"] = expire

        token: str = jwt.encode(
            claims,
            GlobalConfiguration().jwt_secret_key,
            algorithm=self.JWT_ALGORITHM,
        )
        return token
decode(token_type, token)
classmethod
Decodes a JWT access token.
Decodes a JWT access token and returns a `JWTToken` object with the information retrieved from its subject claim.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token_type |
JWTTokenType |
The type of token. |
required |
token |
str |
The encoded JWT token. |
required |
Returns:
Type | Description |
---|---|
JWTToken |
The decoded JWT access token. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the token is invalid. |
Source code in zenml/models/user_models.py
@classmethod
def decode(cls, token_type: JWTTokenType, token: str) -> "JWTToken":
    """Decodes a JWT access token.

    Decodes a JWT access token and returns a `JWTToken` object with the
    information retrieved from its `sub` and `permissions` claims.

    Args:
        token_type: The type of token.
        token: The encoded JWT token.

    Returns:
        The decoded JWT access token.

    Raises:
        AuthorizationException: If the token is invalid.
    """
    # import here to keep these dependencies out of the client
    from jose import JWTError, jwt

    try:
        payload = jwt.decode(
            token,
            GlobalConfiguration().jwt_secret_key,
            algorithms=[cls.JWT_ALGORITHM],
        )
    except JWTError as e:
        raise AuthorizationException(f"Invalid JWT token: {e}") from e

    subject: Optional[str] = payload.get("sub")
    if subject is None:
        raise AuthorizationException(
            "Invalid JWT token: the subject claim is missing"
        )

    permissions: Optional[List[str]] = payload.get("permissions")
    if permissions is None:
        raise AuthorizationException(
            "Invalid JWT token: the permissions scope is missing"
        )

    try:
        return cls(
            token_type=token_type,
            user_id=UUID(subject),
            permissions=set(permissions),
        )
    except (TypeError, ValueError) as e:
        # TypeError covers claims of an unexpected JSON type, e.g. a
        # non-iterable `permissions` value handed to `set()`; without it
        # such tokens would escape as a raw TypeError instead of the
        # documented AuthorizationException
        raise AuthorizationException(
            f"Invalid JWT token: could not decode subject claim: {e}"
        ) from e
encode(self, expire_minutes=None)
Creates a JWT access token.
Generates and returns a JWT access token with the subject claim set to contain the information in this Pydantic object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expire_minutes |
Optional[int] |
Number of minutes the token should be valid. If not provided, the token will not be set to expire. |
None |
Returns:
Type | Description |
---|---|
str |
The generated access token. |
Source code in zenml/models/user_models.py
def encode(self, expire_minutes: Optional[int] = None) -> str:
    """Serialize this object into a signed JWT access token.

    The user ID is written to the `sub` claim and the permissions to the
    `permissions` claim.

    Args:
        expire_minutes: Number of minutes the token should be valid. When
            falsy (the default), no `exp` claim is added.

    Returns:
        The generated access token.
    """
    # import here to keep these dependencies out of the client
    from jose import jwt

    claims: Dict[str, Any] = {
        "sub": str(self.user_id),
        "permissions": list(self.permissions),
    }
    if expire_minutes:
        claims["exp"] = datetime.utcnow() + timedelta(
            minutes=expire_minutes
        )

    signing_key = GlobalConfiguration().jwt_secret_key
    encoded: str = jwt.encode(
        claims, signing_key, algorithm=self.JWT_ALGORITHM
    )
    return encoded
JWTTokenType (StrEnum)
The type of JWT token.
Source code in zenml/models/user_models.py
class JWTTokenType(StrEnum):
    """Enumeration of the JWT token types; only access tokens are defined."""

    ACCESS_TOKEN = "access_token"
UserAuthModel (UserBaseModel, BaseResponseModel)
pydantic-model
Authentication Model for the User.
This model is only used server-side. The server endpoints can use this model to authenticate the user credentials (Token, Password).
Source code in zenml/models/user_models.py
class UserAuthModel(UserBaseModel, BaseResponseModel):
    """Authentication Model for the User.

    This model is only used server-side. The server endpoints can use this
    model to authenticate the user credentials (Token, Password). Both secret
    fields are declared with `exclude=True`.
    """

    active: bool = Field(title="Active account.", default=False)
    activation_token: Optional[SecretStr] = Field(exclude=True, default=None)
    password: Optional[SecretStr] = Field(exclude=True, default=None)
    teams: Optional[List["TeamResponseModel"]] = Field(
        title="The list of teams for this user.", default=None
    )

    def generate_access_token(self, permissions: List[str]) -> str:
        """Create an encoded JWT access token for this user.

        Args:
            permissions: Permissions to add to the token

        Returns:
            The generated access token.
        """
        access_token = JWTToken(
            token_type=JWTTokenType.ACCESS_TOKEN,
            user_id=self.id,
            permissions=permissions,
        )
        return access_token.encode()

    @classmethod
    def _is_hashed_secret(cls, secret: SecretStr) -> bool:
        """Tell whether a secret value already has the shape of a hash.

        Args:
            secret: The secret value to check.

        Returns:
            True if the secret value matches the hash pattern, otherwise
            False.
        """
        # `$2a$`, `$2y$` or `$2b$` prefix followed by 56 more characters
        matched = re.match(r"^\$2[ayb]\$.{56}$", secret.get_secret_value())
        return matched is not None

    @classmethod
    def _get_hashed_secret(cls, secret: Optional[SecretStr]) -> Optional[str]:
        """Return the hash of a secret, hashing it first if necessary.

        Args:
            secret: The secret value to hash.

        Returns:
            None if no secret was supplied, the secret itself if it is
            already hashed, and a freshly computed hash otherwise.
        """
        if secret is None:
            return None
        if cls._is_hashed_secret(secret):
            return secret.get_secret_value()
        return cast(
            str, cls._get_crypt_context().hash(secret.get_secret_value())
        )

    def get_password(self) -> Optional[str]:
        """Get the password.

        Returns:
            The password as a plain string, if it exists.
        """
        return (
            None if self.password is None else self.password.get_secret_value()
        )

    def get_hashed_password(self) -> Optional[str]:
        """Returns the hashed password, if configured.

        Returns:
            The hashed password.
        """
        stored_password = self.password
        return self._get_hashed_secret(stored_password)

    def get_hashed_activation_token(self) -> Optional[str]:
        """Returns the hashed activation token, if configured.

        Returns:
            The hashed activation token.
        """
        stored_token = self.activation_token
        return self._get_hashed_secret(stored_token)

    @classmethod
    def verify_password(
        cls, plain_password: str, user: Optional["UserAuthModel"] = None
    ) -> bool:
        """Verifies a given plain password against the stored password.

        Args:
            plain_password: Input password to be verified.
            user: User for which the password is to be verified.

        Returns:
            True if the passwords match.
        """
        # even when the user or password is not set, we still want to execute
        # the password hash verification to protect against response
        # discrepancy attacks (https://cwe.mitre.org/data/definitions/204.html)
        # NOTE: the `active` flag of the user is not consulted here
        password_hash: Optional[str]
        if user is None or user.password is None:
            password_hash = None
        else:
            password_hash = user.get_hashed_password()
        verified = cls._get_crypt_context().verify(
            plain_password, password_hash
        )
        return cast(bool, verified)

    @classmethod
    def verify_access_token(cls, token: str) -> Optional["UserAuthModel"]:
        """Verifies an access token.

        Verifies an access token and returns the user that was used to
        generate it if the token is valid and None otherwise.

        Args:
            token: The access token to verify.

        Returns:
            The user that generated the token if valid, None otherwise.
        """
        try:
            access_token = JWTToken.decode(
                token_type=JWTTokenType.ACCESS_TOKEN, token=token
            )
        except AuthorizationException:
            return None

        zen_store = GlobalConfiguration().zen_store
        try:
            user = zen_store.get_auth_user(
                user_name_or_id=access_token.user_id
            )
        except KeyError:
            # the store raised a KeyError for the user ID in the token
            return None

        # inactive users are rejected even though their token decoded fine
        return user if user.active else None

    @classmethod
    def verify_activation_token(
        cls, activation_token: str, user: Optional["UserAuthModel"] = None
    ) -> bool:
        """Verifies a given activation token against the stored token.

        Args:
            activation_token: Input activation token to be verified.
            user: User for which the activation token is to be verified.

        Returns:
            True if the token is valid.
        """
        # even when the user or token is not set, we still want to execute the
        # token hash verification to protect against response discrepancy
        # attacks (https://cwe.mitre.org/data/definitions/204.html)
        token_hash: Optional[str]
        if user is None or user.activation_token is None or user.active:
            token_hash = None
        else:
            token_hash = user.get_hashed_activation_token()
        verified = cls._get_crypt_context().verify(
            activation_token, token_hash
        )
        return cast(bool, verified)
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
generate_access_token(self, permissions)
Generates an access token.
Generates an access token and returns it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
permissions |
List[str] |
Permissions to add to the token |
required |
Returns:
Type | Description |
---|---|
str |
The generated access token. |
Source code in zenml/models/user_models.py
def generate_access_token(self, permissions: List[str]) -> str:
    """Create an encoded JWT access token for this user.

    Args:
        permissions: Permissions to add to the token

    Returns:
        The generated access token.
    """
    access_token = JWTToken(
        token_type=JWTTokenType.ACCESS_TOKEN,
        user_id=self.id,
        permissions=permissions,
    )
    return access_token.encode()
get_hashed_activation_token(self)
Returns the hashed activation token, if configured.
Returns:
Type | Description |
---|---|
Optional[str] |
The hashed activation token. |
Source code in zenml/models/user_models.py
def get_hashed_activation_token(self) -> Optional[str]:
    """Returns the hashed activation token, if configured.

    Returns:
        The result of `_get_hashed_secret` for the stored activation token.
    """
    stored_token = self.activation_token
    return self._get_hashed_secret(stored_token)
get_hashed_password(self)
Returns the hashed password, if configured.
Returns:
Type | Description |
---|---|
Optional[str] |
The hashed password. |
Source code in zenml/models/user_models.py
def get_hashed_password(self) -> Optional[str]:
    """Returns the hashed password, if configured.

    Returns:
        The result of `_get_hashed_secret` for the stored password.
    """
    stored_password = self.password
    return self._get_hashed_secret(stored_password)
get_password(self)
Get the password.
Returns:
Type | Description |
---|---|
Optional[str] |
The password as a plain string, if it exists. |
Source code in zenml/models/user_models.py
def get_password(self) -> Optional[str]:
    """Get the password.

    Returns:
        The password as a plain string, or None if no password is set.
    """
    return None if self.password is None else self.password.get_secret_value()
verify_access_token(token)
classmethod
Verifies an access token.
Verifies an access token and returns the user that was used to generate it if the token is valid and None otherwise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The access token to verify. |
required |
Returns:
Type | Description |
---|---|
Optional[UserAuthModel] |
The user that generated the token if valid, None otherwise. |
Source code in zenml/models/user_models.py
@classmethod
def verify_access_token(cls, token: str) -> Optional["UserAuthModel"]:
    """Verifies an access token.

    Verifies an access token and returns the user that was used to generate
    it if the token is valid and None otherwise.

    Args:
        token: The access token to verify.

    Returns:
        The user that generated the token if valid, None otherwise.
    """
    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException:
        return None

    zen_store = GlobalConfiguration().zen_store
    try:
        user = zen_store.get_auth_user(user_name_or_id=access_token.user_id)
    except KeyError:
        # the store raised a KeyError for the user ID in the token
        return None

    # inactive users are rejected even though their token decoded fine
    return user if user.active else None
verify_activation_token(activation_token, user=None)
classmethod
Verifies a given activation token against the stored token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
activation_token |
str |
Input activation token to be verified. |
required |
user |
Optional[UserAuthModel] |
User for which the activation token is to be verified. |
None |
Returns:
Type | Description |
---|---|
bool |
True if the token is valid. |
Source code in zenml/models/user_models.py
@classmethod
def verify_activation_token(
    cls, activation_token: str, user: Optional["UserAuthModel"] = None
) -> bool:
    """Verifies a given activation token against the stored token.

    Args:
        activation_token: Input activation token to be verified.
        user: User for which the activation token is to be verified.

    Returns:
        True if the token is valid.
    """
    # even when the user or token is not set, we still want to execute the
    # token hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    token_hash: Optional[str]
    if user is None or user.activation_token is None or user.active:
        token_hash = None
    else:
        token_hash = user.get_hashed_activation_token()
    verified = cls._get_crypt_context().verify(activation_token, token_hash)
    return cast(bool, verified)
verify_password(plain_password, user=None)
classmethod
Verifies a given plain password against the stored password.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plain_password |
str |
Input password to be verified. |
required |
user |
Optional[UserAuthModel] |
User for which the password is to be verified. |
None |
Returns:
Type | Description |
---|---|
bool |
True if the passwords match. |
Source code in zenml/models/user_models.py
@classmethod
def verify_password(
    cls, plain_password: str, user: Optional["UserAuthModel"] = None
) -> bool:
    """Verifies a given plain password against the stored password.

    Args:
        plain_password: Input password to be verified.
        user: User for which the password is to be verified.

    Returns:
        True if the passwords match.
    """
    # even when the user or password is not set, we still want to execute
    # the password hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    # NOTE: the `active` flag of the user is not consulted here
    password_hash: Optional[str]
    if user is None or user.password is None:
        password_hash = None
    else:
        password_hash = user.get_hashed_password()
    verified = cls._get_crypt_context().verify(plain_password, password_hash)
    return cast(bool, verified)
UserBaseModel (BaseModel)
pydantic-model
Base model for users.
Source code in zenml/models/user_models.py
class UserBaseModel(BaseModel):
    """Fields and helpers shared by the user models."""

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The unique username for the account.",
    )
    full_name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The full name for the account owner.",
        default="",
    )
    # tri-state: stays None until the user has answered
    email_opted_in: Optional[bool] = Field(
        title="Whether the user agreed to share their email.",
        description="`null` if not answered, `true` if agreed, "
        "`false` if skipped.",
        default=None,
    )
    hub_token: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="JWT Token for the connected Hub account.",
        default=None,
    )
    active: bool = Field(title="Active account.", default=False)

    @classmethod
    def _get_crypt_context(cls) -> "CryptContext":
        """Build the passlib context used to hash and verify secrets.

        Returns:
            A `CryptContext` configured with the bcrypt scheme.
        """
        # imported lazily, inside the method, rather than at module level
        from passlib.context import CryptContext

        return CryptContext(schemes=["bcrypt"], deprecated="auto")
email_opted_in: bool
pydantic-field
`null` if not answered, `true` if agreed, `false` if skipped.
UserFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Users.
Source code in zenml/models/user_models.py
class UserFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Users."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the user",
    )
    full_name: Optional[str] = Field(
        default=None,
        description="Full Name of the user",
    )
    email: Optional[str] = Field(
        default=None,
        description="Email of the user",
    )
    # boolean filters also accept a plain string
    active: Optional[Union[bool, str]] = Field(
        default=None,
        description="Whether the user is active",
    )
    email_opted_in: Optional[Union[bool, str]] = Field(
        default=None,
        description="Whether the user agreed to share their email",
    )
active: Union[bool, str]
pydantic-field
Whether the user is active
email: str
pydantic-field
Email of the user
email_opted_in: Union[bool, str]
pydantic-field
Whether the user agreed to share their email
full_name: str
pydantic-field
Full Name of the user
name: str
pydantic-field
Name of the user
UserRequestModel (UserBaseModel, BaseRequestModel)
pydantic-model
Request model for users.
This model is used to create a user. The email field is optional but is more commonly set on the UpdateRequestModel which inherits from this model. Users can also optionally set their password during creation.
Source code in zenml/models/user_models.py
class UserRequestModel(UserBaseModel, BaseRequestModel):
    """Request model for users.

    This model is used to create a user. The email field is optional but is
    more commonly set on the update request model, which inherits from this
    model. Users can also optionally set their password during creation.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "name",
        "full_name",
        "active",
        "email_opted_in",
    ]

    email: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The email address associated with the account.",
        default=None,
    )
    password: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="A password for the user.",
        default=None,
    )
    activation_token: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH, default=None
    )

    class Config:
        """Pydantic configuration class."""

        # run validation whenever an attribute is assigned
        validate_assignment = True
        # reject attributes that are not declared on the model
        extra = "forbid"
        underscore_attrs_are_private = True

    @classmethod
    def _create_hashed_secret(cls, secret: Optional[str]) -> Optional[str]:
        """Hash a plain-text secret.

        Args:
            secret: The secret value to hash.

        Returns:
            The secret hash value, or None if no secret was supplied.
        """
        if secret is None:
            return None
        return cast(str, cls._get_crypt_context().hash(secret))

    def create_hashed_password(self) -> Optional[str]:
        """Hashes the password.

        Returns:
            The hashed password, or None if no password is set.
        """
        plain_password = self.password
        return self._create_hashed_secret(plain_password)

    def create_hashed_activation_token(self) -> Optional[str]:
        """Hashes the activation token.

        Returns:
            The hashed activation token, or None if no token is set.
        """
        plain_token = self.activation_token
        return self._create_hashed_secret(plain_token)

    def generate_activation_token(self) -> str:
        """Generates and stores a new activation token.

        Returns:
            The generated activation token, read back from the model after
            it has been assigned.
        """
        # token_hex(32) yields 32 random bytes as 64 hexadecimal characters
        self.activation_token = token_hex(32)
        return self.activation_token
Config
Pydantic configuration class.
Source code in zenml/models/user_models.py
class Config:
    """Pydantic configuration class."""

    # run validation whenever an attribute is assigned
    validate_assignment = True
    # reject attributes that are not declared on the model
    extra = "forbid"
    underscore_attrs_are_private = True
__json_encoder__(obj)
special
staticmethod
`partial(func, *args, **keywords)` - new function with partial application of the given arguments and keywords.
create_hashed_activation_token(self)
Hashes the activation token.
Returns:
Type | Description |
---|---|
Optional[str] |
The hashed activation token. |
Source code in zenml/models/user_models.py
def create_hashed_activation_token(self) -> Optional[str]:
    """Hashes the activation token.

    Returns:
        The result of `_create_hashed_secret` for the activation token.
    """
    plain_token = self.activation_token
    return self._create_hashed_secret(plain_token)
create_hashed_password(self)
Hashes the password.
Returns:
Type | Description |
---|---|
Optional[str] |
The hashed password. |
Source code in zenml/models/user_models.py
def create_hashed_password(self) -> Optional[str]:
    """Hashes the password.

    Returns:
        The result of `_create_hashed_secret` for the password.
    """
    plain_password = self.password
    return self._create_hashed_secret(plain_password)
generate_activation_token(self)
Generates and stores a new activation token.
Returns:
Type | Description |
---|---|
str |
The generated activation token. |
Source code in zenml/models/user_models.py
def generate_activation_token(self) -> str:
    """Generates and stores a new activation token.

    Returns:
        The generated activation token, read back from the object after it
        has been assigned.
    """
    # token_hex(32) yields 32 random bytes as 64 hexadecimal characters
    fresh_token = token_hex(32)
    self.activation_token = fresh_token
    return self.activation_token
UserResponseModel (UserBaseModel, BaseResponseModel)
pydantic-model
Response model for users.
This returns the activation_token, which is required for the user-invitation flow of the frontend. It also optionally includes the teams the user is a part of. The email is returned optionally as well, for use by the analytics on the client side.
Source code in zenml/models/user_models.py
class UserResponseModel(UserBaseModel, BaseResponseModel):
    """Response model for users.

    Carries the activation_token (which is required for the
    user-invitation-flow of the frontend). The user's teams and roles are
    included optionally, and the email is returned optionally as well for
    use by the analytics on the client-side.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "name",
        "full_name",
        "active",
        "email_opted_in",
    ]

    activation_token: Optional[str] = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        default=None,
    )
    teams: Optional[List["TeamResponseModel"]] = Field(
        title="The list of teams for this user.",
        default=None,
    )
    roles: Optional[List["RoleResponseModel"]] = Field(
        title="The list of roles for this user.",
        default=None,
    )
    email: Optional[str] = Field(
        title="The email address associated with the account.",
        max_length=STR_FIELD_MAX_LENGTH,
        default="",
    )

    def generate_access_token(self, permissions: List[str]) -> str:
        """Build an encoded access token for this user.

        Args:
            permissions: Permissions to add to the token

        Returns:
            The generated access token.
        """
        access_token = JWTToken(
            token_type=JWTTokenType.ACCESS_TOKEN,
            user_id=self.id,
            permissions=permissions,
        )
        return access_token.encode()
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
generate_access_token(self, permissions)
Generates an access token.
Generates an access token and returns it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
permissions |
List[str] |
Permissions to add to the token |
required |
Returns:
Type | Description |
---|---|
str |
The generated access token. |
Source code in zenml/models/user_models.py
def generate_access_token(self, permissions: List[str]) -> str:
    """Build an encoded access token for this user.

    Args:
        permissions: Permissions to add to the token

    Returns:
        The generated access token.
    """
    access_token = JWTToken(
        token_type=JWTTokenType.ACCESS_TOKEN,
        user_id=self.id,
        permissions=permissions,
    )
    return access_token.encode()
UserUpdateModel (UserRequestModel)
pydantic-model
Update model for users.
Source code in zenml/models/user_models.py
class UserUpdateModel(UserRequestModel):
    """Update model for users."""

    @root_validator
    def user_email_updates(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the UserUpdateModel conforms to the email-opt-in-flow.

        Args:
            values: The values to validate.

        Returns:
            The validated values.

        Raises:
            ValueError: If the email was not provided when the email_opted_in
                field was set to True.
        """
        # A field that failed its own validation is absent from `values`.
        # That failure is already being reported, so skip the cross-field
        # check instead of crashing with a KeyError on the missing key.
        if "email" not in values or "email_opted_in" not in values:
            return values

        email = values["email"]

        # When someone sets the email, or updates the email and hasn't
        # before explicitly opted out, they are opted in
        if email is not None and values["email_opted_in"] is None:
            values["email_opted_in"] = True

        # It should not be possible to do opt in without an email
        if values["email_opted_in"] is True and email is None:
            raise ValueError(
                "Please provide an email, when you are opting-in with "
                "your email."
            )

        return values
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
user_email_updates(values)
classmethod
Validate that the UserUpdateModel conforms to the email-opt-in-flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values to validate. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The validated values. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the email was not provided when the email_opted_in field was set to True. |
Source code in zenml/models/user_models.py
@root_validator
def user_email_updates(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate that the UserUpdateModel conforms to the email-opt-in-flow.

    Args:
        values: The values to validate.

    Returns:
        The validated values.

    Raises:
        ValueError: If the email was not provided when the email_opted_in
            field was set to True.
    """
    # A field that failed its own validation is absent from `values`.
    # That failure is already being reported, so skip the cross-field
    # check instead of crashing with a KeyError on the missing key.
    if "email" not in values or "email_opted_in" not in values:
        return values

    email = values["email"]

    # When someone sets the email, or updates the email and hasn't
    # before explicitly opted out, they are opted in
    if email is not None and values["email_opted_in"] is None:
        values["email_opted_in"] = True

    # It should not be possible to do opt in without an email
    if values["email_opted_in"] is True and email is None:
        raise ValueError(
            "Please provide an email, when you are opting-in with "
            "your email."
        )

    return values
user_role_assignment_models
Models representing role assignments.
UserRoleAssignmentBaseModel (BaseModel)
pydantic-model
Base model for role assignments.
Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentBaseModel(BaseModel):
    """Common base for the user role assignment models; declares no fields."""
UserRoleAssignmentFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Role Assignments.
Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentFilterModel(BaseFilterModel):
    """Filter model for querying role assignments.

    Each filter is optional and accepts either a UUID or a string.
    """

    workspace_id: Optional[Union[UUID, str]] = Field(
        description="Workspace of the RoleAssignment",
        default=None,
    )
    user_id: Optional[Union[UUID, str]] = Field(
        description="User in the RoleAssignment",
        default=None,
    )
    role_id: Optional[Union[UUID, str]] = Field(
        description="Role in the RoleAssignment",
        default=None,
    )
role_id: Union[uuid.UUID, str]
pydantic-field
Role in the RoleAssignment
user_id: Union[uuid.UUID, str]
pydantic-field
User in the RoleAssignment
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the RoleAssignment
UserRoleAssignmentRequestModel (UserRoleAssignmentBaseModel, BaseRequestModel)
pydantic-model
Request model for role assignments using UUIDs for all entities.
Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentRequestModel(
    UserRoleAssignmentBaseModel, BaseRequestModel
):
    """Request model for role assignments; entities are referenced by UUID."""

    # Optional: defaults to None when no workspace is given.
    workspace: Optional[UUID] = Field(
        title="The workspace that the role is limited to.",
        default=None,
    )
    user: UUID = Field(
        title="The user that the role is assigned to.",
    )
    role: UUID = Field(
        title="The role.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
UserRoleAssignmentResponseModel (UserRoleAssignmentBaseModel, BaseResponseModel)
pydantic-model
Response model for role assignments with all entities hydrated.
Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentResponseModel(
    UserRoleAssignmentBaseModel, BaseResponseModel
):
    """Response model for role assignments.

    The workspace, user and role are returned as full response models
    rather than as IDs; each of them defaults to None.
    """

    workspace: Optional["WorkspaceResponseModel"] = Field(
        default=None,
        title="The workspace scope of this role assignment.",
    )
    user: Optional["UserResponseModel"] = Field(
        default=None,
        title="The user the role is assigned to.",
    )
    role: Optional["RoleResponseModel"] = Field(
        default=None,
        title="The assigned role.",
    )
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
visualization_models
Models representing visualizations.
BaseVisualizationModel (BaseModel)
pydantic-model
Base model for visualizations.
Source code in zenml/models/visualization_models.py
class BaseVisualizationModel(BaseModel):
    """Common base for the visualization models."""

    # Required: the kind of visualization.
    type: VisualizationType
LoadedVisualizationModel (BaseVisualizationModel)
pydantic-model
Model for loaded visualization.
Source code in zenml/models/visualization_models.py
class LoadedVisualizationModel(BaseVisualizationModel):
    """Visualization model for a visualization that has been loaded."""

    # Required: the loaded value, as text or as raw bytes.
    value: Union[str, bytes]
VisualizationModel (BaseVisualizationModel)
pydantic-model
Model for unloaded visualization.
Source code in zenml/models/visualization_models.py
class VisualizationModel(BaseVisualizationModel):
    """Visualization model for a visualization that has not been loaded."""

    # Required: URI string for the unloaded visualization.
    uri: str
workspace_models
Models representing workspaces.
WorkspaceBaseModel (BaseModel)
pydantic-model
Base model for workspaces.
Source code in zenml/models/workspace_models.py
class WorkspaceBaseModel(BaseModel):
    """Common base for the workspace models.

    Declares the workspace name and description, both capped at
    STR_FIELD_MAX_LENGTH characters.
    """

    name: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The unique name of the workspace.",
    )
    description: str = Field(
        max_length=STR_FIELD_MAX_LENGTH,
        title="The description of the workspace.",
        default="",
    )
WorkspaceFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Workspaces.
Source code in zenml/models/workspace_models.py
class WorkspaceFilterModel(BaseFilterModel):
    """Filter model for querying workspaces."""

    # Optional name filter; defaults to None.
    name: Optional[str] = Field(
        description="Name of the workspace",
        default=None,
    )
name: str
pydantic-field
Name of the workspace
WorkspaceRequestModel (WorkspaceBaseModel, BaseRequestModel)
pydantic-model
Request model for workspaces.
Source code in zenml/models/workspace_models.py
class WorkspaceRequestModel(WorkspaceBaseModel, BaseRequestModel):
    """Request model for workspaces; adds no fields beyond its bases."""
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
WorkspaceResponseModel (WorkspaceBaseModel, BaseResponseModel)
pydantic-model
Response model for workspaces.
Source code in zenml/models/workspace_models.py
class WorkspaceResponseModel(WorkspaceBaseModel, BaseResponseModel):
    """Response model for workspaces; adds no fields beyond its bases."""
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.
WorkspaceUpdateModel (WorkspaceRequestModel)
pydantic-model
Update model for workspaces.
Source code in zenml/models/workspace_models.py
class WorkspaceUpdateModel(WorkspaceRequestModel):
    """Update model for workspaces; adds no fields to the request model."""
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.