Models
zenml.models
special
Pydantic models for the various concepts in ZenML.
artifact_models
Models representing artifacts.
ArtifactBaseModel (BaseModel)
pydantic-model
Base model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactBaseModel(BaseModel):
"""Base model for artifacts."""
name: str = Field(
title="Name of the output in the parent step.",
max_length=STR_FIELD_MAX_LENGTH,
)
artifact_store_id: Optional[UUID] = Field(
title="ID of the artifact store in which this artifact is stored.",
default=None,
)
type: ArtifactType = Field(title="Type of the artifact.")
uri: str = Field(
title="URI of the artifact.", max_length=STR_FIELD_MAX_LENGTH
)
materializer: Source = Field(
title="Materializer class to use for this artifact.",
)
data_type: Source = Field(
title="Data type of the artifact.",
)
visualizations: Optional[List[VisualizationModel]] = Field(
default=None, title="Visualizations of the artifact."
)
_convert_source = convert_source_validator("materializer", "data_type")
ArtifactFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Artifacts."""
# `only_unused` refers to a property of the artifacts relationship
# rather than a field in the db, hence it needs to be handled
# explicitly
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
"only_unused",
]
name: Optional[str] = Field(
default=None,
description="Name of the artifact",
)
uri: Optional[str] = Field(
default=None,
description="Uri of the artifact",
)
materializer: Optional[str] = Field(
default=None,
description="Materializer used to produce the artifact",
)
type: Optional[str] = Field(
default=None,
description="Type of the artifact",
)
data_type: Optional[str] = Field(
default=None,
description="Datatype of the artifact",
)
artifact_store_id: Optional[Union[UUID, str]] = Field(
default=None, description="Artifact store for this artifact"
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace for this artifact"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User that produced this artifact"
)
only_unused: Optional[bool] = Field(
default=False, description="Filter only for unused artifacts"
)
artifact_store_id: Union[uuid.UUID, str]
pydantic-field
Artifact store for this artifact
data_type: str
pydantic-field
Datatype of the artifact
materializer: str
pydantic-field
Materializer used to produce the artifact
name: str
pydantic-field
Name of the artifact
only_unused: bool
pydantic-field
Filter only for unused artifacts
type: str
pydantic-field
Type of the artifact
uri: str
pydantic-field
Uri of the artifact
user_id: Union[uuid.UUID, str]
pydantic-field
User that produced this artifact
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this artifact
ArtifactRequestModel (ArtifactBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactRequestModel(ArtifactBaseModel, WorkspaceScopedRequestModel):
"""Request model for artifacts."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
ArtifactResponseModel (ArtifactBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for artifacts.
Source code in zenml/models/artifact_models.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
"""Response model for artifacts."""
producer_step_run_id: Optional[UUID] = Field(
title="ID of the step run that produced this artifact.",
default=None,
)
metadata: Dict[str, "RunMetadataResponseModel"] = Field(
default={}, title="Metadata of the artifact."
)
@property
def step(self) -> "StepRunResponseModel":
"""Get the step that produced this artifact.
Returns:
The step that produced this artifact.
"""
from zenml.utils.artifact_utils import get_producer_step_of_artifact
return get_producer_step_of_artifact(self)
@property
def run(self) -> "PipelineRunResponseModel":
"""Get the pipeline run that produced this artifact.
Returns:
The pipeline run that produced this artifact.
"""
return self.step.run
def load(self) -> Any:
"""Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
from zenml.utils.artifact_utils import load_artifact
return load_artifact(self)
def read(self) -> Any:
"""(Deprecated) Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
logger.warning(
"`artifact.read()` is deprecated and will be removed in a future "
"release. Please use `artifact.load()` instead."
)
return self.load()
def visualize(self, title: Optional[str] = None) -> None:
"""Visualize the artifact in notebook environments.
Args:
title: Optional title to show before the visualizations.
"""
from zenml.utils.visualization_utils import visualize_artifact
visualize_artifact(self, title=title)
run: PipelineRunResponseModel
property
readonly
Get the pipeline run that produced this artifact.
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The pipeline run that produced this artifact. |
step: StepRunResponseModel
property
readonly
Get the step that produced this artifact.
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The step that produced this artifact. |
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
load(self)
Materializes (loads) the data stored in this artifact.
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Source code in zenml/models/artifact_models.py
def load(self) -> Any:
"""Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
from zenml.utils.artifact_utils import load_artifact
return load_artifact(self)
read(self)
(Deprecated) Materializes (loads) the data stored in this artifact.
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Source code in zenml/models/artifact_models.py
def read(self) -> Any:
"""(Deprecated) Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
logger.warning(
"`artifact.read()` is deprecated and will be removed in a future "
"release. Please use `artifact.load()` instead."
)
return self.load()
visualize(self, title=None)
Visualize the artifact in notebook environments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
title |
Optional[str] |
Optional title to show before the visualizations. |
None |
Source code in zenml/models/artifact_models.py
def visualize(self, title: Optional[str] = None) -> None:
"""Visualize the artifact in notebook environments.
Args:
title: Optional title to show before the visualizations.
"""
from zenml.utils.visualization_utils import visualize_artifact
visualize_artifact(self, title=title)
base_models
Base domain model definitions.
BaseRequestModel (BaseZenModel)
pydantic-model
Base request model.
Used as a base class for all request models.
Source code in zenml/models/base_models.py
class BaseRequestModel(BaseZenModel):
"""Base request model.
Used as a base class for all request models.
"""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
BaseResponseModel (BaseZenModel)
pydantic-model
Base domain model.
Used as a base class for all domain models that have the following common characteristics:
- are uniquely identified by a UUID
- have a creation timestamp and a last modified timestamp
Source code in zenml/models/base_models.py
class BaseResponseModel(BaseZenModel):
"""Base domain model.
Used as a base class for all domain models that have the following common
characteristics:
* are uniquely identified by a UUID
* have a creation timestamp and a last modified timestamp
"""
id: UUID = Field(title="The unique resource id.")
created: datetime = Field(title="Time when this resource was created.")
updated: datetime = Field(
title="Time when this resource was last updated."
)
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, BaseResponseModel):
return self.id == other.id
else:
return False
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
__eq__(self, other)
special
Implementation of equality magic method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is of the same type and has the same UUID. |
Source code in zenml/models/base_models.py
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, BaseResponseModel):
return self.id == other.id
else:
return False
__hash__(self)
special
Implementation of hash magic method.
Returns:
Type | Description |
---|---|
int |
Hash of the UUID. |
Source code in zenml/models/base_models.py
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for base response models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
BaseZenModel (AnalyticsTrackedModelMixin)
pydantic-model
Base model class for all ZenML models.
This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events and proper encoding of SecretStr values.
Source code in zenml/models/base_models.py
class BaseZenModel(AnalyticsTrackedModelMixin):
"""Base model class for all ZenML models.
This class is used as a base class for all ZenML models. It provides
functionality for tracking analytics events and proper encoding of
SecretStr values.
"""
class Config:
"""Pydantic configuration class."""
# This is needed to allow the REST client and server to unpack SecretStr
# values correctly.
json_encoders = {
SecretStr: lambda v: v.get_secret_value()
if v is not None
else None
}
# Allow extras on all models to support forwards and backwards
# compatibility (e.g. new fields in newer versions of ZenML servers
# are allowed to be present in older versions of ZenML clients and
# vice versa).
extra = "allow"
Config
Pydantic configuration class.
Source code in zenml/models/base_models.py
class Config:
"""Pydantic configuration class."""
# This is needed to allow the REST client and server to unpack SecretStr
# values correctly.
json_encoders = {
SecretStr: lambda v: v.get_secret_value()
if v is not None
else None
}
# Allow extras on all models to support forwards and backwards
# compatibility (e.g. new fields in newer versions of ZenML servers
# are allowed to be present in older versions of ZenML clients and
# vice versa).
extra = "allow"
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
ShareableRequestModel (WorkspaceScopedRequestModel)
pydantic-model
Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are shareable.
Source code in zenml/models/base_models.py
class ShareableRequestModel(WorkspaceScopedRequestModel):
"""Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are
shareable.
"""
is_shared: bool = Field(
default=False,
title=(
"Flag describing if this resource is shared with other users in "
"the same workspace."
),
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["is_shared"] = self.is_shared
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["is_shared"] = self.is_shared
return metadata
ShareableResponseModel (WorkspaceScopedResponseModel)
pydantic-model
Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are shareable.
Source code in zenml/models/base_models.py
class ShareableResponseModel(WorkspaceScopedResponseModel):
"""Base shareable workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped and are
shareable.
"""
is_shared: bool = Field(
title=(
"Flag describing if this resource is shared with other users in "
"the same workspace."
),
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["is_shared"] = self.is_shared
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["is_shared"] = self.is_shared
return metadata
UserScopedRequestModel (BaseRequestModel)
pydantic-model
Base user-owned request model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/base_models.py
class UserScopedRequestModel(BaseRequestModel):
"""Base user-owned request model.
Used as a base class for all domain models that are "owned" by a user.
"""
user: UUID = Field(title="The id of the user that created this resource.")
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["user_id"] = self.user
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["user_id"] = self.user
return metadata
UserScopedResponseModel (BaseResponseModel)
pydantic-model
Base user-owned domain model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/base_models.py
class UserScopedResponseModel(BaseResponseModel):
"""Base user-owned domain model.
Used as a base class for all domain models that are "owned" by a user.
"""
user: Union["UserResponseModel", None] = Field(
title="The user that created this resource.", nullable=True
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
if self.user is not None:
metadata["user_id"] = self.user.id
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
if self.user is not None:
metadata["user_id"] = self.user.id
return metadata
WorkspaceScopedRequestModel (UserScopedRequestModel)
pydantic-model
Base workspace-scoped request domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/base_models.py
class WorkspaceScopedRequestModel(UserScopedRequestModel):
"""Base workspace-scoped request domain model.
Used as a base class for all domain models that are workspace-scoped.
"""
workspace: UUID = Field(
title="The workspace to which this resource belongs."
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace
return metadata
WorkspaceScopedResponseModel (UserScopedResponseModel)
pydantic-model
Base workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/base_models.py
class WorkspaceScopedResponseModel(UserScopedResponseModel):
"""Base workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped.
"""
workspace: "WorkspaceResponseModel" = Field(
title="The workspace of this resource."
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace.id
return metadata
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace.id
return metadata
update_model(_cls)
Base update model.
This is used as a decorator on top of request models to convert them into update models where the fields are optional and can be set to None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_cls |
Type[~T] |
The class to decorate |
required |
Returns:
Type | Description |
---|---|
Type[~T] |
The decorated class. |
Source code in zenml/models/base_models.py
def update_model(_cls: Type[T]) -> Type[T]:
"""Base update model.
This is used as a decorator on top of request models to convert them
into update models where the fields are optional and can be set to None.
Args:
_cls: The class to decorate
Returns:
The decorated class.
"""
for _, value in _cls.__fields__.items():
value.required = False
value.allow_none = True
return _cls
code_repository_models
Models representing code repositories.
CodeReferenceBaseModel (BaseModel)
pydantic-model
Base model for code references.
Source code in zenml/models/code_repository_models.py
class CodeReferenceBaseModel(BaseModel):
"""Base model for code references."""
commit: str = Field(description="The commit of the code reference.")
subdirectory: str = Field(
description="The subdirectory of the code reference."
)
commit: str
pydantic-field
required
The commit of the code reference.
subdirectory: str
pydantic-field
required
The subdirectory of the code reference.
CodeReferenceRequestModel (CodeReferenceBaseModel, BaseRequestModel)
pydantic-model
Code reference request model.
Source code in zenml/models/code_repository_models.py
class CodeReferenceRequestModel(CodeReferenceBaseModel, BaseRequestModel):
"""Code reference request model."""
code_repository: UUID = Field(
description="The repository of the code reference."
)
code_repository: UUID
pydantic-field
required
The repository of the code reference.
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
CodeReferenceResponseModel (CodeReferenceBaseModel, BaseResponseModel)
pydantic-model
Code reference response model.
Source code in zenml/models/code_repository_models.py
class CodeReferenceResponseModel(CodeReferenceBaseModel, BaseResponseModel):
"""Code reference response model."""
code_repository: CodeRepositoryResponseModel = Field(
description="The repository of the code reference."
)
code_repository: CodeRepositoryResponseModel
pydantic-field
required
The repository of the code reference.
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryBaseModel (BaseModel)
pydantic-model
Base model for code repositories.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryBaseModel(BaseModel):
"""Base model for code repositories."""
name: str = Field(
title="The name of the code repository.",
max_length=STR_FIELD_MAX_LENGTH,
)
config: Dict[str, Any] = Field(
description="Configuration for the code repository."
)
source: Source = Field(description="The code repository source.")
logo_url: Optional[str] = Field(
description="Optional URL of a logo (png, jpg or svg) for the code repository."
)
description: Optional[str] = Field(
description="Code repository description.",
max_length=TEXT_FIELD_MAX_LENGTH,
)
config: Dict[str, Any]
pydantic-field
required
Configuration for the code repository.
description: ConstrainedStrValue
pydantic-field
Code repository description.
logo_url: str
pydantic-field
Optional URL of a logo (png, jpg or svg) for the code repository.
source: Source
pydantic-field
required
The code repository source.
CodeRepositoryFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all code repositories.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all code repositories."""
name: Optional[str] = Field(
description="Name of the code repository.",
)
workspace_id: Union[UUID, str, None] = Field(
description="Workspace of the code repository."
)
user_id: Union[UUID, str, None] = Field(
description="User that created the code repository."
)
name: str
pydantic-field
Name of the code repository.
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the code repository.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the code repository.
CodeRepositoryRequestModel (CodeRepositoryBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Code repository request model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryRequestModel(
CodeRepositoryBaseModel, WorkspaceScopedRequestModel
):
"""Code repository request model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryResponseModel (CodeRepositoryBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Code repository response model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryResponseModel(
CodeRepositoryBaseModel, WorkspaceScopedResponseModel
):
"""Code repository response model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
CodeRepositoryUpdateModel (CodeRepositoryRequestModel)
pydantic-model
Code repository update model.
Source code in zenml/models/code_repository_models.py
class CodeRepositoryUpdateModel(CodeRepositoryRequestModel):
"""Code repository update model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
component_models
Models representing stack components.
ComponentBaseModel (BaseModel)
pydantic-model
Base model for stack components.
Source code in zenml/models/component_models.py
class ComponentBaseModel(BaseModel):
"""Base model for stack components."""
name: str = Field(
title="The name of the stack component.",
max_length=STR_FIELD_MAX_LENGTH,
)
type: StackComponentType = Field(
title="The type of the stack component.",
)
flavor: str = Field(
title="The flavor of the stack component.",
max_length=STR_FIELD_MAX_LENGTH,
)
configuration: Dict[str, Any] = Field(
title="The stack component configuration.",
)
connector_resource_id: Optional[str] = Field(
default=None,
description="The ID of a specific resource instance to "
"gain access to through the connector",
)
labels: Optional[Dict[str, Any]] = Field(
default=None,
title="The stack component labels.",
)
connector_resource_id: str
pydantic-field
The ID of a specific resource instance to gain access to through the connector
ComponentFilterModel (ShareableWorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all ComponentModels.
The Component Model needs additional scoping. As such the _scope_user
field can be set to the user that is doing the filtering. The
generate_filter()
method of the baseclass is overwritten to include the
scoping.
Source code in zenml/models/component_models.py
class ComponentFilterModel(ShareableWorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all ComponentModels.
The Component Model needs additional scoping. As such the `_scope_user`
field can be set to the user that is doing the filtering. The
`generate_filter()` method of the baseclass is overwritten to include the
scoping.
"""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
"scope_type",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*ShareableWorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
"scope_type",
]
scope_type: Optional[str] = Field(
default=None,
description="The type to scope this query to.",
)
is_shared: Optional[Union[bool, str]] = Field(
default=None, description="If the stack is shared or private"
)
name: Optional[str] = Field(
default=None,
description="Name of the stack component",
)
flavor: Optional[str] = Field(
default=None,
description="Flavor of the stack component",
)
type: Optional[str] = Field(
default=None,
description="Type of the stack component",
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace of the stack component"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User of the stack"
)
connector_id: Optional[Union[UUID, str]] = Field(
default=None, description="Connector linked to the stack component"
)
def set_scope_type(self, component_type: str) -> None:
"""Set the type of component on which to perform the filtering to scope the response.
Args:
component_type: The type of component to scope the query to.
"""
self.scope_type = component_type
def generate_filter(
self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Stack components can be scoped by type to narrow the search.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
"""
from sqlalchemy import and_
base_filter = super().generate_filter(table)
if self.scope_type:
type_filter = getattr(table, "type") == self.scope_type
return and_(base_filter, type_filter)
return base_filter
connector_id: Union[uuid.UUID, str]
pydantic-field
Connector linked to the stack component
flavor: str
pydantic-field
Flavor of the stack component
is_shared: Union[bool, str]
pydantic-field
If the stack is shared or private
name: str
pydantic-field
Name of the stack component
scope_type: str
pydantic-field
The type to scope this query to.
type: str
pydantic-field
Type of the stack component
user_id: Union[uuid.UUID, str]
pydantic-field
User of the stack
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the stack component
generate_filter(self, table)
Generate the filter for the query.
Stack components can be scoped by type to narrow the search.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Source code in zenml/models/component_models.py
def generate_filter(
self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Stack components can be scoped by type to narrow the search.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
"""
from sqlalchemy import and_
base_filter = super().generate_filter(table)
if self.scope_type:
type_filter = getattr(table, "type") == self.scope_type
return and_(base_filter, type_filter)
return base_filter
set_scope_type(self, component_type)
Set the type of component on which to perform the filtering to scope the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
str |
The type of component to scope the query to. |
required |
Source code in zenml/models/component_models.py
def set_scope_type(self, component_type: str) -> None:
"""Set the type of component on which to perform the filtering to scope the response.
Args:
component_type: The type of component to scope the query to.
"""
self.scope_type = component_type
ComponentRequestModel (ComponentBaseModel, ShareableRequestModel)
pydantic-model
Request model for stack components.
Source code in zenml/models/component_models.py
class ComponentRequestModel(ComponentBaseModel, ShareableRequestModel):
"""Request model for stack components."""
ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]
connector: Optional[UUID] = Field(
default=None,
title="The service connector linked to this stack component.",
)
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
"""Validator to ensure that the given name is not a secret reference.
Args:
name: The name to validate.
Returns:
The name if it is not a secret reference.
Raises:
ValueError: If the name is a secret reference.
"""
if secret_utils.is_secret_reference(name):
raise ValueError(
"Passing the `name` attribute of a stack component as a "
"secret reference is not allowed."
)
return name
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
name_cant_be_a_secret_reference(name)
classmethod
Validator to ensure that the given name is not a secret reference.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name to validate. |
required |
Returns:
Type | Description |
---|---|
str |
The name if it is not a secret reference. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the name is a secret reference. |
Source code in zenml/models/component_models.py
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
"""Validator to ensure that the given name is not a secret reference.
Args:
name: The name to validate.
Returns:
The name if it is not a secret reference.
Raises:
ValueError: If the name is a secret reference.
"""
if secret_utils.is_secret_reference(name):
raise ValueError(
"Passing the `name` attribute of a stack component as a "
"secret reference is not allowed."
)
return name
ComponentResponseModel (ComponentBaseModel, ShareableResponseModel)
pydantic-model
Response model for stack components.
Source code in zenml/models/component_models.py
class ComponentResponseModel(ComponentBaseModel, ShareableResponseModel):
"""Response model for stack components."""
ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]
connector: Optional["ServiceConnectorResponseModel"] = Field(
default=None,
title="The service connector linked to this stack component.",
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
ComponentUpdateModel (ComponentRequestModel)
pydantic-model
Update model for stack components.
Source code in zenml/models/component_models.py
class ComponentUpdateModel(ComponentRequestModel):
"""Update model for stack components."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
constants
Constants used by ZenML domain models.
filter_models
Base filter model definitions.
BaseFilterModel (BaseModel)
pydantic-model
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=50
)
Source code in zenml/models/filter_models.py
class BaseFilterModel(BaseModel):
"""Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of
resources.
Usage example for subclasses of this class:
```
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=50
)
```
"""
# List of fields that cannot be used as filters.
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
"sort_by",
"page",
"size",
"logical_operator",
]
# List of fields that are not even mentioned as options in the CLI.
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []
sort_by: str = Field(
default="created", description="Which column to sort by."
)
logical_operator: LogicalOperators = Field(
default=LogicalOperators.AND,
description="Which logical operator to use between all filters "
"['and', 'or']",
)
page: int = Field(
default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
)
size: int = Field(
default=PAGE_SIZE_DEFAULT,
ge=1,
le=PAGE_SIZE_MAXIMUM,
description="Page size",
)
id: Optional[Union[UUID, str]] = Field(
default=None, description="Id for this resource"
)
created: Optional[Union[datetime, str]] = Field(
default=None, description="Created"
)
updated: Optional[Union[datetime, str]] = Field(
default=None, description="Updated"
)
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
v: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(v, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(v)}"
)
column = v
split_value = v.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
v = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{v}'"
)
elif column in cls.__fields__:
return v
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
values: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(values)
return values
@property
def list_of_filters(self) -> List[Filter]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A list of Filter models.
"""
return self._generate_filter_list(
{key: getattr(self, key) for key in self.__fields__}
)
@property
def sorting_params(self) -> Tuple[str, SorterOps]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A tuple of the column to sort by and the sorting operand.
"""
column = self.sort_by
# The default sorting operand is asc
operator = SorterOps.ASCENDING
# Check if user explicitly set an operand
split_value = self.sort_by.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
operator = SorterOps(split_value[0])
return column, operator
@classmethod
def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
"""Create a list of filters from a (column, value) dictionary.
Args:
values: A dictionary of column names and values to filter on.
Returns:
A list of filters.
"""
list_of_filters: List[Filter] = []
for key, value in values.items():
# Ignore excluded filters
if key in cls.FILTER_EXCLUDE_FIELDS:
continue
# Skip filtering for None values
if value is None:
continue
# Determine the operator and filter value
value, operator = cls._resolve_operator(value)
# Define the filter
filter = cls._define_filter(
column=key, value=value, operator=operator
)
list_of_filters.append(filter)
return list_of_filters
@staticmethod
def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
"""Determine the operator and filter value from a user-provided value.
If the user-provided value is a string of the form "operator:value",
then the operator is extracted and the value is returned. Otherwise,
`GenericFilterOps.EQUALS` is used as default operator and the value
is returned as-is.
Args:
value: The user-provided value.
Returns:
A tuple of the filter value and the operator.
"""
operator = GenericFilterOps.EQUALS # Default operator
if isinstance(value, str):
split_value = value.split(":", 1)
if (
len(split_value) == 2
and split_value[0] in GenericFilterOps.values()
):
value = split_value[1]
operator = GenericFilterOps(split_value[0])
return value, operator
@classmethod
def _define_filter(
cls, column: str, value: Any, operator: GenericFilterOps
) -> Filter:
"""Define a filter for a given column.
Args:
column: The column to filter on.
value: The value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
# Create datetime filters
if cls.is_datetime_field(column):
return cls._define_datetime_filter(
column=column,
value=value,
operator=operator,
)
# Create UUID filters
if cls.is_uuid_field(column):
return cls._define_uuid_filter(
column=column,
value=value,
operator=operator,
)
# Create int filters
if cls.is_int_field(column):
return NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=int(value),
)
# Create bool filters
if cls.is_bool_field(column):
return cls._define_bool_filter(
column=column,
value=value,
operator=operator,
)
# Create str filters
if cls.is_str_field(column):
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
# Handle unsupported datatypes
logger.warning(
f"The Datatype {cls.__fields__[column].type_} might not be "
"supported for filtering. Defaulting to a string filter."
)
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=str(value),
)
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return (
issubclass(datetime, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is datetime
)
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a uuid field.
Args:
k: The key to check.
Returns:
True if the field is a uuid field, False otherwise.
"""
return (
issubclass(UUID, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is UUID
)
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's a int field.
Args:
k: The key to check.
Returns:
True if the field is a int field, False otherwise.
"""
return (
issubclass(int, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is int
)
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return (
issubclass(bool, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is bool
)
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is str
)
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ == str
) and k == "sort_by"
@staticmethod
def _define_datetime_filter(
column: str, value: Any, operator: GenericFilterOps
) -> NumericFilter:
"""Define a datetime filter for a given column.
Args:
column: The column to filter on.
value: The datetime value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid datetime.
"""
try:
if isinstance(value, datetime):
datetime_value = value
else:
datetime_value = datetime.strptime(
value, FILTERING_DATETIME_FORMAT
)
except ValueError as e:
raise ValueError(
"The datetime filter only works with values in the following "
f"format: {FILTERING_DATETIME_FORMAT}"
) from e
datetime_filter = NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=datetime_value,
)
return datetime_filter
@staticmethod
def _define_uuid_filter(
column: str, value: Any, operator: GenericFilterOps
) -> UUIDFilter:
"""Define a UUID filter for a given column.
Args:
column: The column to filter on.
value: The UUID value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid UUID.
"""
# For equality checks, ensure that the value is a valid UUID.
if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
try:
UUID(value)
except ValueError as e:
raise ValueError(
"Invalid value passed as UUID query parameter."
) from e
# Cast the value to string for further comparisons.
value = str(value)
# Generate the filter.
uuid_filter = UUIDFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
return uuid_filter
@staticmethod
def _define_bool_filter(
column: str, value: Any, operator: GenericFilterOps
) -> BoolFilter:
"""Define a bool filter for a given column.
Args:
column: The column to filter on.
value: The bool value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
logger.warning(
"Boolean filters do not support any"
"operation except for equals. Defaulting"
"to an `equals` comparison."
)
return BoolFilter(
operation=GenericFilterOps.EQUALS,
column=column,
value=bool(value),
)
@property
def offset(self) -> int:
"""Returns the offset needed for the query on the data persistence layer.
Returns:
The offset for the query.
"""
return self.size * (self.page - 1)
def generate_filter(
self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlalchemy import and_
from sqlmodel import or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
created: Union[datetime.datetime, str]
pydantic-field
Created
id: Union[uuid.UUID, str]
pydantic-field
Id for this resource
list_of_filters: List[zenml.models.filter_models.Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[zenml.models.filter_models.Filter] |
A list of Filter models. |
logical_operator: LogicalOperators
pydantic-field
Which logical operator to use between all filters ['and', 'or']
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
page: ConstrainedIntValue
pydantic-field
Page number
size: ConstrainedIntValue
pydantic-field
Page size
sort_by: str
pydantic-field
Which column to sort by.
sorting_params: Tuple[str, zenml.enums.SorterOps]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
Tuple[str, zenml.enums.SorterOps] |
A tuple of the column to sort by and the sorting operand. |
updated: Union[datetime.datetime, str]
pydantic-field
Updated
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
filter_ops(values)
classmethod
Parse incoming filters to ensure all filters are legal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values of the class. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values of the class. |
Source code in zenml/models/filter_models.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
values: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(values)
return values
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/filter_models.py
def generate_filter(
self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlalchemy import and_
from sqlmodel import or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return (
issubclass(bool, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is bool
)
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return (
issubclass(datetime, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is datetime
)
is_int_field(k)
classmethod
Checks if it's a int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a int field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's a int field.
Args:
k: The key to check.
Returns:
True if the field is a int field, False otherwise.
"""
return (
issubclass(int, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is int
)
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ == str
) and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is str
)
is_uuid_field(k)
classmethod
Checks if it's a uuid field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a uuid field, False otherwise. |
Source code in zenml/models/filter_models.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a uuid field.
Args:
k: The key to check.
Returns:
True if the field is a uuid field, False otherwise.
"""
return (
issubclass(UUID, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is UUID
)
validate_sort_by(v)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
str |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/filter_models.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
v: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(v, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(v)}"
)
column = v
split_value = v.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
v = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{v}'"
)
elif column in cls.__fields__:
return v
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
BoolFilter (Filter)
pydantic-model
Filter for all Boolean fields.
Source code in zenml/models/filter_models.py
class BoolFilter(Filter):
"""Filter for all Boolean fields."""
ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a boolean column.
Args:
column: The boolean column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a boolean column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The boolean column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a boolean column.
Args:
column: The boolean column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
return column == self.value
Filter (BaseModel, ABC)
pydantic-model
Filter for all fields.
A Filter is a combination of a column, a value that the user uses to
filter on this column and an operation to use. The easiest example
would be user equals aria
with column=user
, value=aria
and the
operation=equals
.
All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.
Source code in zenml/models/filter_models.py
class Filter(BaseModel, ABC):
"""Filter for all fields.
A Filter is a combination of a column, a value that the user uses to
filter on this column and an operation to use. The easiest example
would be `user equals aria` with column=`user`, value=`aria` and the
operation=`equals`.
All subclasses of this class will support different sets of operations.
This operation set is defined in the ALLOWED_OPS class variable.
"""
ALLOWED_OPS: ClassVar[List[str]] = []
operation: GenericFilterOps
column: str
value: Any
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
"""Validate that the operation is a valid op for the field type.
Args:
op: The operation of this filter.
Returns:
The operation if it is valid.
Raises:
ValueError: If the operation is not valid for this field type.
"""
if op not in cls.ALLOWED_OPS:
raise ValueError(
f"This datatype can not be filtered using this operation: "
f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
)
else:
return op
def generate_query_conditions(
self,
table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel
query condition, to be used when filtering on the Database.
Args:
table: The SQLModel table to use for the query creation
Returns:
A list of conditions that will be combined using the `and` operation
"""
column = getattr(table, self.column)
conditions = self.generate_query_conditions_from_column(column)
return conditions # type:ignore[no-any-return]
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in `self.ALLOWED_OPS` can be used to filter the
given column by `self.value`.
Args:
column: The column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
generate_query_conditions(self, table)
Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The SQLModel table to use for the query creation |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
A list of conditions that will be combined using the |
Source code in zenml/models/filter_models.py
def generate_query_conditions(
self,
table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel
query condition, to be used when filtering on the Database.
Args:
table: The SQLModel table to use for the query creation
Returns:
A list of conditions that will be combined using the `and` operation
"""
column = getattr(table, self.column)
conditions = self.generate_query_conditions_from_column(column)
return conditions # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)
Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in self.ALLOWED_OPS
can be used to filter the
given column by self.value
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in `self.ALLOWED_OPS` can be used to filter the
given column by `self.value`.
Args:
column: The column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
validate_operation(op)
classmethod
Validate that the operation is a valid op for the field type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
op |
str |
The operation of this filter. |
required |
Returns:
Type | Description |
---|---|
str |
The operation if it is valid. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the operation is not valid for this field type. |
Source code in zenml/models/filter_models.py
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
"""Validate that the operation is a valid op for the field type.
Args:
op: The operation of this filter.
Returns:
The operation if it is valid.
Raises:
ValueError: If the operation is not valid for this field type.
"""
if op not in cls.ALLOWED_OPS:
raise ValueError(
f"This datatype can not be filtered using this operation: "
f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
)
else:
return op
NumericFilter (Filter)
pydantic-model
Filter for all numeric fields.
Source code in zenml/models/filter_models.py
class NumericFilter(Filter):
"""Filter for all numeric fields."""
value: Union[float, datetime]
ALLOWED_OPS: ClassVar[List[str]] = [
GenericFilterOps.EQUALS,
GenericFilterOps.GT,
GenericFilterOps.GTE,
GenericFilterOps.LT,
GenericFilterOps.LTE,
]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.GTE:
return column >= self.value
if self.operation == GenericFilterOps.GT:
return column > self.value
if self.operation == GenericFilterOps.LTE:
return column <= self.value
if self.operation == GenericFilterOps.LT:
return column < self.value
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a UUID column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The UUID column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.GTE:
return column >= self.value
if self.operation == GenericFilterOps.GT:
return column > self.value
if self.operation == GenericFilterOps.LTE:
return column <= self.value
if self.operation == GenericFilterOps.LT:
return column < self.value
return column == self.value
ShareableWorkspaceScopedFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced scoping with workspace and user scoped shareable things.
Source code in zenml/models/filter_models.py
class ShareableWorkspaceScopedFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced scoping with workspace and user scoped shareable things."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
"scope_user",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
"scope_user",
]
scope_user: Optional[UUID] = Field(
default=None,
description="The user to scope this query to.",
)
def set_scope_user(self, user_id: UUID) -> None:
"""Set the user that is performing the filtering to scope the response.
Args:
user_id: The user ID to scope the response to.
"""
self.scope_user = user_id
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_user:
scope_filter = or_(
getattr(table, "user_id") == self.scope_user,
getattr(table, "is_shared").is_(True),
)
query = query.where(scope_filter)
return query
scope_user: UUID
pydantic-field
The user to scope this query to.
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_user:
scope_filter = or_(
getattr(table, "user_id") == self.scope_user,
getattr(table, "is_shared").is_(True),
)
query = query.where(scope_filter)
return query
set_scope_user(self, user_id)
Set the user that is performing the filtering to scope the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID to scope the response to. |
required |
Source code in zenml/models/filter_models.py
def set_scope_user(self, user_id: UUID) -> None:
"""Set the user that is performing the filtering to scope the response.
Args:
user_id: The user ID to scope the response to.
"""
self.scope_user = user_id
StrFilter (Filter)
pydantic-model
Filter for all string fields.
Source code in zenml/models/filter_models.py
class StrFilter(Filter):
"""Filter for all string fields."""
ALLOWED_OPS: ClassVar[List[str]] = [
GenericFilterOps.EQUALS,
GenericFilterOps.STARTSWITH,
GenericFilterOps.CONTAINS,
GenericFilterOps.ENDSWITH,
]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a string column.
Args:
column: The string column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.CONTAINS:
return column.like(f"%{self.value}%")
if self.operation == GenericFilterOps.STARTSWITH:
return column.startswith(f"{self.value}")
if self.operation == GenericFilterOps.ENDSWITH:
return column.endswith(f"{self.value}")
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a string column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The string column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a string column.
Args:
column: The string column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.CONTAINS:
return column.like(f"%{self.value}%")
if self.operation == GenericFilterOps.STARTSWITH:
return column.startswith(f"{self.value}")
if self.operation == GenericFilterOps.ENDSWITH:
return column.endswith(f"{self.value}")
return column == self.value
UUIDFilter (StrFilter)
pydantic-model
Filter for all uuid fields which are mostly treated like strings.
Source code in zenml/models/filter_models.py
class UUIDFilter(StrFilter):
"""Filter for all uuid fields which are mostly treated like strings."""
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
import sqlalchemy
from sqlalchemy_utils.functions import cast_if
# For equality checks, compare the UUID directly
if self.operation == GenericFilterOps.EQUALS:
return column == self.value
# For all other operations, cast and handle the column as string
return super().generate_query_conditions_from_column(
column=cast_if(column, sqlalchemy.String)
)
generate_query_conditions_from_column(self, column)
Generate query conditions for a UUID column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The UUID column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
import sqlalchemy
from sqlalchemy_utils.functions import cast_if
# For equality checks, compare the UUID directly
if self.operation == GenericFilterOps.EQUALS:
return column == self.value
# For all other operations, cast and handle the column as string
return super().generate_query_conditions_from_column(
column=cast_if(column, sqlalchemy.String)
)
WorkspaceScopedFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced scoping with workspace.
Source code in zenml/models/filter_models.py
class WorkspaceScopedFilterModel(BaseFilterModel):
"""Model to enable advanced scoping with workspace."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilterModel.FILTER_EXCLUDE_FIELDS,
"scope_workspace",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilterModel.CLI_EXCLUDE_FIELDS,
"scope_workspace",
]
scope_workspace: Optional[UUID] = Field(
default=None,
description="The workspace to scope this query to.",
)
def set_scope_workspace(self, workspace_id: UUID) -> None:
"""Set the workspace to scope this response.
Args:
workspace_id: The workspace to scope this response to.
"""
self.scope_workspace = workspace_id
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_workspace:
scope_filter = or_(
getattr(table, "workspace_id") == self.scope_workspace,
getattr(table, "workspace_id").is_(None),
)
query = query.where(scope_filter)
return query
scope_workspace: UUID
pydantic-field
The workspace to scope this query to.
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query with filter applied. |
Source code in zenml/models/filter_models.py
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_workspace:
scope_filter = or_(
getattr(table, "workspace_id") == self.scope_workspace,
getattr(table, "workspace_id").is_(None),
)
query = query.where(scope_filter)
return query
set_scope_workspace(self, workspace_id)
Set the workspace to scope this response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
UUID |
The workspace to scope this response to. |
required |
Source code in zenml/models/filter_models.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
"""Set the workspace to scope this response.
Args:
workspace_id: The workspace to scope this response to.
"""
self.scope_workspace = workspace_id
flavor_models
Models representing stack component flavors.
FlavorBaseModel (BaseModel)
pydantic-model
Base model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorBaseModel(BaseModel):
"""Base model for stack component flavors."""
name: str = Field(
title="The name of the Flavor.",
max_length=STR_FIELD_MAX_LENGTH,
)
type: StackComponentType = Field(title="The type of the Flavor.")
config_schema: Dict[str, Any] = Field(
title="The JSON schema of this flavor's corresponding configuration.",
)
connector_type: Optional[str] = Field(
default=None,
title="The type of the connector that this flavor uses.",
max_length=STR_FIELD_MAX_LENGTH,
)
connector_resource_type: Optional[str] = Field(
default=None,
title="The resource type of the connector that this flavor uses.",
max_length=STR_FIELD_MAX_LENGTH,
)
connector_resource_id_attr: Optional[str] = Field(
default=None,
title="The name of an attribute in the stack component configuration "
"that plays the role of resource ID when linked to a service connector.",
max_length=STR_FIELD_MAX_LENGTH,
)
source: str = Field(
title="The path to the module which contains this Flavor.",
max_length=STR_FIELD_MAX_LENGTH,
)
integration: Optional[str] = Field(
title="The name of the integration that the Flavor belongs to.",
max_length=STR_FIELD_MAX_LENGTH,
)
logo_url: Optional[str] = Field(
default=None,
title="Optionally, a url pointing to a png,"
"svg or jpg can be attached.",
)
docs_url: Optional[str] = Field(
default=None,
title="Optionally, a url pointing to docs, within docs.zenml.io.",
)
sdk_docs_url: Optional[str] = Field(
default=None,
title="Optionally, a url pointing to SDK docs,"
"within sdkdocs.zenml.io.",
)
is_custom: bool = Field(
title="Whether or not this flavor is a custom, user created flavor.",
default=True,
)
@property
def connector_requirements(self) -> Optional[ServiceConnectorRequirements]:
"""Returns the connector requirements for the flavor.
Returns:
The connector requirements for the flavor.
"""
if not self.connector_resource_type:
return None
return ServiceConnectorRequirements(
connector_type=self.connector_type,
resource_type=self.connector_resource_type,
resource_id_attr=self.connector_resource_id_attr,
)
connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Returns the connector requirements for the flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] |
The connector requirements for the flavor. |
FlavorFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Flavors.
Source code in zenml/models/flavor_models.py
class FlavorFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Flavors."""
name: Optional[str] = Field(
default=None,
description="Name of the flavor",
)
type: Optional[str] = Field(
default=None,
description="Stack Component Type of the stack flavor",
)
integration: Optional[str] = Field(
default=None,
description="Integration associated with the flavor",
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace of the stack"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User of the stack"
)
integration: str
pydantic-field
Integration associated with the flavor
name: str
pydantic-field
Name of the flavor
type: str
pydantic-field
Stack Component Type of the stack flavor
user_id: Union[uuid.UUID, str]
pydantic-field
User of the stack
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the stack
FlavorRequestModel (FlavorBaseModel, BaseRequestModel)
pydantic-model
Request model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorRequestModel(FlavorBaseModel, BaseRequestModel):
"""Request model for stack component flavors."""
ANALYTICS_FIELDS: ClassVar[List[str]] = [
"type",
"integration",
]
user: Optional[UUID] = Field(
default=None, title="The id of the user that created this resource."
)
workspace: Optional[UUID] = Field(
default=None, title="The workspace to which this resource belongs."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
FlavorResponseModel (FlavorBaseModel, BaseResponseModel)
pydantic-model
Response model for stack component flavors.
Source code in zenml/models/flavor_models.py
class FlavorResponseModel(FlavorBaseModel, BaseResponseModel):
"""Response model for stack component flavors."""
ANALYTICS_FIELDS: ClassVar[List[str]] = [
"id",
"type",
"integration",
]
user: Union["UserResponseModel", None] = Field(
title="The user that created this resource.", nullable=True
)
workspace: Optional["WorkspaceResponseModel"] = Field(
title="The project of this resource."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
FlavorUpdateModel (FlavorRequestModel)
pydantic-model
Update model for flavors.
Source code in zenml/models/flavor_models.py
class FlavorUpdateModel(FlavorRequestModel):
"""Update model for flavors."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
hub_plugin_models
Models representing ZenML Hub plugins.
HubPluginBaseModel (BaseModel)
pydantic-model
Base model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginBaseModel(BaseModel):
"""Base model for a ZenML Hub plugin."""
name: str
description: Optional[str]
version: Optional[str]
release_notes: Optional[str]
repository_url: str
repository_subdirectory: Optional[str]
repository_branch: Optional[str]
repository_commit: Optional[str]
tags: Optional[List[str]]
logo_url: Optional[str]
HubPluginRequestModel (HubPluginBaseModel)
pydantic-model
Request model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginRequestModel(HubPluginBaseModel):
"""Request model for a ZenML Hub plugin."""
HubPluginResponseModel (HubPluginBaseModel)
pydantic-model
Response model for a ZenML Hub plugin.
Source code in zenml/models/hub_plugin_models.py
class HubPluginResponseModel(HubPluginBaseModel):
"""Response model for a ZenML Hub plugin."""
id: UUID
status: PluginStatus
author: str
version: str
index_url: Optional[str]
package_name: Optional[str]
requirements: Optional[List[str]]
build_logs: Optional[str]
created: datetime
updated: datetime
HubUserResponseModel (BaseModel)
pydantic-model
Model for a ZenML Hub user.
Source code in zenml/models/hub_plugin_models.py
class HubUserResponseModel(BaseModel):
"""Model for a ZenML Hub user."""
id: UUID
email: str
username: Optional[str]
PluginStatus (StrEnum)
Enum that represents the status of a plugin.
- PENDING: Plugin is being built
- FAILED: Plugin build failed
- AVAILABLE: Plugin is available for installation
- YANKED: Plugin was yanked and is no longer available
Source code in zenml/models/hub_plugin_models.py
class PluginStatus(StrEnum):
"""Enum that represents the status of a plugin.
- PENDING: Plugin is being built
- FAILED: Plugin build failed
- AVAILABLE: Plugin is available for installation
- YANKED: Plugin was yanked and is no longer available
"""
PENDING = "pending"
FAILED = "failed"
AVAILABLE = "available"
YANKED = "yanked"
logs_models
Models representing logs files.
LogsBaseModel (BaseModel)
pydantic-model
Base model for logs.
Source code in zenml/models/logs_models.py
class LogsBaseModel(BaseModel):
"""Base model for logs."""
uri: str = Field(
title="The uri of the logs file",
max_length=STR_FIELD_MAX_LENGTH,
)
artifact_store_id: Union[str, UUID] = Field(
title="The artifact store ID to associate the logs with.",
max_length=STR_FIELD_MAX_LENGTH,
)
LogsRequestModel (LogsBaseModel, BaseRequestModel)
pydantic-model
Request model for logs.
Source code in zenml/models/logs_models.py
class LogsRequestModel(LogsBaseModel, BaseRequestModel):
"""Request model for logs."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
LogsResponseModel (LogsBaseModel, BaseResponseModel)
pydantic-model
Response model for logs.
Source code in zenml/models/logs_models.py
class LogsResponseModel(LogsBaseModel, BaseResponseModel):
"""Response model for logs."""
step_run_id: Optional[Union[str, UUID]] = Field(
title="Step ID to associate the logs with.",
default=None,
description="When this is set, pipeline_run_id should be set to None.",
)
pipeline_run_id: Optional[Union[str, UUID]] = Field(
title="Pipeline run ID to associate the logs with.",
default=None,
description="When this is set, step_run_id should be set to None.",
)
pipeline_run_id: Union[uuid.UUID, str]
pydantic-field
When this is set, step_run_id should be set to None.
step_run_id: Union[uuid.UUID, str]
pydantic-field
When this is set, pipeline_run_id should be set to None.
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
page_model
Model implementation for easy pagination for Lists of ZenML Domain Models.
The code contained within this file has been inspired by the fastapi-pagination library: https://github.com/uriyyo/fastapi-pagination
Page (GenericModel, Generic)
pydantic-model
Return Model for List Models to accommodate pagination.
Source code in zenml/models/page_model.py
class Page(GenericModel, Generic[B]):
"""Return Model for List Models to accommodate pagination."""
index: PositiveInt
max_size: PositiveInt
total_pages: NonNegativeInt
total: NonNegativeInt
items: List[B]
__params_type__ = BaseFilterModel
@property
def size(self) -> int:
"""Return the item count of the page.
Returns:
The amount of items in the page.
"""
return len(self.items)
def __len__(self) -> int:
"""Return the item count of the page.
This enables `len(page)`.
Returns:
The amount of items in the page.
"""
return len(self.items)
def __getitem__(self, index: int) -> B:
"""Return the item at the given index.
This enables `page[index]`.
Args:
index: The index to get the item from.
Returns:
The item at the given index.
"""
return self.items[index]
def __iter__(self) -> Generator[B, None, None]: # type: ignore[override]
"""Return an iterator over the items in the page.
This enables `for item in page` loops, but breaks `dict(page)`.
Yields:
An iterator over the items in the page.
"""
for item in self.items.__iter__():
yield item
def __contains__(self, item: B) -> bool:
"""Returns whether the page contains a specific item.
This enables `item in page` checks.
Args:
item: The item to check for.
Returns:
Whether the item is in the page.
"""
return item in self.items
class Config:
"""Pydantic configuration class."""
# This is needed to allow the REST API server to unpack SecretStr
# values correctly before sending them to the client.
json_encoders = {
SecretStr: lambda v: v.get_secret_value() if v else None
}
size: int
property
readonly
Return the item count of the page.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
Config
Pydantic configuration class.
Source code in zenml/models/page_model.py
class Config:
"""Pydantic configuration class."""
# This is needed to allow the REST API server to unpack SecretStr
# values correctly before sending them to the client.
json_encoders = {
SecretStr: lambda v: v.get_secret_value() if v else None
}
__params_type__ (BaseModel)
pydantic-model
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=50
)
Source code in zenml/models/page_model.py
class BaseFilterModel(BaseModel):
"""Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of
resources.
Usage example for subclasses of this class:
```
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=50
)
```
"""
# List of fields that cannot be used as filters.
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
"sort_by",
"page",
"size",
"logical_operator",
]
# List of fields that are not even mentioned as options in the CLI.
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []
sort_by: str = Field(
default="created", description="Which column to sort by."
)
logical_operator: LogicalOperators = Field(
default=LogicalOperators.AND,
description="Which logical operator to use between all filters "
"['and', 'or']",
)
page: int = Field(
default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
)
size: int = Field(
default=PAGE_SIZE_DEFAULT,
ge=1,
le=PAGE_SIZE_MAXIMUM,
description="Page size",
)
id: Optional[Union[UUID, str]] = Field(
default=None, description="Id for this resource"
)
created: Optional[Union[datetime, str]] = Field(
default=None, description="Created"
)
updated: Optional[Union[datetime, str]] = Field(
default=None, description="Updated"
)
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
v: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(v, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(v)}"
)
column = v
split_value = v.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
v = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{v}'"
)
elif column in cls.__fields__:
return v
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
values: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(values)
return values
@property
def list_of_filters(self) -> List[Filter]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A list of Filter models.
"""
return self._generate_filter_list(
{key: getattr(self, key) for key in self.__fields__}
)
@property
def sorting_params(self) -> Tuple[str, SorterOps]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A tuple of the column to sort by and the sorting operand.
"""
column = self.sort_by
# The default sorting operand is asc
operator = SorterOps.ASCENDING
# Check if user explicitly set an operand
split_value = self.sort_by.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
operator = SorterOps(split_value[0])
return column, operator
@classmethod
def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
"""Create a list of filters from a (column, value) dictionary.
Args:
values: A dictionary of column names and values to filter on.
Returns:
A list of filters.
"""
list_of_filters: List[Filter] = []
for key, value in values.items():
# Ignore excluded filters
if key in cls.FILTER_EXCLUDE_FIELDS:
continue
# Skip filtering for None values
if value is None:
continue
# Determine the operator and filter value
value, operator = cls._resolve_operator(value)
# Define the filter
filter = cls._define_filter(
column=key, value=value, operator=operator
)
list_of_filters.append(filter)
return list_of_filters
@staticmethod
def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
"""Determine the operator and filter value from a user-provided value.
If the user-provided value is a string of the form "operator:value",
then the operator is extracted and the value is returned. Otherwise,
`GenericFilterOps.EQUALS` is used as default operator and the value
is returned as-is.
Args:
value: The user-provided value.
Returns:
A tuple of the filter value and the operator.
"""
operator = GenericFilterOps.EQUALS # Default operator
if isinstance(value, str):
split_value = value.split(":", 1)
if (
len(split_value) == 2
and split_value[0] in GenericFilterOps.values()
):
value = split_value[1]
operator = GenericFilterOps(split_value[0])
return value, operator
@classmethod
def _define_filter(
cls, column: str, value: Any, operator: GenericFilterOps
) -> Filter:
"""Define a filter for a given column.
Args:
column: The column to filter on.
value: The value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
# Create datetime filters
if cls.is_datetime_field(column):
return cls._define_datetime_filter(
column=column,
value=value,
operator=operator,
)
# Create UUID filters
if cls.is_uuid_field(column):
return cls._define_uuid_filter(
column=column,
value=value,
operator=operator,
)
# Create int filters
if cls.is_int_field(column):
return NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=int(value),
)
# Create bool filters
if cls.is_bool_field(column):
return cls._define_bool_filter(
column=column,
value=value,
operator=operator,
)
# Create str filters
if cls.is_str_field(column):
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
# Handle unsupported datatypes
logger.warning(
f"The Datatype {cls.__fields__[column].type_} might not be "
"supported for filtering. Defaulting to a string filter."
)
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=str(value),
)
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return (
issubclass(datetime, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is datetime
)
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a uuid field.
Args:
k: The key to check.
Returns:
True if the field is a uuid field, False otherwise.
"""
return (
issubclass(UUID, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is UUID
)
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's a int field.
Args:
k: The key to check.
Returns:
True if the field is a int field, False otherwise.
"""
return (
issubclass(int, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is int
)
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return (
issubclass(bool, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is bool
)
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is str
)
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ == str
) and k == "sort_by"
@staticmethod
def _define_datetime_filter(
column: str, value: Any, operator: GenericFilterOps
) -> NumericFilter:
"""Define a datetime filter for a given column.
Args:
column: The column to filter on.
value: The datetime value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid datetime.
"""
try:
if isinstance(value, datetime):
datetime_value = value
else:
datetime_value = datetime.strptime(
value, FILTERING_DATETIME_FORMAT
)
except ValueError as e:
raise ValueError(
"The datetime filter only works with values in the following "
f"format: {FILTERING_DATETIME_FORMAT}"
) from e
datetime_filter = NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=datetime_value,
)
return datetime_filter
@staticmethod
def _define_uuid_filter(
column: str, value: Any, operator: GenericFilterOps
) -> UUIDFilter:
"""Define a UUID filter for a given column.
Args:
column: The column to filter on.
value: The UUID value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid UUID.
"""
# For equality checks, ensure that the value is a valid UUID.
if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
try:
UUID(value)
except ValueError as e:
raise ValueError(
"Invalid value passed as UUID query parameter."
) from e
# Cast the value to string for further comparisons.
value = str(value)
# Generate the filter.
uuid_filter = UUIDFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
return uuid_filter
@staticmethod
def _define_bool_filter(
column: str, value: Any, operator: GenericFilterOps
) -> BoolFilter:
"""Define a bool filter for a given column.
Args:
column: The column to filter on.
value: The bool value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
logger.warning(
"Boolean filters do not support any"
"operation except for equals. Defaulting"
"to an `equals` comparison."
)
return BoolFilter(
operation=GenericFilterOps.EQUALS,
column=column,
value=bool(value),
)
@property
def offset(self) -> int:
"""Returns the offset needed for the query on the data persistence layer.
Returns:
The offset for the query.
"""
return self.size * (self.page - 1)
def generate_filter(
self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlalchemy import and_
from sqlmodel import or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
created: Union[datetime.datetime, str]
pydantic-field
Created
id: Union[uuid.UUID, str]
pydantic-field
Id for this resource
list_of_filters: List[zenml.models.filter_models.Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[zenml.models.filter_models.Filter] |
A list of Filter models. |
logical_operator: LogicalOperators
pydantic-field
Which logical operator to use between all filters ['and', 'or']
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
page: ConstrainedIntValue
pydantic-field
Page number
size: ConstrainedIntValue
pydantic-field
Page size
sort_by: str
pydantic-field
Which column to sort by.
sorting_params: Tuple[str, zenml.enums.SorterOps]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
Tuple[str, zenml.enums.SorterOps] |
A tuple of the column to sort by and the sorting operand. |
updated: Union[datetime.datetime, str]
pydantic-field
Updated
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Union[Select[AnySchema], SelectOfScalar[AnySchema]] |
The query with filter applied. |
Source code in zenml/models/page_model.py
def apply_filter(
self,
query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
filter_ops(values)
classmethod
Parse incoming filters to ensure all filters are legal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values of the class. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values of the class. |
Source code in zenml/models/page_model.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
values: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(values)
return values
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/page_model.py
def generate_filter(
self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlalchemy import and_
from sqlmodel import or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return (
issubclass(bool, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is bool
)
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return (
issubclass(datetime, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is datetime
)
is_int_field(k)
classmethod
Checks if it's a int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a int field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's a int field.
Args:
k: The key to check.
Returns:
True if the field is a int field, False otherwise.
"""
return (
issubclass(int, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is int
)
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ == str
) and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return (
issubclass(str, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is str
)
is_uuid_field(k)
classmethod
Checks if it's a uuid field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a uuid field, False otherwise. |
Source code in zenml/models/page_model.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a uuid field.
Args:
k: The key to check.
Returns:
True if the field is a uuid field, False otherwise.
"""
return (
issubclass(UUID, get_args(cls.__fields__[k].type_))
or cls.__fields__[k].type_ is UUID
)
validate_sort_by(v)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
str |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/page_model.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
v: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(v, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(v)}"
)
column = v
split_value = v.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
v = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{v}'"
)
elif column in cls.__fields__:
return v
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
__contains__(self, item)
special
Returns whether the page contains a specific item.
This enables item in page
checks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
~B |
The item to check for. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the item is in the page. |
Source code in zenml/models/page_model.py
def __contains__(self, item: B) -> bool:
"""Returns whether the page contains a specific item.
This enables `item in page` checks.
Args:
item: The item to check for.
Returns:
Whether the item is in the page.
"""
return item in self.items
__getitem__(self, index)
special
Return the item at the given index.
This enables page[index]
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index |
int |
The index to get the item from. |
required |
Returns:
Type | Description |
---|---|
~B |
The item at the given index. |
Source code in zenml/models/page_model.py
def __getitem__(self, index: int) -> B:
"""Return the item at the given index.
This enables `page[index]`.
Args:
index: The index to get the item from.
Returns:
The item at the given index.
"""
return self.items[index]
__iter__(self)
special
Return an iterator over the items in the page.
This enables for item in page
loops, but breaks dict(page)
.
Yields:
Type | Description |
---|---|
Generator[~B, NoneType, NoneType] |
An iterator over the items in the page. |
Source code in zenml/models/page_model.py
def __iter__(self) -> Generator[B, None, None]: # type: ignore[override]
"""Return an iterator over the items in the page.
This enables `for item in page` loops, but breaks `dict(page)`.
Yields:
An iterator over the items in the page.
"""
for item in self.items.__iter__():
yield item
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
__len__(self)
special
Return the item count of the page.
This enables len(page)
.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
Source code in zenml/models/page_model.py
def __len__(self) -> int:
"""Return the item count of the page.
This enables `len(page)`.
Returns:
The amount of items in the page.
"""
return len(self.items)
pipeline_build_models
Models representing pipeline builds.
BuildItem (BaseModel)
pydantic-model
Pipeline build item.
Attributes:
Name | Type | Description |
---|---|---|
image |
str |
The image name or digest. |
dockerfile |
Optional[str] |
The contents of the Dockerfile used to build the image. |
requirements |
Optional[str] |
The pip requirements installed in the image. This is a string consisting of multiple concatenated requirements.txt files. |
settings_checksum |
Optional[str] |
Checksum of the settings used for the build. |
contains_code |
bool |
Whether the image contains user files. |
requires_code_download |
bool |
Whether the image needs to download files. |
Source code in zenml/models/pipeline_build_models.py
class BuildItem(BaseModel):
"""Pipeline build item.
Attributes:
image: The image name or digest.
dockerfile: The contents of the Dockerfile used to build the image.
requirements: The pip requirements installed in the image. This is a
string consisting of multiple concatenated requirements.txt files.
settings_checksum: Checksum of the settings used for the build.
contains_code: Whether the image contains user files.
requires_code_download: Whether the image needs to download files.
"""
image: str = Field(title="The image name or digest.")
dockerfile: Optional[str] = Field(
title="The dockerfile used to build the image."
)
requirements: Optional[str] = Field(
title="The pip requirements installed in the image."
)
settings_checksum: Optional[str] = Field(
title="The checksum of the build settings."
)
contains_code: bool = Field(
default=True, title="Whether the image contains user files."
)
requires_code_download: bool = Field(
default=False, title="Whether the image needs to download files."
)
PipelineBuildBaseModel (YAMLSerializationMixin)
pydantic-model
Base model for pipeline builds.
Attributes:
Name | Type | Description |
---|---|---|
images |
Dict[str, zenml.models.pipeline_build_models.BuildItem] |
Docker images of this build. |
is_local |
bool |
Whether the images are stored locally or in a container registry. |
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildBaseModel(pydantic_utils.YAMLSerializationMixin):
"""Base model for pipeline builds.
Attributes:
images: Docker images of this build.
is_local: Whether the images are stored locally or in a container
registry.
"""
images: Dict[str, BuildItem] = Field(
default={}, title="The images of this build."
)
is_local: bool = Field(
title="Whether the build images are stored in a container registry or locally.",
)
contains_code: bool = Field(
title="Whether any image of the build contains user code.",
)
zenml_version: Optional[str] = Field(
title="The version of ZenML used for this build."
)
python_version: Optional[str] = Field(
title="The Python version used for this build."
)
checksum: Optional[str] = Field(title="The build checksum.")
@property
def requires_code_download(self) -> bool:
"""Whether the build requires code download.
Returns:
Whether the build requires code download.
"""
return any(
item.requires_code_download for item in self.images.values()
)
@staticmethod
def get_image_key(component_key: str, step: Optional[str] = None) -> str:
"""Get the image key.
Args:
component_key: The component key.
step: The pipeline step for which the image was built.
Returns:
The image key.
"""
if step:
return f"{step}.{component_key}"
else:
return component_key
def get_image(self, component_key: str, step: Optional[str] = None) -> str:
"""Get the image built for a specific key.
Args:
component_key: The key for which to get the image.
step: The pipeline step for which to get the image. If no image
exists for this step, will fallback to the pipeline image for
the same key.
Returns:
The image name or digest.
"""
return self._get_item(component_key=component_key, step=step).image
def get_settings_checksum(
self, component_key: str, step: Optional[str] = None
) -> Optional[str]:
"""Get the settings checksum for a specific key.
Args:
component_key: The key for which to get the checksum.
step: The pipeline step for which to get the checksum. If no
image exists for this step, will fallback to the pipeline image
for the same key.
Returns:
The settings checksum.
"""
return self._get_item(
component_key=component_key, step=step
).settings_checksum
def _get_item(
self, component_key: str, step: Optional[str] = None
) -> BuildItem:
"""Get the item for a specific key.
Args:
component_key: The key for which to get the item.
step: The pipeline step for which to get the item. If no item
exists for this step, will fallback to the item for
the same key.
Raises:
KeyError: If no item exists for the given key.
Returns:
The build item.
"""
if step:
try:
combined_key = self.get_image_key(
component_key=component_key, step=step
)
return self.images[combined_key]
except KeyError:
pass
try:
return self.images[component_key]
except KeyError:
raise KeyError(
f"Unable to find image for key {component_key}. Available keys: "
f"{set(self.images)}."
)
requires_code_download: bool
property
readonly
Whether the build requires code download.
Returns:
Type | Description |
---|---|
bool |
Whether the build requires code download. |
get_image(self, component_key, step=None)
Get the image built for a specific key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The key for which to get the image. |
required |
step |
Optional[str] |
The pipeline step for which to get the image. If no image exists for this step, will fallback to the pipeline image for the same key. |
None |
Returns:
Type | Description |
---|---|
str |
The image name or digest. |
Source code in zenml/models/pipeline_build_models.py
def get_image(self, component_key: str, step: Optional[str] = None) -> str:
"""Get the image built for a specific key.
Args:
component_key: The key for which to get the image.
step: The pipeline step for which to get the image. If no image
exists for this step, will fallback to the pipeline image for
the same key.
Returns:
The image name or digest.
"""
return self._get_item(component_key=component_key, step=step).image
get_image_key(component_key, step=None)
staticmethod
Get the image key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The component key. |
required |
step |
Optional[str] |
The pipeline step for which the image was built. |
None |
Returns:
Type | Description |
---|---|
str |
The image key. |
Source code in zenml/models/pipeline_build_models.py
@staticmethod
def get_image_key(component_key: str, step: Optional[str] = None) -> str:
"""Get the image key.
Args:
component_key: The component key.
step: The pipeline step for which the image was built.
Returns:
The image key.
"""
if step:
return f"{step}.{component_key}"
else:
return component_key
get_settings_checksum(self, component_key, step=None)
Get the settings checksum for a specific key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_key |
str |
The key for which to get the checksum. |
required |
step |
Optional[str] |
The pipeline step for which to get the checksum. If no image exists for this step, will fallback to the pipeline image for the same key. |
None |
Returns:
Type | Description |
---|---|
Optional[str] |
The settings checksum. |
Source code in zenml/models/pipeline_build_models.py
def get_settings_checksum(
self, component_key: str, step: Optional[str] = None
) -> Optional[str]:
"""Get the settings checksum for a specific key.
Args:
component_key: The key for which to get the checksum.
step: The pipeline step for which to get the checksum. If no
image exists for this step, will fallback to the pipeline image
for the same key.
Returns:
The settings checksum.
"""
return self._get_item(
component_key=component_key, step=step
).settings_checksum
PipelineBuildFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all pipeline builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all pipeline builds."""
workspace_id: Union[UUID, str, None] = Field(
description="Workspace for this pipeline build."
)
user_id: Union[UUID, str, None] = Field(
description="User that produced this pipeline build."
)
pipeline_id: Union[UUID, str, None] = Field(
description="Pipeline associated with the pipeline build.",
)
stack_id: Union[UUID, str, None] = Field(
description="Stack used for the Pipeline Run"
)
is_local: Optional[bool] = Field(
description="Whether the build images are stored in a container registry or locally.",
)
contains_code: Optional[bool] = Field(
description="Whether any image of the build contains user code.",
)
zenml_version: Optional[str] = Field(
description="The version of ZenML used for this build."
)
python_version: Optional[str] = Field(
description="The Python version used for this build."
)
checksum: Optional[str] = Field(description="The build checksum.")
checksum: str
pydantic-field
The build checksum.
contains_code: bool
pydantic-field
Whether any image of the build contains user code.
is_local: bool
pydantic-field
Whether the build images are stored in a container registry or locally.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the pipeline build.
python_version: str
pydantic-field
The Python version used for this build.
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack used for the Pipeline Run
user_id: Union[uuid.UUID, str]
pydantic-field
User that produced this pipeline build.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this pipeline build.
zenml_version: str
pydantic-field
The version of ZenML used for this build.
PipelineBuildRequestModel (PipelineBuildBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for pipelines builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildRequestModel(
PipelineBuildBaseModel, WorkspaceScopedRequestModel
):
"""Request model for pipelines builds."""
stack: Optional[UUID] = Field(
title="The stack that was used for this build."
)
pipeline: Optional[UUID] = Field(
title="The pipeline that was used for this build."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
PipelineBuildResponseModel (PipelineBuildBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for pipeline builds.
Source code in zenml/models/pipeline_build_models.py
class PipelineBuildResponseModel(
PipelineBuildBaseModel, WorkspaceScopedResponseModel
):
"""Response model for pipeline builds."""
pipeline: Optional["PipelineResponseModel"] = Field(
title="The pipeline that was used for this build."
)
stack: Optional["StackResponseModel"] = Field(
title="The stack that was used for this build."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
pipeline_deployment_models
Models representing pipeline deployments.
PipelineDeploymentBaseModel (BaseModel)
pydantic-model
Base model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentBaseModel(BaseModel):
"""Base model for pipeline deployments."""
run_name_template: str = Field(
title="The run name template for runs created using this deployment.",
)
pipeline_configuration: PipelineConfiguration = Field(
title="The pipeline configuration for this deployment."
)
step_configurations: Dict[str, Step] = Field(
default={}, title="The step configurations for this deployment."
)
client_environment: Dict[str, str] = Field(
default={}, title="The client environment for this deployment."
)
@property
def requires_included_files(self) -> bool:
"""Whether the deployment requires included files.
Returns:
Whether the deployment requires included files.
"""
return any(
step.config.docker_settings.source_files == SourceFileMode.INCLUDE
for step in self.step_configurations.values()
)
@property
def requires_code_download(self) -> bool:
"""Whether the deployment requires downloading some code files.
Returns:
Whether the deployment requires downloading some code files.
"""
return any(
step.config.docker_settings.source_files == SourceFileMode.DOWNLOAD
for step in self.step_configurations.values()
)
requires_code_download: bool
property
readonly
Whether the deployment requires downloading some code files.
Returns:
Type | Description |
---|---|
bool |
Whether the deployment requires downloading some code files. |
requires_included_files: bool
property
readonly
Whether the deployment requires included files.
Returns:
Type | Description |
---|---|
bool |
Whether the deployment requires included files. |
PipelineDeploymentFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all pipeline deployments."""
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace for this deployment."
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User that created this deployment."
)
pipeline_id: Optional[Union[UUID, str]] = Field(
default=None, description="Pipeline associated with the deployment."
)
stack_id: Optional[Union[UUID, str]] = Field(
default=None, description="Stack associated with the deployment."
)
build_id: Optional[Union[UUID, str]] = Field(
default=None, description="Build associated with the deployment."
)
schedule_id: Optional[Union[UUID, str]] = Field(
default=None, description="Schedule associated with the deployment."
)
build_id: Union[uuid.UUID, str]
pydantic-field
Build associated with the deployment.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the deployment.
schedule_id: Union[uuid.UUID, str]
pydantic-field
Schedule associated with the deployment.
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack associated with the deployment.
user_id: Union[uuid.UUID, str]
pydantic-field
User that created this deployment.
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace for this deployment.
PipelineDeploymentRequestModel (PipelineDeploymentBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentRequestModel(
PipelineDeploymentBaseModel, WorkspaceScopedRequestModel
):
"""Request model for pipeline deployments."""
stack: UUID = Field(title="The stack associated with the deployment.")
pipeline: Optional[UUID] = Field(
title="The pipeline associated with the deployment."
)
build: Optional[UUID] = Field(
title="The build associated with the deployment."
)
schedule: Optional[UUID] = Field(
title="The schedule associated with the deployment."
)
code_reference: Optional["CodeReferenceRequestModel"] = Field(
title="The code reference associated with the deployment."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
PipelineDeploymentResponseModel (PipelineDeploymentBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for pipeline deployments.
Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentResponseModel(
PipelineDeploymentBaseModel, WorkspaceScopedResponseModel
):
"""Response model for pipeline deployments."""
pipeline: Optional["PipelineResponseModel"] = Field(
title="The pipeline associated with the deployment."
)
stack: Optional["StackResponseModel"] = Field(
title="The stack associated with the deployment."
)
build: Optional["PipelineBuildResponseModel"] = Field(
title="The pipeline build associated with the deployment."
)
schedule: Optional["ScheduleResponseModel"] = Field(
title="The schedule associated with the deployment."
)
code_reference: Optional["CodeReferenceResponseModel"] = Field(
title="The code reference associated with the deployment."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
pipeline_models
Models representing pipelines.
PipelineBaseModel (BaseModel)
pydantic-model
Base model for pipelines.
Source code in zenml/models/pipeline_models.py
class PipelineBaseModel(BaseModel):
"""Base model for pipelines."""
name: str = Field(
title="The name of the pipeline.",
max_length=STR_FIELD_MAX_LENGTH,
)
version: str = Field(
title="The version of the pipeline.",
max_length=STR_FIELD_MAX_LENGTH,
)
version_hash: str = Field(
title="The version hash of the pipeline.",
max_length=STR_FIELD_MAX_LENGTH,
)
docstring: Optional[str] = Field(
title="The docstring of the pipeline.",
max_length=TEXT_FIELD_MAX_LENGTH,
)
spec: PipelineSpec = Field(title="The spec of the pipeline.")
PipelineFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Workspaces.
Source code in zenml/models/pipeline_models.py
class PipelineFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Workspaces."""
name: Optional[str] = Field(
default=None,
description="Name of the Pipeline",
)
version: Optional[str] = Field(
default=None,
description="Version of the Pipeline",
)
version_hash: Optional[str] = Field(
default=None,
description="Version hash of the Pipeline",
)
docstring: Optional[str] = Field(
default=None,
description="Docstring of the Pipeline",
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace of the Pipeline"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User of the Pipeline"
)
docstring: str
pydantic-field
Docstring of the Pipeline
name: str
pydantic-field
Name of the Pipeline
user_id: Union[uuid.UUID, str]
pydantic-field
User of the Pipeline
version: str
pydantic-field
Version of the Pipeline
version_hash: str
pydantic-field
Version hash of the Pipeline
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Pipeline
PipelineRequestModel (PipelineBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Pipeline request model.
Source code in zenml/models/pipeline_models.py
class PipelineRequestModel(PipelineBaseModel, WorkspaceScopedRequestModel):
"""Pipeline request model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
PipelineResponseModel (PipelineBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline response model user, workspace, runs, and status hydrated.
Source code in zenml/models/pipeline_models.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
"""Pipeline response model user, workspace, runs, and status hydrated."""
status: Optional[List[ExecutionStatus]] = Field(
default=None, title="The status of the last 3 Pipeline Runs."
)
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
"""Get runs of this pipeline.
Can be used to fetch runs other than `self.runs` and supports
fine-grained filtering and pagination.
Args:
**kwargs: Further arguments for filtering or pagination that are
passed to `client.list_pipeline_runs()`.
Returns:
List of runs of this pipeline.
"""
from zenml.client import Client
return Client().list_pipeline_runs(pipeline_id=self.id, **kwargs).items
@property
def runs(self) -> List["PipelineRunResponseModel"]:
"""Returns the 50 most recent runs of this pipeline in descending order.
Returns:
The 50 most recent runs of this pipeline in descending order.
"""
return self.get_runs()
@property
def num_runs(self) -> int:
"""Returns the number of runs of this pipeline.
Returns:
The number of runs of this pipeline.
"""
from zenml.client import Client
return Client().list_pipeline_runs(pipeline_id=self.id, size=1).total
@property
def last_run(self) -> "PipelineRunResponseModel":
"""Returns the last run of this pipeline.
Returns:
The last run of this pipeline.
Raises:
RuntimeError: If no runs were found for this pipeline.
"""
runs = self.get_runs(size=1)
if not runs:
raise RuntimeError(
f"No runs found for pipeline '{self.name}' with id {self.id}."
)
return runs[0]
@property
def last_successful_run(self) -> "PipelineRunResponseModel":
"""Returns the last successful run of this pipeline.
Returns:
The last successful run of this pipeline.
Raises:
RuntimeError: If no successful runs were found for this pipeline.
"""
runs = self.get_runs(status=ExecutionStatus.COMPLETED, size=1)
if not runs:
raise RuntimeError(
f"No successful runs found for pipeline '{self.name}' with id "
f"{self.id}."
)
return runs[0]
last_run: PipelineRunResponseModel
property
readonly
Returns the last run of this pipeline.
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The last run of this pipeline. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no runs were found for this pipeline. |
last_successful_run: PipelineRunResponseModel
property
readonly
Returns the last successful run of this pipeline.
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The last successful run of this pipeline. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no successful runs were found for this pipeline. |
num_runs: int
property
readonly
Returns the number of runs of this pipeline.
Returns:
Type | Description |
---|---|
int |
The number of runs of this pipeline. |
runs: List[PipelineRunResponseModel]
property
readonly
Returns the 50 most recent runs of this pipeline in descending order.
Returns:
Type | Description |
---|---|
List[PipelineRunResponseModel] |
The 50 most recent runs of this pipeline in descending order. |
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_runs(self, **kwargs)
Get runs of this pipeline.
Can be used to fetch runs other than self.runs
and supports
fine-grained filtering and pagination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Further arguments for filtering or pagination that are
passed to |
{} |
Returns:
Type | Description |
---|---|
List[PipelineRunResponseModel] |
List of runs of this pipeline. |
Source code in zenml/models/pipeline_models.py
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
"""Get runs of this pipeline.
Can be used to fetch runs other than `self.runs` and supports
fine-grained filtering and pagination.
Args:
**kwargs: Further arguments for filtering or pagination that are
passed to `client.list_pipeline_runs()`.
Returns:
List of runs of this pipeline.
"""
from zenml.client import Client
return Client().list_pipeline_runs(pipeline_id=self.id, **kwargs).items
PipelineUpdateModel (PipelineRequestModel)
pydantic-model
Pipeline update model.
Source code in zenml/models/pipeline_models.py
class PipelineUpdateModel(PipelineRequestModel):
"""Pipeline update model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
pipeline_run_models
Models representing pipeline runs.
PipelineRunBaseModel (BaseModel)
pydantic-model
Base model for pipeline runs.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunBaseModel(BaseModel):
"""Base model for pipeline runs."""
name: str = Field(
title="The name of the pipeline run.",
max_length=STR_FIELD_MAX_LENGTH,
)
orchestrator_run_id: Optional[str] = Field(
title="The orchestrator run ID.",
max_length=STR_FIELD_MAX_LENGTH,
default=None,
)
schedule_id: Optional[UUID] = Field(
title="The ID of the schedule that triggered this pipeline run.",
default=None,
)
enable_cache: Optional[bool] = Field(
title="Whether to enable caching for this pipeline run.",
default=None,
)
start_time: Optional[datetime] = Field(
title="The start time of the pipeline run.",
default=None,
)
end_time: Optional[datetime] = Field(
title="The end time of the pipeline run.",
default=None,
)
status: ExecutionStatus = Field(
title="The status of the pipeline run.",
)
config: PipelineConfiguration = Field(
title="The pipeline configuration used for this pipeline run.",
)
num_steps: Optional[int] = Field(
title="The number of steps in this pipeline run.",
default=None,
)
client_version: Optional[str] = Field(
title="Client version.",
default=current_zenml_version,
max_length=STR_FIELD_MAX_LENGTH,
)
server_version: Optional[str] = Field(
title="Server version.",
max_length=STR_FIELD_MAX_LENGTH,
)
client_environment: Dict[str, str] = Field(
default={},
title=(
"Environment of the client that initiated this pipeline run "
"(OS, Python version, etc.)."
),
)
orchestrator_environment: Dict[str, str] = Field(
default={},
title=(
"Environment of the orchestrator that executed this pipeline run "
"(OS, Python version, etc.)."
),
)
PipelineRunFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Workspaces.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Workspaces."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
"unlisted",
"code_repository_id",
]
name: Optional[str] = Field(
default=None,
description="Name of the Pipeline Run",
)
orchestrator_run_id: Optional[str] = Field(
default=None,
description="Name of the Pipeline Run within the orchestrator",
)
pipeline_id: Optional[Union[UUID, str]] = Field(
default=None, description="Pipeline associated with the Pipeline Run"
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace of the Pipeline Run"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User that created the Pipeline Run"
)
stack_id: Optional[Union[UUID, str]] = Field(
default=None, description="Stack used for the Pipeline Run"
)
schedule_id: Optional[Union[UUID, str]] = Field(
default=None, description="Schedule that triggered the Pipeline Run"
)
build_id: Optional[Union[UUID, str]] = Field(
default=None, description="Build used for the Pipeline Run"
)
deployment_id: Optional[Union[UUID, str]] = Field(
default=None, description="Deployment used for the Pipeline Run"
)
code_repository_id: Optional[Union[UUID, str]] = Field(
default=None, description="Code repository used for the Pipeline Run"
)
status: Optional[str] = Field(
default=None,
description="Name of the Pipeline Run",
)
start_time: Optional[Union[datetime, str]] = Field(
default=None, description="Start time for this run"
)
end_time: Optional[Union[datetime, str]] = Field(
default=None, description="End time for this run"
)
num_steps: Optional[int] = Field(
default=None,
description="Amount of steps in the Pipeline Run",
)
unlisted: Optional[bool] = None
def generate_filter(
self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
"""
from sqlalchemy import and_
from sqlmodel import or_
base_filter = super().generate_filter(table)
operator = (
or_ if self.logical_operator == LogicalOperators.OR else and_
)
if self.unlisted is not None:
if self.unlisted is True:
unlisted_filter = getattr(table, "pipeline_id").is_(None)
else:
unlisted_filter = getattr(table, "pipeline_id").is_not(None)
base_filter = operator(base_filter, unlisted_filter)
if self.code_repository_id:
from zenml.zen_stores.schemas import (
CodeReferenceSchema,
PipelineDeploymentSchema,
PipelineRunSchema,
)
code_repo_filter = and_( # type: ignore[type-var]
PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
PipelineDeploymentSchema.code_reference_id
== CodeReferenceSchema.id,
CodeReferenceSchema.code_repository_id
== self.code_repository_id,
)
base_filter = operator(base_filter, code_repo_filter)
return base_filter
build_id: Union[uuid.UUID, str]
pydantic-field
Build used for the Pipeline Run
code_repository_id: Union[uuid.UUID, str]
pydantic-field
Code repository used for the Pipeline Run
deployment_id: Union[uuid.UUID, str]
pydantic-field
Deployment used for the Pipeline Run
end_time: Union[datetime.datetime, str]
pydantic-field
End time for this run
name: str
pydantic-field
Name of the Pipeline Run
num_steps: int
pydantic-field
Amount of steps in the Pipeline Run
orchestrator_run_id: str
pydantic-field
Name of the Pipeline Run within the orchestrator
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline associated with the Pipeline Run
schedule_id: Union[uuid.UUID, str]
pydantic-field
Schedule that triggered the Pipeline Run
stack_id: Union[uuid.UUID, str]
pydantic-field
Stack used for the Pipeline Run
start_time: Union[datetime.datetime, str]
pydantic-field
Start time for this run
status: str
pydantic-field
Name of the Pipeline Run
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the Pipeline Run
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Pipeline Run
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
Union[BinaryExpression[Any], BooleanClauseList[Any]] |
The filter expression for the query. |
Source code in zenml/models/pipeline_run_models.py
def generate_filter(
self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
"""
from sqlalchemy import and_
from sqlmodel import or_
base_filter = super().generate_filter(table)
operator = (
or_ if self.logical_operator == LogicalOperators.OR else and_
)
if self.unlisted is not None:
if self.unlisted is True:
unlisted_filter = getattr(table, "pipeline_id").is_(None)
else:
unlisted_filter = getattr(table, "pipeline_id").is_not(None)
base_filter = operator(base_filter, unlisted_filter)
if self.code_repository_id:
from zenml.zen_stores.schemas import (
CodeReferenceSchema,
PipelineDeploymentSchema,
PipelineRunSchema,
)
code_repo_filter = and_( # type: ignore[type-var]
PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
PipelineDeploymentSchema.code_reference_id
== CodeReferenceSchema.id,
CodeReferenceSchema.code_repository_id
== self.code_repository_id,
)
base_filter = operator(base_filter, code_repo_filter)
return base_filter
PipelineRunRequestModel (PipelineRunBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Pipeline run model with user, workspace, pipeline, and stack as UUIDs.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunRequestModel(
PipelineRunBaseModel, WorkspaceScopedRequestModel
):
"""Pipeline run model with user, workspace, pipeline, and stack as UUIDs."""
id: UUID
stack: Optional[UUID] # Might become None if the stack is deleted.
pipeline: Optional[UUID] # Unlisted runs have this as None.
build: Optional[UUID]
deployment: Optional[UUID]
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
PipelineRunResponseModel (PipelineRunBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline run model with user, workspace, pipeline, and stack hydrated.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunResponseModel(
PipelineRunBaseModel, WorkspaceScopedResponseModel
):
"""Pipeline run model with user, workspace, pipeline, and stack hydrated."""
pipeline: Optional["PipelineResponseModel"] = Field(
default=None, title="The pipeline this run belongs to."
)
stack: Optional["StackResponseModel"] = Field(
default=None, title="The stack that was used for this run."
)
metadata: Dict[str, "RunMetadataResponseModel"] = Field(
default={},
title="Metadata associated with this pipeline run.",
)
build: Optional["PipelineBuildResponseModel"] = Field(
default=None, title="The pipeline build that was used for this run."
)
deployment: Optional["PipelineDeploymentResponseModel"] = Field(
default=None, title="The deployment that was used for this run."
)
steps: Dict[str, "StepRunResponseModel"] = Field(
default={}, title="The steps of this run."
)
@property
def artifacts(self) -> List["ArtifactResponseModel"]:
"""Get all artifacts that are outputs of steps of this pipeline run.
Returns:
All output artifacts of this pipeline run (including cached ones).
"""
from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run
return get_artifacts_of_pipeline_run(self)
@property
def produced_artifacts(self) -> List["ArtifactResponseModel"]:
"""Get all artifacts produced during this pipeline run.
Returns:
A list of all artifacts produced during this pipeline run.
"""
from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run
return get_artifacts_of_pipeline_run(self, only_produced=True)
def get_step(self, step: str) -> "StepRunResponseModel":
"""(Deprecated) Get a step by name.
Args:
step: Name of the step to get.
Returns:
The step with the given name.
"""
from zenml.logger import get_logger
logger = get_logger(__name__)
logger.warning(
"`run.get_step(<step_name>)` is deprecated and will be removed in "
"a future release. Please use `run.steps[<step_name>]` instead."
)
return self.steps[step]
artifacts: List[ArtifactResponseModel]
property
readonly
Get all artifacts that are outputs of steps of this pipeline run.
Returns:
Type | Description |
---|---|
List[ArtifactResponseModel] |
All output artifacts of this pipeline run (including cached ones). |
produced_artifacts: List[ArtifactResponseModel]
property
readonly
Get all artifacts produced during this pipeline run.
Returns:
Type | Description |
---|---|
List[ArtifactResponseModel] |
A list of all artifacts produced during this pipeline run. |
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
get_step(self, step)
(Deprecated) Get a step by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
str |
Name of the step to get. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The step with the given name. |
Source code in zenml/models/pipeline_run_models.py
def get_step(self, step: str) -> "StepRunResponseModel":
"""(Deprecated) Get a step by name.
Args:
step: Name of the step to get.
Returns:
The step with the given name.
"""
from zenml.logger import get_logger
logger = get_logger(__name__)
logger.warning(
"`run.get_step(<step_name>)` is deprecated and will be removed in "
"a future release. Please use `run.steps[<step_name>]` instead."
)
return self.steps[step]
PipelineRunUpdateModel (BaseModel)
pydantic-model
Pipeline run update model.
Source code in zenml/models/pipeline_run_models.py
class PipelineRunUpdateModel(BaseModel):
"""Pipeline run update model."""
status: Optional[ExecutionStatus] = None
end_time: Optional[datetime] = None
role_models
Models representing roles that can be assigned to users or teams.
RoleBaseModel (BaseModel)
pydantic-model
Base model for roles.
Source code in zenml/models/role_models.py
class RoleBaseModel(BaseModel):
"""Base model for roles."""
name: str = Field(
title="The unique name of the role.",
max_length=STR_FIELD_MAX_LENGTH,
)
permissions: Set[PermissionType]
RoleFilterModel (BaseFilterModel)
pydantic-model
Model to enable advanced filtering of all Users.
Source code in zenml/models/role_models.py
class RoleFilterModel(BaseFilterModel):
"""Model to enable advanced filtering of all Users."""
name: Optional[str] = Field(
default=None,
description="Name of the role",
)
name: str
pydantic-field
Name of the role
RoleRequestModel (RoleBaseModel, BaseRequestModel)
pydantic-model
Request model for roles.
Source code in zenml/models/role_models.py
class RoleRequestModel(RoleBaseModel, BaseRequestModel):
"""Request model for roles."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
RoleResponseModel (RoleBaseModel, BaseResponseModel)
pydantic-model
Response model for roles.
Source code in zenml/models/role_models.py
class RoleResponseModel(RoleBaseModel, BaseResponseModel):
"""Response model for roles."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
RoleUpdateModel (RoleRequestModel)
pydantic-model
Update model for roles.
Source code in zenml/models/role_models.py
class RoleUpdateModel(RoleRequestModel):
"""Update model for roles."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
run_metadata_models
Models representing run metadata.
RunMetadataBaseModel (BaseModel)
pydantic-model
Base model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataBaseModel(BaseModel):
"""Base model for run metadata."""
pipeline_run_id: Optional[UUID] = Field(
title="The ID of the pipeline run that this metadata belongs to.",
)
step_run_id: Optional[UUID]
artifact_id: Optional[UUID]
stack_component_id: Optional[UUID]
key: str = Field(
title="The key of the metadata.",
max_length=STR_FIELD_MAX_LENGTH,
)
value: MetadataType = Field(
title="The value of the metadata.",
max_length=TEXT_FIELD_MAX_LENGTH,
)
type: MetadataTypeEnum = Field(
title="The type of the metadata.",
max_length=STR_FIELD_MAX_LENGTH,
)
RunMetadataFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of run metadata."""
pipeline_run_id: Optional[Union[str, UUID]] = None
step_run_id: Optional[Union[str, UUID]] = None
artifact_id: Optional[Union[str, UUID]] = None
stack_component_id: Optional[Union[str, UUID]] = None
key: Optional[str] = None
type: Optional[Union[str, MetadataTypeEnum]] = None
RunMetadataRequestModel (RunMetadataBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Request model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataRequestModel(
RunMetadataBaseModel, WorkspaceScopedRequestModel
):
"""Request model for run metadata."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
RunMetadataResponseModel (RunMetadataBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for run metadata.
Source code in zenml/models/run_metadata_models.py
class RunMetadataResponseModel(
RunMetadataBaseModel, WorkspaceScopedResponseModel
):
"""Response model for run metadata."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
schedule_model
Model definition for pipeline run schedules.
ScheduleBaseModel (Schedule, BaseModel)
pydantic-model
Domain model for schedules.
Source code in zenml/models/schedule_model.py
class ScheduleBaseModel(Schedule, BaseModel):
"""Domain model for schedules."""
ANALYTICS_FIELDS: ClassVar[List[str]] = ["id"]
name: str
active: bool
orchestrator_id: Optional[UUID]
pipeline_id: Optional[UUID]
ScheduleFilterModel (ShareableWorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Users.
Source code in zenml/models/schedule_model.py
class ScheduleFilterModel(ShareableWorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Users."""
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace scope of the schedule."
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User that created the schedule"
)
pipeline_id: Optional[Union[UUID, str]] = Field(
default=None, description="Pipeline that the schedule is attached to."
)
orchestrator_id: Optional[Union[UUID, str]] = Field(
default=None,
description="Orchestrator that the schedule is attached to.",
)
active: Optional[bool] = Field(
default=None,
description="If the schedule is active",
)
cron_expression: Optional[str] = Field(
default=None,
description="The cron expression, describing the schedule",
)
start_time: Optional[Union[datetime, str]] = Field(
default=None, description="Start time"
)
end_time: Optional[Union[datetime, str]] = Field(
default=None, description="End time"
)
interval_second: Optional[Optional[float]] = Field(
default=None,
description="The repetition interval in seconds",
)
catchup: Optional[bool] = Field(
default=None,
description="Whether or not the schedule is set to catchup past missed "
"events",
)
name: Optional[str] = Field(
default=None,
description="Name of the schedule",
)
active: bool
pydantic-field
If the schedule is active
catchup: bool
pydantic-field
Whether or not the schedule is set to catchup past missed events
cron_expression: str
pydantic-field
The cron expression, describing the schedule
end_time: Union[datetime.datetime, str]
pydantic-field
End time
interval_second: float
pydantic-field
The repetition interval in seconds
name: str
pydantic-field
Name of the schedule
orchestrator_id: Union[uuid.UUID, str]
pydantic-field
Orchestrator that the schedule is attached to.
pipeline_id: Union[uuid.UUID, str]
pydantic-field
Pipeline that the schedule is attached to.
start_time: Union[datetime.datetime, str]
pydantic-field
Start time
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the schedule
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace scope of the schedule.
ScheduleRequestModel (ScheduleBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Schedule request model.
Source code in zenml/models/schedule_model.py
class ScheduleRequestModel(ScheduleBaseModel, WorkspaceScopedRequestModel):
"""Schedule request model."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
ScheduleResponseModel (ScheduleBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Schedule response model with workspace and user hydrated.
Source code in zenml/models/schedule_model.py
class ScheduleResponseModel(ScheduleBaseModel, WorkspaceScopedResponseModel):
"""Schedule response model with workspace and user hydrated."""
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
ScheduleUpdateModel (BaseModel)
pydantic-model
Schedule update model.
Source code in zenml/models/schedule_model.py
class ScheduleUpdateModel(BaseModel):
"""Schedule update model."""
name: Optional[str] = None
active: Optional[bool] = None
cron_expression: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
interval_second: Optional[timedelta] = None
catchup: Optional[bool] = None
secret_models
Models representing secrets.
SecretBaseModel (BaseModel)
pydantic-model
Base model for secrets.
Source code in zenml/models/secret_models.py
class SecretBaseModel(BaseModel):
"""Base model for secrets."""
name: str = Field(
title="The name of the secret.",
max_length=STR_FIELD_MAX_LENGTH,
)
scope: SecretScope = Field(
SecretScope.WORKSPACE, title="The scope of the secret."
)
values: Dict[str, Optional[SecretStr]] = Field(
default_factory=dict, title="The values stored in this secret."
)
@property
def secret_values(self) -> Dict[str, str]:
"""A dictionary with all un-obfuscated values stored in this secret.
The values are returned as strings, not SecretStr. If a value is
None, it is not included in the returned dictionary. This is to enable
the use of None values in the update model to indicate that a secret
value should be deleted.
Returns:
A dictionary containing the secret's values.
"""
return {
k: v.get_secret_value()
for k, v in self.values.items()
if v is not None
}
@property
def has_missing_values(self) -> bool:
"""Returns True if the secret has missing values (i.e. None).
Values can be missing from a secret for example if the user retrieves a
secret but does not have the permission to view the secret values.
Returns:
True if the secret has any values set to None.
"""
return any(v is None for v in self.values.values())
def add_secret(self, key: str, value: str) -> None:
"""Adds a secret value to the secret.
Args:
key: The key of the secret value.
value: The secret value.
"""
self.values[key] = SecretStr(value)
def remove_secret(self, key: str) -> None:
"""Removes a secret value from the secret.
Args:
key: The key of the secret value.
"""
del self.values[key]
def remove_secrets(self) -> None:
"""Removes all secret values from the secret but keep the keys."""
self.values = {k: None for k in self.values.keys()}
has_missing_values: bool
property
readonly
Returns True if the secret has missing values (i.e. None).
Values can be missing from a secret for example if the user retrieves a secret but does not have the permission to view the secret values.
Returns:
Type | Description |
---|---|
bool |
True if the secret has any values set to None. |
secret_values: Dict[str, str]
property
readonly
A dictionary with all un-obfuscated values stored in this secret.
The values are returned as strings, not SecretStr. If a value is None, it is not included in the returned dictionary. This is to enable the use of None values in the update model to indicate that a secret value should be deleted.
Returns:
Type | Description |
---|---|
Dict[str, str] |
A dictionary containing the secret's values. |
add_secret(self, key, value)
Adds a secret value to the secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key of the secret value. |
required |
value |
str |
The secret value. |
required |
Source code in zenml/models/secret_models.py
def add_secret(self, key: str, value: str) -> None:
"""Adds a secret value to the secret.
Args:
key: The key of the secret value.
value: The secret value.
"""
self.values[key] = SecretStr(value)
remove_secret(self, key)
Removes a secret value from the secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key of the secret value. |
required |
Source code in zenml/models/secret_models.py
def remove_secret(self, key: str) -> None:
"""Removes a secret value from the secret.
Args:
key: The key of the secret value.
"""
del self.values[key]
remove_secrets(self)
Removes all secret values from the secret but keep the keys.
Source code in zenml/models/secret_models.py
def remove_secrets(self) -> None:
"""Removes all secret values from the secret but keep the keys."""
self.values = {k: None for k in self.values.keys()}
SecretFilterModel (WorkspaceScopedFilterModel)
pydantic-model
Model to enable advanced filtering of all Secrets.
Source code in zenml/models/secret_models.py
class SecretFilterModel(WorkspaceScopedFilterModel):
"""Model to enable advanced filtering of all Secrets."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
"values",
]
name: Optional[str] = Field(
default=None,
description="Name of the secret",
)
scope: Optional[Union[SecretScope, str]] = Field(
default=None,
description="Scope in which to filter secrets",
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None, description="Workspace of the Secret"
)
user_id: Optional[Union[UUID, str]] = Field(
default=None, description="User that created the Secret"
)
@staticmethod
def _get_filtering_value(value: Optional[Any]) -> str:
"""Convert the value to a string that can be used for lexicographical filtering and sorting.
Args:
value: The value to convert.
Returns:
The value converted to string format that can be used for
lexicographical sorting and filtering.
"""
if value is None:
return ""
str_value = str(value)
if isinstance(value, datetime):
str_value = value.strftime("%Y-%m-%d %H:%M:%S")
return str_value
def secret_matches(self, secret: SecretResponseModel) -> bool:
"""Checks if a secret matches the filter criteria.
Args:
secret: The secret to check.
Returns:
True if the secret matches the filter criteria, False otherwise.
"""
for filter in self.list_of_filters:
column_value: Optional[Any] = None
if filter.column == "workspace_id":
column_value = secret.workspace.id
elif filter.column == "user_id":
column_value = secret.user.id if secret.user else None
else:
column_value = getattr(secret, filter.column)
# Convert the values to strings for lexicographical comparison.
str_column_value = self._get_filtering_value(column_value)
str_filter_value = self._get_filtering_value(filter.value)
# Compare the lexicographical values according to the operation.
if filter.operation == GenericFilterOps.EQUALS:
result = str_column_value == str_filter_value
elif filter.operation == GenericFilterOps.CONTAINS:
result = str_filter_value in str_column_value
elif filter.operation == GenericFilterOps.STARTSWITH:
result = str_column_value.startswith(str_filter_value)
elif filter.operation == GenericFilterOps.ENDSWITH:
result = str_column_value.endswith(str_filter_value)
elif filter.operation == GenericFilterOps.GT:
result = str_column_value > str_filter_value
elif filter.operation == GenericFilterOps.GTE:
result = str_column_value >= str_filter_value
elif filter.operation == GenericFilterOps.LT:
result = str_column_value < str_filter_value
elif filter.operation == GenericFilterOps.LTE:
result = str_column_value <= str_filter_value
# Exit early if the result is False for AND and True for OR
if self.logical_operator == LogicalOperators.AND:
if not result:
return False
else:
if result:
return True
# If we get here, all filters have been checked and the result is
# True for AND and False for OR
if self.logical_operator == LogicalOperators.AND:
return True
else:
return False
def sort_secrets(
self, secrets: List[SecretResponseModel]
) -> List[SecretResponseModel]:
"""Sorts a list of secrets according to the filter criteria.
Args:
secrets: The list of secrets to sort.
Returns:
The sorted list of secrets.
"""
column, sort_op = self.sorting_params
sorted_secrets = sorted(
secrets,
key=lambda secret: self._get_filtering_value(
getattr(secret, column)
),
reverse=sort_op == SorterOps.DESCENDING,
)
return sorted_secrets
name: str
pydantic-field
Name of the secret
scope: Union[zenml.enums.SecretScope, str]
pydantic-field
Scope in which to filter secrets
user_id: Union[uuid.UUID, str]
pydantic-field
User that created the Secret
workspace_id: Union[uuid.UUID, str]
pydantic-field
Workspace of the Secret
secret_matches(self, secret)
Checks if a secret matches the filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
SecretResponseModel |
The secret to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the secret matches the filter criteria, False otherwise. |
Source code in zenml/models/secret_models.py
def secret_matches(self, secret: SecretResponseModel) -> bool:
"""Checks if a secret matches the filter criteria.
Args:
secret: The secret to check.
Returns:
True if the secret matches the filter criteria, False otherwise.
"""
for filter in self.list_of_filters:
column_value: Optional[Any] = None
if filter.column == "workspace_id":
column_value = secret.workspace.id
elif filter.column == "user_id":
column_value = secret.user.id if secret.user else None
else:
column_value = getattr(secret, filter.column)
# Convert the values to strings for lexicographical comparison.
str_column_value = self._get_filtering_value(column_value)
str_filter_value = self._get_filtering_value(filter.value)
# Compare the lexicographical values according to the operation.
if filter.operation == GenericFilterOps.EQUALS:
result = str_column_value == str_filter_value
elif filter.operation == GenericFilterOps.CONTAINS:
result = str_filter_value in str_column_value
elif filter.operation == GenericFilterOps.STARTSWITH:
result = str_column_value.startswith(str_filter_value)
elif filter.operation == GenericFilterOps.ENDSWITH:
result = str_column_value.endswith(str_filter_value)
elif filter.operation == GenericFilterOps.GT:
result = str_column_value > str_filter_value
elif filter.operation == GenericFilterOps.GTE:
result = str_column_value >= str_filter_value
elif filter.operation == GenericFilterOps.LT:
result = str_column_value < str_filter_value
elif filter.operation == GenericFilterOps.LTE:
result = str_column_value <= str_filter_value
# Exit early if the result is False for AND and True for OR
if self.logical_operator == LogicalOperators.AND:
if not result:
return False
else:
if result:
return True
# If we get here, all filters have been checked and the result is
# True for AND and False for OR
if self.logical_operator == LogicalOperators.AND:
return True
else:
return False
sort_secrets(self, secrets)
Sorts a list of secrets according to the filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secrets |
List[zenml.models.secret_models.SecretResponseModel] |
The list of secrets to sort. |
required |
Returns:
Type | Description |
---|---|
List[zenml.models.secret_models.SecretResponseModel] |
The sorted list of secrets. |
Source code in zenml/models/secret_models.py
def sort_secrets(
self, secrets: List[SecretResponseModel]
) -> List[SecretResponseModel]:
"""Sorts a list of secrets according to the filter criteria.
Args:
secrets: The list of secrets to sort.
Returns:
The sorted list of secrets.
"""
column, sort_op = self.sorting_params
sorted_secrets = sorted(
secrets,
key=lambda secret: self._get_filtering_value(
getattr(secret, column)
),
reverse=sort_op == SorterOps.DESCENDING,
)
return sorted_secrets
SecretRequestModel (SecretBaseModel, WorkspaceScopedRequestModel)
pydantic-model
Secret request model.
Source code in zenml/models/secret_models.py
class SecretRequestModel(SecretBaseModel, WorkspaceScopedRequestModel):
"""Secret request model."""
ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
SecretResponseModel (SecretBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Secret response model with user and workspace hydrated.
Source code in zenml/models/secret_models.py
class SecretResponseModel(SecretBaseModel, WorkspaceScopedResponseModel):
"""Secret response model with user and workspace hydrated."""
ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
SecretUpdateModel (SecretRequestModel)
pydantic-model
Secret update model.
Source code in zenml/models/secret_models.py
class SecretUpdateModel(SecretRequestModel):
"""Secret update model."""
scope: Optional[SecretScope] = Field( # type: ignore[assignment]
default=None, title="The scope of the secret."
)
__json_encoder__(obj)
special
staticmethod
partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.
server_models
Model definitions for ZenML servers.
ServerDatabaseType (StrEnum)
Enum for server database types.
Source code in zenml/models/server_models.py
class ServerDatabaseType(StrEnum):
"""Enum for server database types."""
SQLITE = "sqlite"
MYSQL = "mysql"
OTHER = "other"
ServerDeploymentType (StrEnum)
Enum for server deployment types.
Source code in zenml/models/server_models.py
class ServerDeploymentType(StrEnum):
"""Enum for server deployment types."""
LOCAL = "local"
DOCKER = "docker"
KUBERNETES = "kubernetes"
AWS = "aws"
GCP = "gcp"
AZURE = "azure"
ALPHA = "alpha"
OTHER = "other"
HF_SPACES = "hf_spaces"
SANDBOX = "sandbox"
CLOUD = "cloud"
ServerModel (BaseModel)
pydantic-model
Domain model for ZenML servers.
Source code in zenml/models/server_models.py
class ServerModel(BaseModel):
"""Domain model for ZenML servers."""
id: UUID = Field(default_factory=uuid4, title="The unique server id.")
version: str = Field(
title="The ZenML version that the server is running.",
)
debug: bool = Field(
False, title="Flag to indicate whether ZenML is running on debug mode."
)
deployment_type: ServerDeploymentType = Field(
ServerDeploymentType.OTHER,
title="The ZenML server deployment type.",
)
database_type: ServerDatabaseType = Field(
ServerDatabaseType.OTHER,
title="The database type that the server is using.",
)
secrets_store_type: SecretsStoreType = Field(
SecretsStoreType.NONE,
title="The type of secrets store that the server is using.",
)
def is_local(self) -> bool:
"""Return whether the server is running locally.
Returns:
True if the server is running locally, False otherwise.
"""
from zenml.config.global_config import GlobalConfiguration
# Local ZenML servers are identifiable by the fact that their
# server ID is the same as the local client (user) ID.
return self.id == GlobalConfiguration().user_id
is_local(self)
Return whether the server is running locally.
Returns:
Type | Description |
---|---|
bool |
True if the server is running locally, False otherwise. |
Source code in zenml/models/server_models.py
def is_local(self) -> bool:
"""Return whether the server is running locally.
Returns:
True if the server is running locally, False otherwise.
"""
from zenml.config.global_config import GlobalConfiguration
# Local ZenML servers are identifiable by the fact that their
# server ID is the same as the local client (user) ID.
return self.id == GlobalConfiguration().user_id
service_connector_models
Model definitions for ZenML service connectors.
AuthenticationMethodModel (BaseModel)
pydantic-model
Authentication method specification.
Describes the schema for the configuration and secrets that need to be provided to configure an authentication method.
Source code in zenml/models/service_connector_models.py
class AuthenticationMethodModel(BaseModel):
"""Authentication method specification.
Describes the schema for the configuration and secrets that need to be
provided to configure an authentication method.
"""
name: str = Field(
title="User readable name for the authentication method.",
)
auth_method: str = Field(
title="The name of the authentication method.",
max_length=STR_FIELD_MAX_LENGTH,
)
description: str = Field(
default="",
title="A description of the authentication method.",
)
config_schema: Dict[str, Any] = Field(
default_factory=dict,
title="The JSON schema of the configuration for this authentication "
"method.",
)
min_expiration_seconds: Optional[int] = Field(
default=None,
title="The minimum number of seconds that the authentication "
"session can be configured to be valid for. Set to None for "
"authentication sessions and long-lived credentials that don't expire.",
)
max_expiration_seconds: Optional[int] = Field(
default=None,
title="The maximum number of seconds that the authentication "
"session can be configured to be valid for. Set to None for "
"authentication sessions and long-lived credentials that don't expire.",
)
default_expiration_seconds: Optional[int] = Field(
default=None,
title="The default number of seconds that the authentication "
"session is valid for. Set to None for authentication sessions and "
"long-lived credentials that don't expire.",
)
_config_class: Optional[Type[BaseModel]] = None
def __init__(
self, config_class: Optional[Type[BaseModel]] = None, **values: Any
):
"""Initialize the authentication method.
Args:
config_class: The configuration class for the authentication
method.
**values: The data to initialize the authentication method with.
"""
if config_class:
values["config_schema"] = json.loads(config_class.schema_json())
super().__init__(**values)
self._config_class = config_class
@property
def config_class(self) -> Optional[Type[BaseModel]]:
"""Get the configuration class for the authentication method.
Returns:
The configuration class for the authentication method.
"""
return