Models
zenml.models
special
Pydantic models for the various concepts in ZenML.
v2
special
base
special
base
Base model definitions.
BaseDatedResponseBody (BaseResponseBody)
Base body model for entities that track a creation and update timestamp.
Used as a base class for all body models associated with responses. Features a creation and update timestamp.
Source code in zenml/models/v2/base/base.py
class BaseDatedResponseBody(BaseResponseBody):
"""Base body model for entities that track a creation and update timestamp.
Used as a base class for all body models associated with responses.
Features a creation and update timestamp.
"""
created: datetime = Field(
title="The timestamp when this resource was created."
)
updated: datetime = Field(
title="The timestamp when this resource was last updated."
)
BaseIdentifiedResponse (BaseResponse[~AnyDatedBody, ~AnyMetadata, ~AnyResources], Generic)
Base domain model for resources with DB representation.
Source code in zenml/models/v2/base/base.py
class BaseIdentifiedResponse(
BaseResponse[AnyDatedBody, AnyMetadata, AnyResources],
Generic[AnyDatedBody, AnyMetadata, AnyResources],
):
"""Base domain model for resources with DB representation."""
id: UUID = Field(title="The unique resource id.")
permission_denied: bool = False
# Helper functions
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, type(self)):
return self.id == other.id
else:
return False
def _validate_hydrated_version(
self,
hydrated_model: "BaseResponse[AnyDatedBody, AnyMetadata, AnyResources]",
) -> None:
"""Helper method to validate the values within the hydrated version.
Args:
hydrated_model: the hydrated version of the model.
Raises:
HydrationError: if the hydrated version has different values set
for either the name of the body fields and the
_method_body_mutation is set to ResponseBodyUpdate.DENY.
"""
super()._validate_hydrated_version(hydrated_model)
assert isinstance(hydrated_model, type(self))
# Check if the ID is the same
if self.id != hydrated_model.id:
raise HydrationError(
"The hydrated version of the model does not have the same id."
)
def get_hydrated_version(
self,
) -> "BaseIdentifiedResponse[AnyDatedBody, AnyMetadata, AnyResources]":
"""Abstract method to fetch the hydrated version of the model.
Raises:
NotImplementedError: in case the method is not implemented.
"""
raise NotImplementedError(
"Please implement a `get_hydrated_version` method before "
"using/hydrating the model."
)
def get_body(self) -> "AnyDatedBody":
"""Fetch the body of the entity.
Returns:
The body field of the response.
Raises:
IllegalOperationError: If the user lacks permission to access the
entity represented by this response.
"""
if self.permission_denied:
raise IllegalOperationError(
f"Missing permissions to access {type(self).__name__} with "
f"ID {self.id}."
)
return super().get_body()
def get_metadata(self) -> "AnyMetadata":
"""Fetch the metadata of the entity.
Returns:
The metadata field of the response.
Raises:
IllegalOperationError: If the user lacks permission to access this
entity represented by this response.
"""
if self.permission_denied:
raise IllegalOperationError(
f"Missing permissions to access {type(self).__name__} with "
f"ID {self.id}."
)
return super().get_metadata()
# Analytics
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
# Body and metadata properties
@property
def created(self) -> datetime:
"""The `created` property.
Returns:
the value of the property.
"""
return self.get_body().created
@property
def updated(self) -> datetime:
"""The `updated` property.
Returns:
the value of the property.
"""
return self.get_body().updated
created: datetime
property
readonly
The created
property.
Returns:
Type | Description |
---|---|
datetime |
the value of the property. |
updated: datetime
property
readonly
The updated
property.
Returns:
Type | Description |
---|---|
datetime |
the value of the property. |
__eq__(self, other)
special
Implementation of equality magic method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is of the same type and has the same UUID. |
Source code in zenml/models/v2/base/base.py
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, type(self)):
return self.id == other.id
else:
return False
__hash__(self)
special
Implementation of hash magic method.
Returns:
Type | Description |
---|---|
int |
Hash of the UUID. |
Source code in zenml/models/v2/base/base.py
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
get_analytics_metadata(self)
Fetches the analytics metadata for base response models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/v2/base/base.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
get_body(self)
Fetch the body of the entity.
Returns:
Type | Description |
---|---|
AnyDatedBody |
The body field of the response. |
Exceptions:
Type | Description |
---|---|
IllegalOperationError |
If the user lacks permission to access the entity represented by this response. |
Source code in zenml/models/v2/base/base.py
def get_body(self) -> "AnyDatedBody":
"""Fetch the body of the entity.
Returns:
The body field of the response.
Raises:
IllegalOperationError: If the user lacks permission to access the
entity represented by this response.
"""
if self.permission_denied:
raise IllegalOperationError(
f"Missing permissions to access {type(self).__name__} with "
f"ID {self.id}."
)
return super().get_body()
get_hydrated_version(self)
Abstract method to fetch the hydrated version of the model.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
in case the method is not implemented. |
Source code in zenml/models/v2/base/base.py
def get_hydrated_version(
self,
) -> "BaseIdentifiedResponse[AnyDatedBody, AnyMetadata, AnyResources]":
"""Abstract method to fetch the hydrated version of the model.
Raises:
NotImplementedError: in case the method is not implemented.
"""
raise NotImplementedError(
"Please implement a `get_hydrated_version` method before "
"using/hydrating the model."
)
get_metadata(self)
Fetch the metadata of the entity.
Returns:
Type | Description |
---|---|
AnyMetadata |
The metadata field of the response. |
Exceptions:
Type | Description |
---|---|
IllegalOperationError |
If the user lacks permission to access this entity represented by this response. |
Source code in zenml/models/v2/base/base.py
def get_metadata(self) -> "AnyMetadata":
"""Fetch the metadata of the entity.
Returns:
The metadata field of the response.
Raises:
IllegalOperationError: If the user lacks permission to access this
entity represented by this response.
"""
if self.permission_denied:
raise IllegalOperationError(
f"Missing permissions to access {type(self).__name__} with "
f"ID {self.id}."
)
return super().get_metadata()
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[ArtifactResponseBody, ArtifactResponseMetadata, ArtifactResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[ArtifactVisualizationResponseBody, ArtifactVisualizationResponseMetadata, ArtifactVisualizationResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[CodeReferenceResponseBody, CodeReferenceResponseMetadata, CodeReferenceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[LogsResponseBody, LogsResponseMetadata, LogsResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[ModelVersionArtifactResponseBody, BaseResponseMetadata, ModelVersionArtifactResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[ModelVersionPipelineRunResponseBody, BaseResponseMetadata, ModelVersionPipelineRunResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[ServiceAccountResponseBody, ServiceAccountResponseMetadata, ServiceAccountResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[TagResourceResponseBody, BaseResponseMetadata, TagResourceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[TagResponseBody, BaseResponseMetadata, TagResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[TriggerExecutionResponseBody, TriggerExecutionResponseMetadata, TriggerExecutionResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[UserResponseBody, UserResponseMetadata, UserResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[WorkspaceResponseBody, WorkspaceResponseMetadata, WorkspaceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseIdentifiedResponse[~UserBody, ~UserMetadata, ~UserResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseRequest (BaseZenModel)
Base request model.
Used as a base class for all request models.
Source code in zenml/models/v2/base/base.py
class BaseRequest(BaseZenModel):
"""Base request model.
Used as a base class for all request models.
"""
BaseResponse (BaseZenModel, Generic)
Base domain model for all responses.
Source code in zenml/models/v2/base/base.py
class BaseResponse(BaseZenModel, Generic[AnyBody, AnyMetadata, AnyResources]):
"""Base domain model for all responses."""
# Body and metadata pair
body: Optional["AnyBody"] = Field(
default=None, title="The body of the resource."
)
metadata: Optional["AnyMetadata"] = Field(
default=None, title="The metadata related to this resource."
)
resources: Optional["AnyResources"] = Field(
default=None, title="The resources related to this resource."
)
_response_update_strategy: ResponseUpdateStrategy = (
ResponseUpdateStrategy.ALLOW
)
_warn_on_response_updates: bool = True
def _validate_hydrated_version(
self,
hydrated_model: "BaseResponse[AnyBody, AnyMetadata, AnyResources]",
) -> None:
"""Helper method to validate the values within the hydrated version.
Args:
hydrated_model: the hydrated version of the model.
Raises:
HydrationError: if the hydrated version has different values set
for either the name of the body fields and the
_method_body_mutation is set to ResponseBodyUpdate.DENY.
"""
# Check whether the metadata exists in the hydrated version
if hydrated_model.metadata is None:
raise HydrationError(
"The hydrated model does not have a metadata field."
)
# Check if the name has changed
if "name" in self.model_fields:
original_name = getattr(self, "name")
hydrated_name = getattr(hydrated_model, "name")
if original_name != hydrated_name:
if (
self._response_update_strategy
== ResponseUpdateStrategy.ALLOW
):
setattr(self, "name", hydrated_name)
if self._warn_on_response_updates:
logger.warning(
f"The name of the entity has changed from "
f"`{original_name}` to `{hydrated_name}`."
)
elif (
self._response_update_strategy
== ResponseUpdateStrategy.IGNORE
):
if self._warn_on_response_updates:
logger.warning(
f"Ignoring the name change in the hydrated version "
f"of the response: `{original_name}` to "
f"`{hydrated_name}`."
)
elif (
self._response_update_strategy
== ResponseUpdateStrategy.DENY
):
raise HydrationError(
f"Failing the hydration, because there is a change in "
f"the name of the entity: `{original_name}` to "
f"`{hydrated_name}`."
)
# Check all the fields in the body
for field in self.get_body().model_fields:
original_value = getattr(self.get_body(), field)
hydrated_value = getattr(hydrated_model.get_body(), field)
if original_value != hydrated_value:
if (
self._response_update_strategy
== ResponseUpdateStrategy.ALLOW
):
setattr(self.get_body(), field, hydrated_value)
if self._warn_on_response_updates:
logger.warning(
f"The field `{field}` in the body of the response "
f"has changed from `{original_value}` to "
f"`{hydrated_value}`."
)
elif (
self._response_update_strategy
== ResponseUpdateStrategy.IGNORE
):
if self._warn_on_response_updates:
logger.warning(
f"Ignoring the change in the hydrated version of "
f"the field `{field}`: `{original_value}` -> "
f"`{hydrated_value}`."
)
elif (
self._response_update_strategy
== ResponseUpdateStrategy.DENY
):
raise HydrationError(
f"Failing the hydration, because there is a change in "
f"the field `{field}`: `{original_value}` -> "
f"`{hydrated_value}`"
)
def get_hydrated_version(
self,
) -> "BaseResponse[AnyBody, AnyMetadata, AnyResources]":
"""Abstract method to fetch the hydrated version of the model.
Raises:
NotImplementedError: in case the method is not implemented.
"""
raise NotImplementedError(
"Please implement a `get_hydrated_version` method before "
"using/hydrating the model."
)
def get_body(self) -> "AnyBody":
"""Fetch the body of the entity.
Returns:
The body field of the response.
Raises:
RuntimeError: If the body was not included in the response.
"""
if not self.body:
raise RuntimeError(
f"Missing response body for {type(self).__name__}."
)
return self.body
def get_metadata(self) -> "AnyMetadata":
"""Fetch the metadata of the entity.
Returns:
The metadata field of the response.
"""
if self.metadata is None:
# If the metadata is not there, check the class first.
metadata_annotation = self.model_fields["metadata"].annotation
assert metadata_annotation is not None, (
"For each response model, an annotated metadata"
"field should exist."
)
# metadata is defined as:
# metadata: Optional[....ResponseMetadata] = Field(default=None)
# We need to find the actual class inside the Optional annotation.
from zenml.utils.typing_utils import get_args
metadata_type = get_args(metadata_annotation)[0]
assert issubclass(metadata_type, BaseResponseMetadata)
if len(metadata_type.model_fields):
# If the metadata class defines any fields, fetch the metadata
# through the hydrated version.
hydrated_version = self.get_hydrated_version()
self._validate_hydrated_version(hydrated_version)
self.metadata = hydrated_version.metadata
else:
# Otherwise, use the metadata class to create an empty metadata
# object.
self.metadata = metadata_type()
assert self.metadata is not None
return self.metadata
def get_resources(self) -> "AnyResources":
"""Fetch the resources related to this entity.
Returns:
The resources field of the response.
Raises:
RuntimeError: If the resources field was not included in the response.
"""
if self.resources is None:
# If the resources are not there, check the class first.
resources_annotation = self.model_fields["resources"].annotation
assert resources_annotation is not None, (
"For each response model, an annotated resources"
"field should exist."
)
# metadata is defined as:
# metadata: Optional[....ResponseMetadata] = Field(default=None)
# We need to find the actual class inside the Optional annotation.
from zenml.utils.typing_utils import get_args
resources_type = get_args(resources_annotation)[0]
assert issubclass(resources_type, BaseResponseResources)
if len(resources_type.model_fields):
# If the resources class defines any fields, fetch the resources
# through the hydrated version.
hydrated_version = self.get_hydrated_version()
self._validate_hydrated_version(hydrated_version)
self.resources = hydrated_version.resources
else:
# Otherwise, use the resources class to create an empty
# resources object.
self.resources = resources_type()
if self.resources is None:
raise RuntimeError(
f"Missing response resources for {type(self).__name__}."
)
return self.resources
get_body(self)
Fetch the body of the entity.
Returns:
Type | Description |
---|---|
AnyBody |
The body field of the response. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the body was not included in the response. |
Source code in zenml/models/v2/base/base.py
def get_body(self) -> "AnyBody":
"""Fetch the body of the entity.
Returns:
The body field of the response.
Raises:
RuntimeError: If the body was not included in the response.
"""
if not self.body:
raise RuntimeError(
f"Missing response body for {type(self).__name__}."
)
return self.body
get_hydrated_version(self)
Abstract method to fetch the hydrated version of the model.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
in case the method is not implemented. |
Source code in zenml/models/v2/base/base.py
def get_hydrated_version(
self,
) -> "BaseResponse[AnyBody, AnyMetadata, AnyResources]":
"""Abstract method to fetch the hydrated version of the model.
Raises:
NotImplementedError: in case the method is not implemented.
"""
raise NotImplementedError(
"Please implement a `get_hydrated_version` method before "
"using/hydrating the model."
)
get_metadata(self)
Fetch the metadata of the entity.
Returns:
Type | Description |
---|---|
AnyMetadata |
The metadata field of the response. |
Source code in zenml/models/v2/base/base.py
def get_metadata(self) -> "AnyMetadata":
"""Fetch the metadata of the entity.
Returns:
The metadata field of the response.
"""
if self.metadata is None:
# If the metadata is not there, check the class first.
metadata_annotation = self.model_fields["metadata"].annotation
assert metadata_annotation is not None, (
"For each response model, an annotated metadata"
"field should exist."
)
# metadata is defined as:
# metadata: Optional[....ResponseMetadata] = Field(default=None)
# We need to find the actual class inside the Optional annotation.
from zenml.utils.typing_utils import get_args
metadata_type = get_args(metadata_annotation)[0]
assert issubclass(metadata_type, BaseResponseMetadata)
if len(metadata_type.model_fields):
# If the metadata class defines any fields, fetch the metadata
# through the hydrated version.
hydrated_version = self.get_hydrated_version()
self._validate_hydrated_version(hydrated_version)
self.metadata = hydrated_version.metadata
else:
# Otherwise, use the metadata class to create an empty metadata
# object.
self.metadata = metadata_type()
assert self.metadata is not None
return self.metadata
get_resources(self)
Fetch the resources related to this entity.
Returns:
Type | Description |
---|---|
AnyResources |
The resources field of the response. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the resources field was not included in the response. |
Source code in zenml/models/v2/base/base.py
def get_resources(self) -> "AnyResources":
"""Fetch the resources related to this entity.
Returns:
The resources field of the response.
Raises:
RuntimeError: If the resources field was not included in the response.
"""
if self.resources is None:
# If the resources are not there, check the class first.
resources_annotation = self.model_fields["resources"].annotation
assert resources_annotation is not None, (
"For each response model, an annotated resources"
"field should exist."
)
# metadata is defined as:
# metadata: Optional[....ResponseMetadata] = Field(default=None)
# We need to find the actual class inside the Optional annotation.
from zenml.utils.typing_utils import get_args
resources_type = get_args(resources_annotation)[0]
assert issubclass(resources_type, BaseResponseResources)
if len(resources_type.model_fields):
# If the resources class defines any fields, fetch the resources
# through the hydrated version.
hydrated_version = self.get_hydrated_version()
self._validate_hydrated_version(hydrated_version)
self.resources = hydrated_version.resources
else:
# Otherwise, use the resources class to create an empty
# resources object.
self.resources = resources_type()
if self.resources is None:
raise RuntimeError(
f"Missing response resources for {type(self).__name__}."
)
return self.resources
model_post_init(self, __context)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
__context |
Any |
The context. |
required |
Source code in zenml/models/v2/base/base.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
"""This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args:
self: The BaseModel instance.
__context: The context.
"""
if getattr(self, '__pydantic_private__', None) is None:
pydantic_private = {}
for name, private_attr in self.__private_attributes__.items():
default = private_attr.get_default()
if default is not PydanticUndefined:
pydantic_private[name] = default
object_setattr(self, '__pydantic_private__', pydantic_private)
BaseResponseBody (BaseZenModel)
Base body model.
Source code in zenml/models/v2/base/base.py
class BaseResponseBody(BaseZenModel):
"""Base body model."""
BaseResponseMetadata (BaseZenModel)
Base metadata model.
Used as a base class for all metadata models associated with responses.
Source code in zenml/models/v2/base/base.py
class BaseResponseMetadata(BaseZenModel):
"""Base metadata model.
Used as a base class for all metadata models associated with responses.
"""
BaseResponseResources (BaseZenModel)
Base resources model.
Used as a base class for all resource models associated with responses.
Source code in zenml/models/v2/base/base.py
class BaseResponseResources(BaseZenModel):
"""Base resources model.
Used as a base class for all resource models associated with responses.
"""
model_config = ConfigDict(extra="allow")
BaseResponse[ServerSettingsResponseBody, ServerSettingsResponseMetadata, ServerSettingsResponseResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseResponse[~AnyDatedBody, ~AnyMetadata, ~AnyResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseResponse[~AnyPluginBody, ~AnyPluginMetadata, ~AnyPluginResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BaseUpdate (BaseZenModel)
Base update model.
Used as a base class for all update models.
Source code in zenml/models/v2/base/base.py
class BaseUpdate(BaseZenModel):
"""Base update model.
Used as a base class for all update models.
"""
model_config = ConfigDict(
# Ignore extras on all update models.
extra="ignore",
)
BaseZenModel (YAMLSerializationMixin, AnalyticsTrackedModelMixin)
Base model class for all ZenML models.
This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events.
Source code in zenml/models/v2/base/base.py
class BaseZenModel(YAMLSerializationMixin, AnalyticsTrackedModelMixin):
"""Base model class for all ZenML models.
This class is used as a base class for all ZenML models. It provides
functionality for tracking analytics events.
"""
model_config = ConfigDict(
# Ignore extras on all models to support forwards and backwards
# compatibility (e.g. new fields in newer versions of ZenML servers
# are allowed to be passed to older versions of ZenML clients and
# vice versa but will be ignored).
extra="ignore",
)
base_plugin_flavor
Plugin flavor model definitions.
BasePluginFlavorResponse (BaseResponse[~AnyPluginBody, ~AnyPluginMetadata, ~AnyPluginResources], Generic)
Base response for all Plugin Flavors.
Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginFlavorResponse(
BaseResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources],
Generic[AnyPluginBody, AnyPluginMetadata, AnyPluginResources],
):
"""Base response for all Plugin Flavors."""
name: str = Field(title="Name of the flavor.")
type: PluginType = Field(title="Type of the plugin.")
subtype: PluginSubType = Field(title="Subtype of the plugin.")
model_config = ConfigDict(extra="ignore")
def get_hydrated_version(
self,
) -> "BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources]":
"""Abstract method to fetch the hydrated version of the model.
Returns:
Hydrated version of the PluginFlavorResponse
"""
# TODO: shouldn't this call the Zen store ? The client should not have
# to know about the plugin flavor registry
from zenml.zen_server.utils import plugin_flavor_registry
plugin_flavor = plugin_flavor_registry().get_flavor_class(
name=self.name, _type=self.type, subtype=self.subtype
)
return plugin_flavor.get_flavor_response_model(hydrate=True)
get_hydrated_version(self)
Abstract method to fetch the hydrated version of the model.
Returns:
Type | Description |
---|---|
BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources] |
Hydrated version of the PluginFlavorResponse |
Source code in zenml/models/v2/base/base_plugin_flavor.py
def get_hydrated_version(
self,
) -> "BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources]":
"""Abstract method to fetch the hydrated version of the model.
Returns:
Hydrated version of the PluginFlavorResponse
"""
# TODO: shouldn't this call the Zen store ? The client should not have
# to know about the plugin flavor registry
from zenml.zen_server.utils import plugin_flavor_registry
plugin_flavor = plugin_flavor_registry().get_flavor_class(
name=self.name, _type=self.type, subtype=self.subtype
)
return plugin_flavor.get_flavor_response_model(hydrate=True)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BasePluginFlavorResponse[ActionFlavorResponseBody, ActionFlavorResponseMetadata, ActionFlavorResponseResources] (BasePluginFlavorResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BasePluginFlavorResponse[EventSourceFlavorResponseBody, EventSourceFlavorResponseMetadata, EventSourceFlavorResponseResources] (BasePluginFlavorResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
BasePluginResponseBody (BaseResponseBody)
Response body for plugins.
Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseBody(BaseResponseBody):
"""Response body for plugins."""
BasePluginResponseMetadata (BaseResponseMetadata)
Response metadata for plugins.
Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseMetadata(BaseResponseMetadata):
"""Response metadata for plugins."""
BasePluginResponseResources (BaseResponseResources)
Response resources for plugins.
Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseResources(BaseResponseResources):
"""Response resources for plugins."""
filter
Base filter model definitions.
BaseFilter (BaseModel)
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=20
)
Source code in zenml/models/v2/base/filter.py
class BaseFilter(BaseModel):
"""Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of
resources.
Usage example for subclasses of this class:
```
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=20
)
```
"""
# List of fields that cannot be used as filters.
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
"sort_by",
"page",
"size",
"logical_operator",
]
CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = []
# List of fields that are not even mentioned as options in the CLI.
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []
# List of fields that are wrapped with `fastapi.Query(default)` in API.
API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []
sort_by: str = Field(
default="created", description="Which column to sort by."
)
logical_operator: LogicalOperators = Field(
default=LogicalOperators.AND,
description="Which logical operator to use between all filters "
"['and', 'or']",
)
page: int = Field(
default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
)
size: int = Field(
default=PAGE_SIZE_DEFAULT,
ge=1,
le=PAGE_SIZE_MAXIMUM,
description="Page size",
)
id: Optional[Union[UUID, str]] = Field(
default=None,
description="Id for this resource",
union_mode="left_to_right",
)
created: Optional[Union[datetime, str]] = Field(
default=None, description="Created", union_mode="left_to_right"
)
updated: Optional[Union[datetime, str]] = Field(
default=None, description="Updated", union_mode="left_to_right"
)
_rbac_configuration: Optional[
Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
] = None
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
value: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(value, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(value)}"
)
column = value
split_value = value.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
value = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{value}'"
)
elif column in cls.model_fields:
return value
elif column in cls.CUSTOM_SORTING_OPTIONS:
return value
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
@model_validator(mode="before")
@classmethod
@before_validator_handler
def filter_ops(cls, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
data: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(data)
return data
@property
def list_of_filters(self) -> List[Filter]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A list of Filter models.
"""
return self._generate_filter_list(
{key: getattr(self, key) for key in self.model_fields}
)
@property
def sorting_params(self) -> Tuple[str, SorterOps]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A tuple of the column to sort by and the sorting operand.
"""
column = self.sort_by
# The default sorting operand is asc
operator = SorterOps.ASCENDING
# Check if user explicitly set an operand
split_value = self.sort_by.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
operator = SorterOps(split_value[0])
return column, operator
def configure_rbac(
self,
authenticated_user_id: UUID,
**column_allowed_ids: Optional[Set[UUID]],
) -> None:
"""Configure RBAC allowed column values.
Args:
authenticated_user_id: ID of the authenticated user. All entities
owned by this user will be included.
column_allowed_ids: Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If `None`, the remaining filters will
be applied to all entries in the table.
"""
self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
def generate_rbac_filter(
self,
table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
"""Generates an optional RBAC filter.
Args:
table: The query table.
Returns:
The RBAC filter.
"""
from sqlmodel import or_
if not self._rbac_configuration:
return None
expressions = []
for column_name, allowed_ids in self._rbac_configuration[1].items():
if allowed_ids is not None:
expression = getattr(table, column_name).in_(allowed_ids)
expressions.append(expression)
if expressions and hasattr(table, "user_id"):
# If `expressions` is not empty, we do not have full access to all
# rows of the table. In this case, we also include rows which the
# user owns.
# Unowned entities are considered server-owned and can be seen
# by anyone
expressions.append(getattr(table, "user_id").is_(None))
# The authenticated user owns this entity
expressions.append(
getattr(table, "user_id") == self._rbac_configuration[0]
)
if expressions:
return or_(*expressions)
else:
return None
@classmethod
def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
"""Create a list of filters from a (column, value) dictionary.
Args:
values: A dictionary of column names and values to filter on.
Returns:
A list of filters.
"""
list_of_filters: List[Filter] = []
for key, value in values.items():
# Ignore excluded filters
if key in cls.FILTER_EXCLUDE_FIELDS:
continue
# Skip filtering for None values
if value is None:
continue
# Determine the operator and filter value
value, operator = cls._resolve_operator(value)
# Define the filter
filter = cls._define_filter(
column=key, value=value, operator=operator
)
list_of_filters.append(filter)
return list_of_filters
@staticmethod
def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
"""Determine the operator and filter value from a user-provided value.
If the user-provided value is a string of the form "operator:value",
then the operator is extracted and the value is returned. Otherwise,
`GenericFilterOps.EQUALS` is used as default operator and the value
is returned as-is.
Args:
value: The user-provided value.
Returns:
A tuple of the filter value and the operator.
"""
operator = GenericFilterOps.EQUALS # Default operator
if isinstance(value, str):
split_value = value.split(":", 1)
if (
len(split_value) == 2
and split_value[0] in GenericFilterOps.values()
):
value = split_value[1]
operator = GenericFilterOps(split_value[0])
return value, operator
@classmethod
def _define_filter(
cls, column: str, value: Any, operator: GenericFilterOps
) -> Filter:
"""Define a filter for a given column.
Args:
column: The column to filter on.
value: The value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
# Create datetime filters
if cls.is_datetime_field(column):
return cls._define_datetime_filter(
column=column,
value=value,
operator=operator,
)
# Create UUID filters
if cls.is_uuid_field(column):
return cls._define_uuid_filter(
column=column,
value=value,
operator=operator,
)
# Create int filters
if cls.is_int_field(column):
return NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=int(value),
)
# Create bool filters
if cls.is_bool_field(column):
return cls._define_bool_filter(
column=column,
value=value,
operator=operator,
)
# Create str filters
if cls.is_str_field(column):
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
# Handle unsupported datatypes
logger.warning(
f"The Datatype {cls.model_fields[column].annotation} might not be "
"supported for filtering. Defaulting to a string filter."
)
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=str(value),
)
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
"""Checks whether a model field has a certain annotation.
Args:
k: The name of the field.
type_: The type to check.
Raises:
ValueError: if the model field within does not have an annotation.
Returns:
True if the annotation of the field matches the given type, False
otherwise.
"""
try:
annotation = cls.model_fields[k].annotation
if annotation is not None:
return (
issubclass(type_, get_args(annotation))
or annotation is type_
)
else:
raise ValueError(
f"The field '{k}' inside the model {cls.__name__} "
"does not have an annotation."
)
except TypeError:
return False
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=datetime)
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a UUID field.
Args:
k: The key to check.
Returns:
True if the field is a UUID field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=UUID)
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's an int field.
Args:
k: The key to check.
Returns:
True if the field is an int field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=int)
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=bool)
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str)
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
@staticmethod
def _define_datetime_filter(
column: str, value: Any, operator: GenericFilterOps
) -> NumericFilter:
"""Define a datetime filter for a given column.
Args:
column: The column to filter on.
value: The datetime value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid datetime.
"""
try:
if isinstance(value, datetime):
datetime_value = value
else:
datetime_value = datetime.strptime(
value, FILTERING_DATETIME_FORMAT
)
except ValueError as e:
raise ValueError(
"The datetime filter only works with values in the following "
f"format: {FILTERING_DATETIME_FORMAT}"
) from e
datetime_filter = NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=datetime_value,
)
return datetime_filter
@staticmethod
def _define_uuid_filter(
column: str, value: Any, operator: GenericFilterOps
) -> UUIDFilter:
"""Define a UUID filter for a given column.
Args:
column: The column to filter on.
value: The UUID value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid UUID.
"""
# For equality checks, ensure that the value is a valid UUID.
if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
try:
UUID(value)
except ValueError as e:
raise ValueError(
"Invalid value passed as UUID query parameter."
) from e
# Cast the value to string for further comparisons.
value = str(value)
# Generate the filter.
uuid_filter = UUIDFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
return uuid_filter
@staticmethod
def _define_bool_filter(
column: str, value: Any, operator: GenericFilterOps
) -> BoolFilter:
"""Define a bool filter for a given column.
Args:
column: The column to filter on.
value: The bool value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
logger.warning(
"Boolean filters do not support any"
"operation except for equals. Defaulting"
"to an `equals` comparison."
)
return BoolFilter(
operation=GenericFilterOps.EQUALS,
column=column,
value=bool(value),
)
@property
def offset(self) -> int:
"""Returns the offset needed for the query on the data persistence layer.
Returns:
The offset for the query.
"""
return self.size * (self.page - 1)
def generate_filter(
self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlmodel import and_, or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
for custom_filter in self.get_custom_filters():
filters.append(custom_filter)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom filters.
This can be overridden by subclasses to define custom filters that are
not based on the columns of the underlying table.
Returns:
A list of custom filters.
"""
return []
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
rbac_filter = self.generate_rbac_filter(table=table)
if rbac_filter is not None:
query = query.where(rbac_filter)
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
def apply_sorting(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Apply sorting to the query.
Args:
query: The query to which to apply the sorting.
table: The query table.
Returns:
The query with sorting applied.
"""
column, operand = self.sorting_params
if operand == SorterOps.DESCENDING:
sort_clause = desc(getattr(table, column)) # type: ignore[var-annotated]
else:
sort_clause = asc(getattr(table, column))
# We always add the `id` column as a tiebreaker to ensure a stable,
# repeatable order of items, otherwise subsequent pages might contain
# the same items.
query = query.order_by(sort_clause, asc(table.id)) # type: ignore[arg-type]
return query
list_of_filters: List[zenml.models.v2.base.filter.Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[zenml.models.v2.base.filter.Filter] |
A list of Filter models. |
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
sorting_params: Tuple[str, zenml.enums.SorterOps]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
Tuple[str, zenml.enums.SorterOps] |
A tuple of the column to sort by and the sorting operand. |
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/base/filter.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
rbac_filter = self.generate_rbac_filter(table=table)
if rbac_filter is not None:
query = query.where(rbac_filter)
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
apply_sorting(self, query, table)
Apply sorting to the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the sorting. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with sorting applied. |
Source code in zenml/models/v2/base/filter.py
def apply_sorting(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Apply sorting to the query.
Args:
query: The query to which to apply the sorting.
table: The query table.
Returns:
The query with sorting applied.
"""
column, operand = self.sorting_params
if operand == SorterOps.DESCENDING:
sort_clause = desc(getattr(table, column)) # type: ignore[var-annotated]
else:
sort_clause = asc(getattr(table, column))
# We always add the `id` column as a tiebreaker to ensure a stable,
# repeatable order of items, otherwise subsequent pages might contain
# the same items.
query = query.order_by(sort_clause, asc(table.id)) # type: ignore[arg-type]
return query
check_field_annotation(k, type_)
classmethod
Checks whether a model field has a certain annotation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The name of the field. |
required |
type_ |
Any |
The type to check. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the model field within does not have an annotation. |
Returns:
Type | Description |
---|---|
bool |
True if the annotation of the field matches the given type, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
"""Checks whether a model field has a certain annotation.
Args:
k: The name of the field.
type_: The type to check.
Raises:
ValueError: if the model field within does not have an annotation.
Returns:
True if the annotation of the field matches the given type, False
otherwise.
"""
try:
annotation = cls.model_fields[k].annotation
if annotation is not None:
return (
issubclass(type_, get_args(annotation))
or annotation is type_
)
else:
raise ValueError(
f"The field '{k}' inside the model {cls.__name__} "
"does not have an annotation."
)
except TypeError:
return False
configure_rbac(self, authenticated_user_id, **column_allowed_ids)
Configure RBAC allowed column values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
authenticated_user_id |
UUID |
ID of the authenticated user. All entities owned by this user will be included. |
required |
column_allowed_ids |
Optional[Set[uuid.UUID]] |
Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If |
{} |
Source code in zenml/models/v2/base/filter.py
def configure_rbac(
self,
authenticated_user_id: UUID,
**column_allowed_ids: Optional[Set[UUID]],
) -> None:
"""Configure RBAC allowed column values.
Args:
authenticated_user_id: ID of the authenticated user. All entities
owned by this user will be included.
column_allowed_ids: Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If `None`, the remaining filters will
be applied to all entries in the table.
"""
self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(data, validation_info)
classmethod
Wrapper method to handle the raw data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls |
the class handler |
required | |
data |
Any |
the raw input data |
required |
validation_info |
ValidationInfo |
the context of the validation. |
required |
Returns:
Type | Description |
---|---|
Any |
the validated data |
Source code in zenml/models/v2/base/filter.py
def before_validator(
cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
) -> Any:
"""Wrapper method to handle the raw data.
Args:
cls: the class handler
data: the raw input data
validation_info: the context of the validation.
Returns:
the validated data
"""
data = model_validator_data_handler(
raw_data=data, base_class=cls, validation_info=validation_info
)
return method(cls=cls, data=data)
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
ColumnElement[bool] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/v2/base/filter.py
def generate_filter(
self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlmodel import and_, or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
for custom_filter in self.get_custom_filters():
filters.append(custom_filter)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)
Generates an optional RBAC filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Optional[ColumnElement[bool]] |
The RBAC filter. |
Source code in zenml/models/v2/base/filter.py
def generate_rbac_filter(
self,
table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
"""Generates an optional RBAC filter.
Args:
table: The query table.
Returns:
The RBAC filter.
"""
from sqlmodel import or_
if not self._rbac_configuration:
return None
expressions = []
for column_name, allowed_ids in self._rbac_configuration[1].items():
if allowed_ids is not None:
expression = getattr(table, column_name).in_(allowed_ids)
expressions.append(expression)
if expressions and hasattr(table, "user_id"):
# If `expressions` is not empty, we do not have full access to all
# rows of the table. In this case, we also include rows which the
# user owns.
# Unowned entities are considered server-owned and can be seen
# by anyone
expressions.append(getattr(table, "user_id").is_(None))
# The authenticated user owns this entity
expressions.append(
getattr(table, "user_id") == self._rbac_configuration[0]
)
if expressions:
return or_(*expressions)
else:
return None
get_custom_filters(self)
Get custom filters.
This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.
Returns:
Type | Description |
---|---|
List[ColumnElement[bool]] |
A list of custom filters. |
Source code in zenml/models/v2/base/filter.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom filters.
This can be overridden by subclasses to define custom filters that are
not based on the columns of the underlying table.
Returns:
A list of custom filters.
"""
return []
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=bool)
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=datetime)
is_int_field(k)
classmethod
Checks if it's an int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is an int field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's an int field.
Args:
k: The key to check.
Returns:
True if the field is an int field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=int)
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str)
is_uuid_field(k)
classmethod
Checks if it's a UUID field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a UUID field, False otherwise. |
Source code in zenml/models/v2/base/filter.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a UUID field.
Args:
k: The key to check.
Returns:
True if the field is a UUID field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=UUID)
model_post_init(self, __context)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
__context |
Any |
The context. |
required |
Source code in zenml/models/v2/base/filter.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
"""This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args:
self: The BaseModel instance.
__context: The context.
"""
if getattr(self, '__pydantic_private__', None) is None:
pydantic_private = {}
for name, private_attr in self.__private_attributes__.items():
default = private_attr.get_default()
if default is not PydanticUndefined:
pydantic_private[name] = default
object_setattr(self, '__pydantic_private__', pydantic_private)
validate_sort_by(value)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
Any |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/v2/base/filter.py
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
value: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(value, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(value)}"
)
column = value
split_value = value.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
value = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{value}'"
)
elif column in cls.model_fields:
return value
elif column in cls.CUSTOM_SORTING_OPTIONS:
return value
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
BoolFilter (Filter)
Filter for all Boolean fields.
Source code in zenml/models/v2/base/filter.py
class BoolFilter(Filter):
"""Filter for all Boolean fields."""
ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a boolean column.
Args:
column: The boolean column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a boolean column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The boolean column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a boolean column.
Args:
column: The boolean column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
return column == self.value
Filter (BaseModel, ABC)
Filter for all fields.
A Filter is a combination of a column, a value that the user uses to
filter on this column and an operation to use. The easiest example
would be user equals aria
with column=user
, value=aria
and the
operation=equals
.
All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.
Source code in zenml/models/v2/base/filter.py
class Filter(BaseModel, ABC):
"""Filter for all fields.
A Filter is a combination of a column, a value that the user uses to
filter on this column and an operation to use. The easiest example
would be `user equals aria` with column=`user`, value=`aria` and the
operation=`equals`.
All subclasses of this class will support different sets of operations.
This operation set is defined in the ALLOWED_OPS class variable.
"""
ALLOWED_OPS: ClassVar[List[str]] = []
operation: GenericFilterOps
column: str
value: Optional[Any] = None
@field_validator("operation", mode="before")
@classmethod
def validate_operation(cls, value: Any) -> Any:
"""Validate that the operation is a valid op for the field type.
Args:
value: The operation of this filter.
Returns:
The operation if it is valid.
Raises:
ValueError: If the operation is not valid for this field type.
"""
if value not in cls.ALLOWED_OPS:
raise ValueError(
f"This datatype can not be filtered using this operation: "
f"'{value}'. The allowed operations are: {cls.ALLOWED_OPS}"
)
else:
return value
def generate_query_conditions(
self,
table: Type[SQLModel],
) -> Union["ColumnElement[bool]"]:
"""Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel
query condition, to be used when filtering on the Database.
Args:
table: The SQLModel table to use for the query creation
Returns:
A list of conditions that will be combined using the `and` operation
"""
column = getattr(table, self.column)
conditions = self.generate_query_conditions_from_column(column)
return conditions # type:ignore[no-any-return]
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in `self.ALLOWED_OPS` can be used to filter the
given column by `self.value`.
Args:
column: The column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
generate_query_conditions(self, table)
Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The SQLModel table to use for the query creation |
required |
Returns:
Type | Description |
---|---|
ColumnElement[bool] |
A list of conditions that will be combined using the |
Source code in zenml/models/v2/base/filter.py
def generate_query_conditions(
self,
table: Type[SQLModel],
) -> Union["ColumnElement[bool]"]:
"""Generate the query conditions for the database.
This method converts the Filter class into an appropriate SQLModel
query condition, to be used when filtering on the Database.
Args:
table: The SQLModel table to use for the query creation
Returns:
A list of conditions that will be combined using the `and` operation
"""
column = getattr(table, self.column)
conditions = self.generate_query_conditions_from_column(column)
return conditions # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)
Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in self.ALLOWED_OPS
can be used to filter the
given column by self.value
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/v2/base/filter.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions given the corresponding database column.
This method should be overridden by subclasses to define how each
supported operation in `self.ALLOWED_OPS` can be used to filter the
given column by `self.value`.
Args:
column: The column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
validate_operation(value)
classmethod
Validate that the operation is a valid op for the field type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The operation of this filter. |
required |
Returns:
Type | Description |
---|---|
Any |
The operation if it is valid. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the operation is not valid for this field type. |
Source code in zenml/models/v2/base/filter.py
@field_validator("operation", mode="before")
@classmethod
def validate_operation(cls, value: Any) -> Any:
"""Validate that the operation is a valid op for the field type.
Args:
value: The operation of this filter.
Returns:
The operation if it is valid.
Raises:
ValueError: If the operation is not valid for this field type.
"""
if value not in cls.ALLOWED_OPS:
raise ValueError(
f"This datatype can not be filtered using this operation: "
f"'{value}'. The allowed operations are: {cls.ALLOWED_OPS}"
)
else:
return value
NumericFilter (Filter)
Filter for all numeric fields.
Source code in zenml/models/v2/base/filter.py
class NumericFilter(Filter):
"""Filter for all numeric fields."""
value: Union[float, datetime] = Field(union_mode="left_to_right")
ALLOWED_OPS: ClassVar[List[str]] = [
GenericFilterOps.EQUALS,
GenericFilterOps.GT,
GenericFilterOps.GTE,
GenericFilterOps.LT,
GenericFilterOps.LTE,
]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.GTE:
return column >= self.value
if self.operation == GenericFilterOps.GT:
return column > self.value
if self.operation == GenericFilterOps.LTE:
return column <= self.value
if self.operation == GenericFilterOps.LT:
return column < self.value
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a UUID column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The UUID column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.GTE:
return column >= self.value
if self.operation == GenericFilterOps.GT:
return column > self.value
if self.operation == GenericFilterOps.LTE:
return column <= self.value
if self.operation == GenericFilterOps.LT:
return column < self.value
return column == self.value
StrFilter (Filter)
Filter for all string fields.
Source code in zenml/models/v2/base/filter.py
class StrFilter(Filter):
"""Filter for all string fields."""
ALLOWED_OPS: ClassVar[List[str]] = [
GenericFilterOps.EQUALS,
GenericFilterOps.STARTSWITH,
GenericFilterOps.CONTAINS,
GenericFilterOps.ENDSWITH,
]
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a string column.
Args:
column: The string column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.CONTAINS:
return column.like(f"%{self.value}%")
if self.operation == GenericFilterOps.STARTSWITH:
return column.startswith(f"{self.value}")
if self.operation == GenericFilterOps.ENDSWITH:
return column.endswith(f"{self.value}")
return column == self.value
generate_query_conditions_from_column(self, column)
Generate query conditions for a string column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The string column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a string column.
Args:
column: The string column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
if self.operation == GenericFilterOps.CONTAINS:
return column.like(f"%{self.value}%")
if self.operation == GenericFilterOps.STARTSWITH:
return column.startswith(f"{self.value}")
if self.operation == GenericFilterOps.ENDSWITH:
return column.endswith(f"{self.value}")
return column == self.value
UUIDFilter (StrFilter)
Filter for all uuid fields which are mostly treated like strings.
Source code in zenml/models/v2/base/filter.py
class UUIDFilter(StrFilter):
"""Filter for all uuid fields which are mostly treated like strings."""
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
import sqlalchemy
from sqlalchemy_utils.functions import cast_if
# For equality checks, compare the UUID directly
if self.operation == GenericFilterOps.EQUALS:
return column == self.value
# For all other operations, cast and handle the column as string
return super().generate_query_conditions_from_column(
column=cast_if(column, sqlalchemy.String)
)
generate_query_conditions_from_column(self, column)
Generate query conditions for a UUID column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column |
Any |
The UUID column of an SQLModel table on which to filter. |
required |
Returns:
Type | Description |
---|---|
Any |
A list of query conditions. |
Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
"""Generate query conditions for a UUID column.
Args:
column: The UUID column of an SQLModel table on which to filter.
Returns:
A list of query conditions.
"""
import sqlalchemy
from sqlalchemy_utils.functions import cast_if
# For equality checks, compare the UUID directly
if self.operation == GenericFilterOps.EQUALS:
return column == self.value
# For all other operations, cast and handle the column as string
return super().generate_query_conditions_from_column(
column=cast_if(column, sqlalchemy.String)
)
page
Page model definitions.
Page (BaseModel, Generic)
Return Model for List Models to accommodate pagination.
Source code in zenml/models/v2/base/page.py
class Page(BaseModel, Generic[B]):
"""Return Model for List Models to accommodate pagination."""
index: PositiveInt
max_size: PositiveInt
total_pages: NonNegativeInt
total: NonNegativeInt
items: List[B]
__params_type__ = BaseFilter
@property
def size(self) -> int:
"""Return the item count of the page.
Returns:
The amount of items in the page.
"""
return len(self.items)
def __len__(self) -> int:
"""Return the item count of the page.
This enables `len(page)`.
Returns:
The amount of items in the page.
"""
return len(self.items)
def __getitem__(self, index: int) -> B:
"""Return the item at the given index.
This enables `page[index]`.
Args:
index: The index to get the item from.
Returns:
The item at the given index.
"""
return self.items[index]
def __iter__(self) -> Generator[B, None, None]: # type: ignore[override]
"""Return an iterator over the items in the page.
This enables `for item in page` loops, but breaks `dict(page)`.
Yields:
An iterator over the items in the page.
"""
for item in self.items.__iter__():
yield item
def __contains__(self, item: B) -> bool:
"""Returns whether the page contains a specific item.
This enables `item in page` checks.
Args:
item: The item to check for.
Returns:
Whether the item is in the page.
"""
return item in self.items
size: int
property
readonly
Return the item count of the page.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
__params_type__ (BaseModel)
Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of resources.
Usage example for subclasses of this class:
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=20
)
Source code in zenml/models/v2/base/page.py
class BaseFilter(BaseModel):
"""Class to unify all filter, paginate and sort request parameters.
This Model allows fine-grained filtering, sorting and pagination of
resources.
Usage example for subclasses of this class:
```
ResourceListModel(
name="contains:default",
workspace="default"
count_steps="gte:5"
sort_by="created",
page=2,
size=20
)
```
"""
# List of fields that cannot be used as filters.
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
"sort_by",
"page",
"size",
"logical_operator",
]
CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = []
# List of fields that are not even mentioned as options in the CLI.
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []
# List of fields that are wrapped with `fastapi.Query(default)` in API.
API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []
sort_by: str = Field(
default="created", description="Which column to sort by."
)
logical_operator: LogicalOperators = Field(
default=LogicalOperators.AND,
description="Which logical operator to use between all filters "
"['and', 'or']",
)
page: int = Field(
default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
)
size: int = Field(
default=PAGE_SIZE_DEFAULT,
ge=1,
le=PAGE_SIZE_MAXIMUM,
description="Page size",
)
id: Optional[Union[UUID, str]] = Field(
default=None,
description="Id for this resource",
union_mode="left_to_right",
)
created: Optional[Union[datetime, str]] = Field(
default=None, description="Created", union_mode="left_to_right"
)
updated: Optional[Union[datetime, str]] = Field(
default=None, description="Updated", union_mode="left_to_right"
)
_rbac_configuration: Optional[
Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
] = None
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
value: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(value, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(value)}"
)
column = value
split_value = value.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
value = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{value}'"
)
elif column in cls.model_fields:
return value
elif column in cls.CUSTOM_SORTING_OPTIONS:
return value
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
@model_validator(mode="before")
@classmethod
@before_validator_handler
def filter_ops(cls, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse incoming filters to ensure all filters are legal.
Args:
data: The values of the class.
Returns:
The values of the class.
"""
cls._generate_filter_list(data)
return data
@property
def list_of_filters(self) -> List[Filter]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A list of Filter models.
"""
return self._generate_filter_list(
{key: getattr(self, key) for key in self.model_fields}
)
@property
def sorting_params(self) -> Tuple[str, SorterOps]:
"""Converts the class variables into a list of usable Filter Models.
Returns:
A tuple of the column to sort by and the sorting operand.
"""
column = self.sort_by
# The default sorting operand is asc
operator = SorterOps.ASCENDING
# Check if user explicitly set an operand
split_value = self.sort_by.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
operator = SorterOps(split_value[0])
return column, operator
def configure_rbac(
self,
authenticated_user_id: UUID,
**column_allowed_ids: Optional[Set[UUID]],
) -> None:
"""Configure RBAC allowed column values.
Args:
authenticated_user_id: ID of the authenticated user. All entities
owned by this user will be included.
column_allowed_ids: Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If `None`, the remaining filters will
be applied to all entries in the table.
"""
self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
def generate_rbac_filter(
self,
table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
"""Generates an optional RBAC filter.
Args:
table: The query table.
Returns:
The RBAC filter.
"""
from sqlmodel import or_
if not self._rbac_configuration:
return None
expressions = []
for column_name, allowed_ids in self._rbac_configuration[1].items():
if allowed_ids is not None:
expression = getattr(table, column_name).in_(allowed_ids)
expressions.append(expression)
if expressions and hasattr(table, "user_id"):
# If `expressions` is not empty, we do not have full access to all
# rows of the table. In this case, we also include rows which the
# user owns.
# Unowned entities are considered server-owned and can be seen
# by anyone
expressions.append(getattr(table, "user_id").is_(None))
# The authenticated user owns this entity
expressions.append(
getattr(table, "user_id") == self._rbac_configuration[0]
)
if expressions:
return or_(*expressions)
else:
return None
@classmethod
def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
"""Create a list of filters from a (column, value) dictionary.
Args:
values: A dictionary of column names and values to filter on.
Returns:
A list of filters.
"""
list_of_filters: List[Filter] = []
for key, value in values.items():
# Ignore excluded filters
if key in cls.FILTER_EXCLUDE_FIELDS:
continue
# Skip filtering for None values
if value is None:
continue
# Determine the operator and filter value
value, operator = cls._resolve_operator(value)
# Define the filter
filter = cls._define_filter(
column=key, value=value, operator=operator
)
list_of_filters.append(filter)
return list_of_filters
@staticmethod
def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
"""Determine the operator and filter value from a user-provided value.
If the user-provided value is a string of the form "operator:value",
then the operator is extracted and the value is returned. Otherwise,
`GenericFilterOps.EQUALS` is used as default operator and the value
is returned as-is.
Args:
value: The user-provided value.
Returns:
A tuple of the filter value and the operator.
"""
operator = GenericFilterOps.EQUALS # Default operator
if isinstance(value, str):
split_value = value.split(":", 1)
if (
len(split_value) == 2
and split_value[0] in GenericFilterOps.values()
):
value = split_value[1]
operator = GenericFilterOps(split_value[0])
return value, operator
@classmethod
def _define_filter(
cls, column: str, value: Any, operator: GenericFilterOps
) -> Filter:
"""Define a filter for a given column.
Args:
column: The column to filter on.
value: The value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
# Create datetime filters
if cls.is_datetime_field(column):
return cls._define_datetime_filter(
column=column,
value=value,
operator=operator,
)
# Create UUID filters
if cls.is_uuid_field(column):
return cls._define_uuid_filter(
column=column,
value=value,
operator=operator,
)
# Create int filters
if cls.is_int_field(column):
return NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=int(value),
)
# Create bool filters
if cls.is_bool_field(column):
return cls._define_bool_filter(
column=column,
value=value,
operator=operator,
)
# Create str filters
if cls.is_str_field(column):
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
# Handle unsupported datatypes
logger.warning(
f"The Datatype {cls.model_fields[column].annotation} might not be "
"supported for filtering. Defaulting to a string filter."
)
return StrFilter(
operation=GenericFilterOps(operator),
column=column,
value=str(value),
)
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
"""Checks whether a model field has a certain annotation.
Args:
k: The name of the field.
type_: The type to check.
Raises:
ValueError: if the model field within does not have an annotation.
Returns:
True if the annotation of the field matches the given type, False
otherwise.
"""
try:
annotation = cls.model_fields[k].annotation
if annotation is not None:
return (
issubclass(type_, get_args(annotation))
or annotation is type_
)
else:
raise ValueError(
f"The field '{k}' inside the model {cls.__name__} "
"does not have an annotation."
)
except TypeError:
return False
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=datetime)
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a UUID field.
Args:
k: The key to check.
Returns:
True if the field is a UUID field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=UUID)
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's an int field.
Args:
k: The key to check.
Returns:
True if the field is an int field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=int)
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=bool)
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str)
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
@staticmethod
def _define_datetime_filter(
column: str, value: Any, operator: GenericFilterOps
) -> NumericFilter:
"""Define a datetime filter for a given column.
Args:
column: The column to filter on.
value: The datetime value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid datetime.
"""
try:
if isinstance(value, datetime):
datetime_value = value
else:
datetime_value = datetime.strptime(
value, FILTERING_DATETIME_FORMAT
)
except ValueError as e:
raise ValueError(
"The datetime filter only works with values in the following "
f"format: {FILTERING_DATETIME_FORMAT}"
) from e
datetime_filter = NumericFilter(
operation=GenericFilterOps(operator),
column=column,
value=datetime_value,
)
return datetime_filter
@staticmethod
def _define_uuid_filter(
column: str, value: Any, operator: GenericFilterOps
) -> UUIDFilter:
"""Define a UUID filter for a given column.
Args:
column: The column to filter on.
value: The UUID value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
Raises:
ValueError: If the value is not a valid UUID.
"""
# For equality checks, ensure that the value is a valid UUID.
if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
try:
UUID(value)
except ValueError as e:
raise ValueError(
"Invalid value passed as UUID query parameter."
) from e
# Cast the value to string for further comparisons.
value = str(value)
# Generate the filter.
uuid_filter = UUIDFilter(
operation=GenericFilterOps(operator),
column=column,
value=value,
)
return uuid_filter
@staticmethod
def _define_bool_filter(
column: str, value: Any, operator: GenericFilterOps
) -> BoolFilter:
"""Define a bool filter for a given column.
Args:
column: The column to filter on.
value: The bool value by which to filter.
operator: The operator to use for filtering.
Returns:
A Filter object.
"""
if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
logger.warning(
"Boolean filters do not support any"
"operation except for equals. Defaulting"
"to an `equals` comparison."
)
return BoolFilter(
operation=GenericFilterOps.EQUALS,
column=column,
value=bool(value),
)
@property
def offset(self) -> int:
"""Returns the offset needed for the query on the data persistence layer.
Returns:
The offset for the query.
"""
return self.size * (self.page - 1)
def generate_filter(
self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlmodel import and_, or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
for custom_filter in self.get_custom_filters():
filters.append(custom_filter)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom filters.
This can be overridden by subclasses to define custom filters that are
not based on the columns of the underlying table.
Returns:
A list of custom filters.
"""
return []
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
rbac_filter = self.generate_rbac_filter(table=table)
if rbac_filter is not None:
query = query.where(rbac_filter)
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
def apply_sorting(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Apply sorting to the query.
Args:
query: The query to which to apply the sorting.
table: The query table.
Returns:
The query with sorting applied.
"""
column, operand = self.sorting_params
if operand == SorterOps.DESCENDING:
sort_clause = desc(getattr(table, column)) # type: ignore[var-annotated]
else:
sort_clause = asc(getattr(table, column))
# We always add the `id` column as a tiebreaker to ensure a stable,
# repeatable order of items, otherwise subsequent pages might contain
# the same items.
query = query.order_by(sort_clause, asc(table.id)) # type: ignore[arg-type]
return query
list_of_filters: List[zenml.models.v2.base.filter.Filter]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
List[zenml.models.v2.base.filter.Filter] |
A list of Filter models. |
offset: int
property
readonly
Returns the offset needed for the query on the data persistence layer.
Returns:
Type | Description |
---|---|
int |
The offset for the query. |
sorting_params: Tuple[str, zenml.enums.SorterOps]
property
readonly
Converts the class variables into a list of usable Filter Models.
Returns:
Type | Description |
---|---|
Tuple[str, zenml.enums.SorterOps] |
A tuple of the column to sort by and the sorting operand. |
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/base/page.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
rbac_filter = self.generate_rbac_filter(table=table)
if rbac_filter is not None:
query = query.where(rbac_filter)
filters = self.generate_filter(table=table)
if filters is not None:
query = query.where(filters)
return query
apply_sorting(self, query, table)
Apply sorting to the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the sorting. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with sorting applied. |
Source code in zenml/models/v2/base/page.py
def apply_sorting(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Apply sorting to the query.
Args:
query: The query to which to apply the sorting.
table: The query table.
Returns:
The query with sorting applied.
"""
column, operand = self.sorting_params
if operand == SorterOps.DESCENDING:
sort_clause = desc(getattr(table, column)) # type: ignore[var-annotated]
else:
sort_clause = asc(getattr(table, column))
# We always add the `id` column as a tiebreaker to ensure a stable,
# repeatable order of items, otherwise subsequent pages might contain
# the same items.
query = query.order_by(sort_clause, asc(table.id)) # type: ignore[arg-type]
return query
check_field_annotation(k, type_)
classmethod
Checks whether a model field has a certain annotation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The name of the field. |
required |
type_ |
Any |
The type to check. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the model field within does not have an annotation. |
Returns:
Type | Description |
---|---|
bool |
True if the annotation of the field matches the given type, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
"""Checks whether a model field has a certain annotation.
Args:
k: The name of the field.
type_: The type to check.
Raises:
ValueError: if the model field within does not have an annotation.
Returns:
True if the annotation of the field matches the given type, False
otherwise.
"""
try:
annotation = cls.model_fields[k].annotation
if annotation is not None:
return (
issubclass(type_, get_args(annotation))
or annotation is type_
)
else:
raise ValueError(
f"The field '{k}' inside the model {cls.__name__} "
"does not have an annotation."
)
except TypeError:
return False
configure_rbac(self, authenticated_user_id, **column_allowed_ids)
Configure RBAC allowed column values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
authenticated_user_id |
UUID |
ID of the authenticated user. All entities owned by this user will be included. |
required |
column_allowed_ids |
Optional[Set[uuid.UUID]] |
Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If |
{} |
Source code in zenml/models/v2/base/page.py
def configure_rbac(
self,
authenticated_user_id: UUID,
**column_allowed_ids: Optional[Set[UUID]],
) -> None:
"""Configure RBAC allowed column values.
Args:
authenticated_user_id: ID of the authenticated user. All entities
owned by this user will be included.
column_allowed_ids: Set of IDs per column to limit the query to.
If given, the remaining filters will be applied to entities
within this set only. If `None`, the remaining filters will
be applied to all entries in the table.
"""
self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(data, validation_info)
classmethod
Wrapper method to handle the raw data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls |
the class handler |
required | |
data |
Any |
the raw input data |
required |
validation_info |
ValidationInfo |
the context of the validation. |
required |
Returns:
Type | Description |
---|---|
Any |
the validated data |
Source code in zenml/models/v2/base/page.py
def before_validator(
cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
) -> Any:
"""Wrapper method to handle the raw data.
Args:
cls: the class handler
data: the raw input data
validation_info: the context of the validation.
Returns:
the validated data
"""
data = model_validator_data_handler(
raw_data=data, base_class=cls, validation_info=validation_info
)
return method(cls=cls, data=data)
generate_filter(self, table)
Generate the filter for the query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[sqlmodel.main.SQLModel] |
The Table that is being queried from. |
required |
Returns:
Type | Description |
---|---|
ColumnElement[bool] |
The filter expression for the query. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a valid logical operator is not supplied. |
Source code in zenml/models/v2/base/page.py
def generate_filter(
self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
"""Generate the filter for the query.
Args:
table: The Table that is being queried from.
Returns:
The filter expression for the query.
Raises:
RuntimeError: If a valid logical operator is not supplied.
"""
from sqlmodel import and_, or_
filters = []
for column_filter in self.list_of_filters:
filters.append(
column_filter.generate_query_conditions(table=table)
)
for custom_filter in self.get_custom_filters():
filters.append(custom_filter)
if self.logical_operator == LogicalOperators.OR:
return or_(False, *filters)
elif self.logical_operator == LogicalOperators.AND:
return and_(True, *filters)
else:
raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)
Generates an optional RBAC filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
Optional[ColumnElement[bool]] |
The RBAC filter. |
Source code in zenml/models/v2/base/page.py
def generate_rbac_filter(
self,
table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
"""Generates an optional RBAC filter.
Args:
table: The query table.
Returns:
The RBAC filter.
"""
from sqlmodel import or_
if not self._rbac_configuration:
return None
expressions = []
for column_name, allowed_ids in self._rbac_configuration[1].items():
if allowed_ids is not None:
expression = getattr(table, column_name).in_(allowed_ids)
expressions.append(expression)
if expressions and hasattr(table, "user_id"):
# If `expressions` is not empty, we do not have full access to all
# rows of the table. In this case, we also include rows which the
# user owns.
# Unowned entities are considered server-owned and can be seen
# by anyone
expressions.append(getattr(table, "user_id").is_(None))
# The authenticated user owns this entity
expressions.append(
getattr(table, "user_id") == self._rbac_configuration[0]
)
if expressions:
return or_(*expressions)
else:
return None
get_custom_filters(self)
Get custom filters.
This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.
Returns:
Type | Description |
---|---|
List[ColumnElement[bool]] |
A list of custom filters. |
Source code in zenml/models/v2/base/page.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom filters.
This can be overridden by subclasses to define custom filters that are
not based on the columns of the underlying table.
Returns:
A list of custom filters.
"""
return []
is_bool_field(k)
classmethod
Checks if it's a bool field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a bool field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
"""Checks if it's a bool field.
Args:
k: The key to check.
Returns:
True if the field is a bool field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=bool)
is_datetime_field(k)
classmethod
Checks if it's a datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a datetime field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
"""Checks if it's a datetime field.
Args:
k: The key to check.
Returns:
True if the field is a datetime field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=datetime)
is_int_field(k)
classmethod
Checks if it's an int field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is an int field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_int_field(cls, k: str) -> bool:
"""Checks if it's an int field.
Args:
k: The key to check.
Returns:
True if the field is an int field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=int)
is_sort_by_field(k)
classmethod
Checks if it's a sort by field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a sort by field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
"""Checks if it's a sort by field.
Args:
k: The key to check.
Returns:
True if the field is a sort by field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
is_str_field(k)
classmethod
Checks if it's a string field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a string field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_str_field(cls, k: str) -> bool:
"""Checks if it's a string field.
Args:
k: The key to check.
Returns:
True if the field is a string field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=str)
is_uuid_field(k)
classmethod
Checks if it's a UUID field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
k |
str |
The key to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the field is a UUID field, False otherwise. |
Source code in zenml/models/v2/base/page.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
"""Checks if it's a UUID field.
Args:
k: The key to check.
Returns:
True if the field is a UUID field, False otherwise.
"""
return cls.check_field_annotation(k=k, type_=UUID)
model_post_init(self, __context)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
__context |
Any |
The context. |
required |
Source code in zenml/models/v2/base/page.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
"""This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args:
self: The BaseModel instance.
__context: The context.
"""
if getattr(self, '__pydantic_private__', None) is None:
pydantic_private = {}
for name, private_attr in self.__private_attributes__.items():
default = private_attr.get_default()
if default is not PydanticUndefined:
pydantic_private[name] = default
object_setattr(self, '__pydantic_private__', pydantic_private)
validate_sort_by(value)
classmethod
Validate that the sort_column is a valid column with a valid operand.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The sort_by field value. |
required |
Returns:
Type | Description |
---|---|
Any |
The validated sort_by field value. |
Exceptions:
Type | Description |
---|---|
ValidationError |
If the sort_by field is not a string. |
ValueError |
If the resource can't be sorted by this field. |
Source code in zenml/models/v2/base/page.py
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
"""Validate that the sort_column is a valid column with a valid operand.
Args:
value: The sort_by field value.
Returns:
The validated sort_by field value.
Raises:
ValidationError: If the sort_by field is not a string.
ValueError: If the resource can't be sorted by this field.
"""
# Somehow pydantic allows you to pass in int values, which will be
# interpreted as string, however within the validator they are still
# integers, which don't have a .split() method
if not isinstance(value, str):
raise ValidationError(
f"str type expected for the sort_by field. "
f"Received a {type(value)}"
)
column = value
split_value = value.split(":", 1)
if len(split_value) == 2:
column = split_value[1]
if split_value[0] not in SorterOps.values():
logger.warning(
"Invalid operand used for column sorting. "
"Only the following operands are supported `%s`. "
"Defaulting to 'asc' on column `%s`.",
SorterOps.values(),
column,
)
value = column
if column in cls.FILTER_EXCLUDE_FIELDS:
raise ValueError(
f"This resource can not be sorted by this field: '{value}'"
)
elif column in cls.model_fields:
return value
elif column in cls.CUSTOM_SORTING_OPTIONS:
return value
else:
raise ValueError(
"You can only sort by valid fields of this resource"
)
__contains__(self, item)
special
Returns whether the page contains a specific item.
This enables item in page
checks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item |
~B |
The item to check for. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the item is in the page. |
Source code in zenml/models/v2/base/page.py
def __contains__(self, item: B) -> bool:
"""Returns whether the page contains a specific item.
This enables `item in page` checks.
Args:
item: The item to check for.
Returns:
Whether the item is in the page.
"""
return item in self.items
__getitem__(self, index)
special
Return the item at the given index.
This enables page[index]
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index |
int |
The index to get the item from. |
required |
Returns:
Type | Description |
---|---|
~B |
The item at the given index. |
Source code in zenml/models/v2/base/page.py
def __getitem__(self, index: int) -> B:
"""Return the item at the given index.
This enables `page[index]`.
Args:
index: The index to get the item from.
Returns:
The item at the given index.
"""
return self.items[index]
__iter__(self)
special
Return an iterator over the items in the page.
This enables for item in page
loops, but breaks dict(page)
.
Yields:
Type | Description |
---|---|
Generator[~B, NoneType, NoneType] |
An iterator over the items in the page. |
Source code in zenml/models/v2/base/page.py
def __iter__(self) -> Generator[B, None, None]: # type: ignore[override]
"""Return an iterator over the items in the page.
This enables `for item in page` loops, but breaks `dict(page)`.
Yields:
An iterator over the items in the page.
"""
for item in self.items.__iter__():
yield item
__len__(self)
special
Return the item count of the page.
This enables len(page)
.
Returns:
Type | Description |
---|---|
int |
The amount of items in the page. |
Source code in zenml/models/v2/base/page.py
def __len__(self) -> int:
"""Return the item count of the page.
This enables `len(page)`.
Returns:
The amount of items in the page.
"""
return len(self.items)
scoped
Scoped model definitions.
UserScopedFilter (BaseFilter)
Model to enable advanced user-based scoping.
Source code in zenml/models/v2/base/scoped.py
class UserScopedFilter(BaseFilter):
"""Model to enable advanced user-based scoping."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.FILTER_EXCLUDE_FIELDS,
"scope_user",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.CLI_EXCLUDE_FIELDS,
"scope_user",
]
scope_user: Optional[UUID] = Field(
default=None,
description="The user to scope this query to.",
)
def set_scope_user(self, user_id: UUID) -> None:
"""Set the user that is performing the filtering to scope the response.
Args:
user_id: The user ID to scope the response to.
"""
self.scope_user = user_id
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
query = super().apply_filter(query=query, table=table)
if self.scope_user:
query = query.where(getattr(table, "user_id") == self.scope_user)
return query
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/base/scoped.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
query = super().apply_filter(query=query, table=table)
if self.scope_user:
query = query.where(getattr(table, "user_id") == self.scope_user)
return query
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
set_scope_user(self, user_id)
Set the user that is performing the filtering to scope the response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID to scope the response to. |
required |
Source code in zenml/models/v2/base/scoped.py
def set_scope_user(self, user_id: UUID) -> None:
"""Set the user that is performing the filtering to scope the response.
Args:
user_id: The user ID to scope the response to.
"""
self.scope_user = user_id
UserScopedRequest (BaseRequest)
Base user-owned request model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/v2/base/scoped.py
class UserScopedRequest(BaseRequest):
"""Base user-owned request model.
Used as a base class for all domain models that are "owned" by a user.
"""
user: UUID = Field(title="The id of the user that created this resource.")
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["user_id"] = self.user
return metadata
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["user_id"] = self.user
return metadata
UserScopedResponse (BaseIdentifiedResponse[~UserBody, ~UserMetadata, ~UserResources], Generic)
Base user-owned model.
Used as a base class for all domain models that are "owned" by a user.
Source code in zenml/models/v2/base/scoped.py
class UserScopedResponse(
BaseIdentifiedResponse[UserBody, UserMetadata, UserResources],
Generic[UserBody, UserMetadata, UserResources],
):
"""Base user-owned model.
Used as a base class for all domain models that are "owned" by a user.
"""
# Analytics
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
if self.user is not None:
metadata["user_id"] = self.user.id
return metadata
# Body and metadata properties
@property
def user(self) -> Optional["UserResponse"]:
"""The `user` property.
Returns:
the value of the property.
"""
return self.get_body().user
user: Optional[UserResponse]
property
readonly
The user
property.
Returns:
Type | Description |
---|---|
Optional[UserResponse] |
the value of the property. |
get_analytics_metadata(self)
Fetches the analytics metadata for user scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for user scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
if self.user is not None:
metadata["user_id"] = self.user.id
return metadata
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
UserScopedResponseBody (BaseDatedResponseBody)
Base user-owned body.
Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseBody(BaseDatedResponseBody):
"""Base user-owned body."""
user: Optional["UserResponse"] = Field(
title="The user who created this resource.", default=None
)
UserScopedResponseMetadata (BaseResponseMetadata)
Base user-owned metadata.
Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseMetadata(BaseResponseMetadata):
"""Base user-owned metadata."""
UserScopedResponseResources (BaseResponseResources)
Base class for all resource models associated with the user.
Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseResources(BaseResponseResources):
"""Base class for all resource models associated with the user."""
UserScopedResponse[FlavorResponseBody, FlavorResponseMetadata, FlavorResponseResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
UserScopedResponse[OAuthDeviceResponseBody, OAuthDeviceResponseMetadata, OAuthDeviceResponseResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
UserScopedResponse[~WorkspaceBody, ~WorkspaceMetadata, ~WorkspaceResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedFilter (BaseFilter)
Model to enable advanced scoping with workspace.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedFilter(BaseFilter):
"""Model to enable advanced scoping with workspace."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.FILTER_EXCLUDE_FIELDS,
"scope_workspace",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.CLI_EXCLUDE_FIELDS,
"scope_workspace",
]
scope_workspace: Optional[UUID] = Field(
default=None,
description="The workspace to scope this query to.",
)
def set_scope_workspace(self, workspace_id: UUID) -> None:
"""Set the workspace to scope this response.
Args:
workspace_id: The workspace to scope this response to.
"""
self.scope_workspace = workspace_id
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_workspace:
scope_filter = or_(
getattr(table, "workspace_id") == self.scope_workspace,
getattr(table, "workspace_id").is_(None),
)
query = query.where(scope_filter)
return query
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/base/scoped.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from sqlmodel import or_
query = super().apply_filter(query=query, table=table)
if self.scope_workspace:
scope_filter = or_(
getattr(table, "workspace_id") == self.scope_workspace,
getattr(table, "workspace_id").is_(None),
)
query = query.where(scope_filter)
return query
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
set_scope_workspace(self, workspace_id)
Set the workspace to scope this response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace_id |
UUID |
The workspace to scope this response to. |
required |
Source code in zenml/models/v2/base/scoped.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
"""Set the workspace to scope this response.
Args:
workspace_id: The workspace to scope this response to.
"""
self.scope_workspace = workspace_id
WorkspaceScopedRequest (UserScopedRequest)
Base workspace-scoped request domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedRequest(UserScopedRequest):
"""Base workspace-scoped request domain model.
Used as a base class for all domain models that are workspace-scoped.
"""
workspace: UUID = Field(
title="The workspace to which this resource belongs."
)
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace
return metadata
get_analytics_metadata(self)
Fetches the analytics metadata for workspace scoped models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for workspace scoped models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["workspace_id"] = self.workspace
return metadata
WorkspaceScopedResponse (UserScopedResponse[~WorkspaceBody, ~WorkspaceMetadata, ~WorkspaceResources], Generic)
Base workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponse(
UserScopedResponse[WorkspaceBody, WorkspaceMetadata, WorkspaceResources],
Generic[WorkspaceBody, WorkspaceMetadata, WorkspaceResources],
):
"""Base workspace-scoped domain model.
Used as a base class for all domain models that are workspace-scoped.
"""
# Body and metadata properties
@property
def workspace(self) -> "WorkspaceResponse":
"""The workspace property.
Returns:
the value of the property.
"""
return self.get_metadata().workspace
workspace: WorkspaceResponse
property
readonly
The workspace property.
Returns:
Type | Description |
---|---|
WorkspaceResponse |
the value of the property. |
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponseBody (UserScopedResponseBody)
Base workspace-scoped body.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseBody(UserScopedResponseBody):
"""Base workspace-scoped body."""
WorkspaceScopedResponseMetadata (UserScopedResponseMetadata)
Base workspace-scoped metadata.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseMetadata(UserScopedResponseMetadata):
"""Base workspace-scoped metadata."""
workspace: "WorkspaceResponse" = Field(
title="The workspace of this resource."
)
WorkspaceScopedResponseResources (UserScopedResponseResources)
Base workspace-scoped resources.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseResources(UserScopedResponseResources):
"""Base workspace-scoped resources."""
WorkspaceScopedResponse[ActionResponseBody, ActionResponseMetadata, ActionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata, ArtifactVersionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[CodeRepositoryResponseBody, CodeRepositoryResponseMetadata, CodeRepositoryResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ComponentResponseBody, ComponentResponseMetadata, ComponentResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[EventSourceResponseBody, EventSourceResponseMetadata, EventSourceResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ModelResponseBody, ModelResponseMetadata, ModelResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ModelVersionResponseBody, ModelVersionResponseMetadata, ModelVersionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineBuildResponseBody, PipelineBuildResponseMetadata, PipelineBuildResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineDeploymentResponseBody, PipelineDeploymentResponseMetadata, PipelineDeploymentResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineResponseBody, PipelineResponseMetadata, PipelineResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineRunResponseBody, PipelineRunResponseMetadata, PipelineRunResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[RunMetadataResponseBody, RunMetadataResponseMetadata, RunMetadataResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[RunTemplateResponseBody, RunTemplateResponseMetadata, RunTemplateResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ScheduleResponseBody, ScheduleResponseMetadata, ScheduleResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[SecretResponseBody, SecretResponseMetadata, SecretResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ServiceConnectorResponseBody, ServiceConnectorResponseMetadata, ServiceConnectorResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[ServiceResponseBody, ServiceResponseMetadata, ServiceResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[StackResponseBody, StackResponseMetadata, StackResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[StepRunResponseBody, StepRunResponseMetadata, StepRunResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedResponse[TriggerResponseBody, TriggerResponseMetadata, TriggerResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
WorkspaceScopedTaggableFilter (WorkspaceScopedFilter)
Model to enable advanced scoping with workspace and tagging.
Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedTaggableFilter(WorkspaceScopedFilter):
"""Model to enable advanced scoping with workspace and tagging."""
tag: Optional[str] = Field(
description="Tag to apply to the filter query.", default=None
)
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedFilter.FILTER_EXCLUDE_FIELDS,
"tag",
]
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from zenml.zen_stores.schemas import TagResourceSchema
query = super().apply_filter(query=query, table=table)
if self.tag:
query = (
query.join(getattr(table, "tags"))
.join(TagResourceSchema.tag)
.distinct()
)
return query
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom tag filters.
Returns:
A list of custom filters.
"""
from zenml.zen_stores.schemas import TagSchema
custom_filters = super().get_custom_filters()
if self.tag:
custom_filters.append(col(TagSchema.name) == self.tag)
return custom_filters
apply_filter(self, query, table)
Applies the filter to a query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/base/scoped.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Applies the filter to a query.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
from zenml.zen_stores.schemas import TagResourceSchema
query = super().apply_filter(query=query, table=table)
if self.tag:
query = (
query.join(getattr(table, "tags"))
.join(TagResourceSchema.tag)
.distinct()
)
return query
get_custom_filters(self)
Get custom tag filters.
Returns:
Type | Description |
---|---|
List[ColumnElement[bool]] |
A list of custom filters. |
Source code in zenml/models/v2/base/scoped.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
"""Get custom tag filters.
Returns:
A list of custom filters.
"""
from zenml.zen_stores.schemas import TagSchema
custom_filters = super().get_custom_filters()
if self.tag:
custom_filters.append(col(TagSchema.name) == self.tag)
return custom_filters
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
core
special
action
Collection of all models concerning actions.
ActionFilter (WorkspaceScopedFilter)
Model to enable advanced filtering of all actions.
Source code in zenml/models/v2/core/action.py
class ActionFilter(WorkspaceScopedFilter):
"""Model to enable advanced filtering of all actions."""
name: Optional[str] = Field(
default=None,
description="Name of the action.",
)
flavor: Optional[str] = Field(
default=None,
title="The flavor of the action.",
)
plugin_subtype: Optional[str] = Field(
default=None,
title="The subtype of the action.",
)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/action.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
ActionRequest (WorkspaceScopedRequest)
Model for creating a new action.
Source code in zenml/models/v2/core/action.py
class ActionRequest(WorkspaceScopedRequest):
"""Model for creating a new action."""
name: str = Field(
title="The name of the action.", max_length=STR_FIELD_MAX_LENGTH
)
description: str = Field(
default="",
title="The description of the action",
max_length=STR_FIELD_MAX_LENGTH,
)
flavor: str = Field(
title="The flavor of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
plugin_subtype: PluginSubType = Field(
title="The subtype of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
configuration: Dict[str, Any] = Field(
title="The configuration for the action.",
)
service_account_id: UUID = Field(
title="The service account that is used to execute the action.",
)
auth_window: Optional[int] = Field(
default=None,
title="The time window in minutes for which the service account is "
"authorized to execute the action. Set this to 0 to authorize the "
"service account indefinitely (not recommended). If not set, a "
"default value defined for each individual action type is used.",
)
ActionResponse (WorkspaceScopedResponse[ActionResponseBody, ActionResponseMetadata, ActionResponseResources])
Response model for actions.
Source code in zenml/models/v2/core/action.py
class ActionResponse(
WorkspaceScopedResponse[
ActionResponseBody, ActionResponseMetadata, ActionResponseResources
]
):
"""Response model for actions."""
name: str = Field(
title="The name of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
def get_hydrated_version(self) -> "ActionResponse":
"""Get the hydrated version of this action.
Returns:
An instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_action(self.id)
# Body and metadata properties
@property
def flavor(self) -> str:
"""The `flavor` property.
Returns:
the value of the property.
"""
return self.get_body().flavor
@property
def plugin_subtype(self) -> PluginSubType:
"""The `plugin_subtype` property.
Returns:
the value of the property.
"""
return self.get_body().plugin_subtype
@property
def description(self) -> str:
"""The `description` property.
Returns:
the value of the property.
"""
return self.get_metadata().description
@property
def auth_window(self) -> int:
"""The `auth_window` property.
Returns:
the value of the property.
"""
return self.get_metadata().auth_window
@property
def configuration(self) -> Dict[str, Any]:
"""The `configuration` property.
Returns:
the value of the property.
"""
return self.get_metadata().configuration
def set_configuration(self, configuration: Dict[str, Any]) -> None:
"""Set the `configuration` property.
Args:
configuration: The value to set.
"""
self.get_metadata().configuration = configuration
# Resource properties
@property
def service_account(self) -> "UserResponse":
"""The `service_account` property.
Returns:
the value of the property.
"""
return self.get_resources().service_account
auth_window: int
property
readonly
The auth_window
property.
Returns:
Type | Description |
---|---|
int |
the value of the property. |
configuration: Dict[str, Any]
property
readonly
The configuration
property.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
the value of the property. |
description: str
property
readonly
The description
property.
Returns:
Type | Description |
---|---|
str |
the value of the property. |
flavor: str
property
readonly
The flavor
property.
Returns:
Type | Description |
---|---|
str |
the value of the property. |
plugin_subtype: PluginSubType
property
readonly
The plugin_subtype
property.
Returns:
Type | Description |
---|---|
PluginSubType |
the value of the property. |
service_account: UserResponse
property
readonly
The service_account
property.
Returns:
Type | Description |
---|---|
UserResponse |
the value of the property. |
get_hydrated_version(self)
Get the hydrated version of this action.
Returns:
Type | Description |
---|---|
ActionResponse |
An instance of the same entity with the metadata field attached. |
Source code in zenml/models/v2/core/action.py
def get_hydrated_version(self) -> "ActionResponse":
"""Get the hydrated version of this action.
Returns:
An instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_action(self.id)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/action.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
set_configuration(self, configuration)
Set the configuration
property.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration |
Dict[str, Any] |
The value to set. |
required |
Source code in zenml/models/v2/core/action.py
def set_configuration(self, configuration: Dict[str, Any]) -> None:
"""Set the `configuration` property.
Args:
configuration: The value to set.
"""
self.get_metadata().configuration = configuration
ActionResponseBody (WorkspaceScopedResponseBody)
Response body for actions.
Source code in zenml/models/v2/core/action.py
class ActionResponseBody(WorkspaceScopedResponseBody):
"""Response body for actions."""
flavor: str = Field(
title="The flavor of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
plugin_subtype: PluginSubType = Field(
title="The subtype of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
ActionResponseMetadata (WorkspaceScopedResponseMetadata)
Response metadata for actions.
Source code in zenml/models/v2/core/action.py
class ActionResponseMetadata(WorkspaceScopedResponseMetadata):
"""Response metadata for actions."""
description: str = Field(
default="",
title="The description of the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
configuration: Dict[str, Any] = Field(
title="The configuration for the action.",
)
auth_window: int = Field(
title="The time window in minutes for which the service account is "
"authorized to execute the action."
)
ActionResponseResources (WorkspaceScopedResponseResources)
Class for all resource models associated with the action entity.
Source code in zenml/models/v2/core/action.py
class ActionResponseResources(WorkspaceScopedResponseResources):
"""Class for all resource models associated with the action entity."""
service_account: UserResponse = Field(
title="The service account that is used to execute the action.",
)
ActionUpdate (BaseUpdate)
Update model for actions.
Source code in zenml/models/v2/core/action.py
class ActionUpdate(BaseUpdate):
"""Update model for actions."""
name: Optional[str] = Field(
default=None,
title="The new name for the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
description: Optional[str] = Field(
default=None,
title="The new description for the action.",
max_length=STR_FIELD_MAX_LENGTH,
)
configuration: Optional[Dict[str, Any]] = Field(
default=None,
title="The configuration for the action.",
)
service_account_id: Optional[UUID] = Field(
default=None,
title="The service account that is used to execute the action.",
)
auth_window: Optional[int] = Field(
default=None,
title="The time window in minutes for which the service account is "
"authorized to execute the action. Set this to 0 to authorize the "
"service account indefinitely (not recommended). If not set, a "
"default value defined for each individual action type is used.",
)
@classmethod
def from_response(cls, response: "ActionResponse") -> "ActionUpdate":
"""Create an update model from a response model.
Args:
response: The response model to create the update model from.
Returns:
The update model.
"""
return ActionUpdate(
configuration=copy.deepcopy(response.configuration),
)
from_response(response)
classmethod
Create an update model from a response model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
response |
ActionResponse |
The response model to create the update model from. |
required |
Returns:
Type | Description |
---|---|
ActionUpdate |
The update model. |
Source code in zenml/models/v2/core/action.py
@classmethod
def from_response(cls, response: "ActionResponse") -> "ActionUpdate":
"""Create an update model from a response model.
Args:
response: The response model to create the update model from.
Returns:
The update model.
"""
return ActionUpdate(
configuration=copy.deepcopy(response.configuration),
)
action_flavor
Action flavor model definitions.
ActionFlavorResponse (BasePluginFlavorResponse[ActionFlavorResponseBody, ActionFlavorResponseMetadata, ActionFlavorResponseResources])
Response model for Action Flavors.
Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponse(
BasePluginFlavorResponse[
ActionFlavorResponseBody,
ActionFlavorResponseMetadata,
ActionFlavorResponseResources,
]
):
"""Response model for Action Flavors."""
# Body and metadata properties
@property
def config_schema(self) -> Dict[str, Any]:
"""The `source_config_schema` property.
Returns:
the value of the property.
"""
return self.get_metadata().config_schema
config_schema: Dict[str, Any]
property
readonly
The source_config_schema
property.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
the value of the property. |
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/action_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
ActionFlavorResponseBody (BasePluginResponseBody)
Response body for action flavors.
Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseBody(BasePluginResponseBody):
"""Response body for action flavors."""
ActionFlavorResponseMetadata (BasePluginResponseMetadata)
Response metadata for action flavors.
Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseMetadata(BasePluginResponseMetadata):
"""Response metadata for action flavors."""
config_schema: Dict[str, Any]
ActionFlavorResponseResources (BasePluginResponseResources)
Response resources for action flavors.
Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseResources(BasePluginResponseResources):
"""Response resources for action flavors."""
api_key
Models representing API keys.
APIKey (BaseModel)
Encoded model for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKey(BaseModel):
"""Encoded model for API keys."""
id: UUID
key: str
@classmethod
def decode_api_key(cls, encoded_key: str) -> "APIKey":
"""Decodes an API key from a base64 string.
Args:
encoded_key: The encoded API key.
Returns:
The decoded API key.
Raises:
ValueError: If the key is not valid.
"""
if encoded_key.startswith(ZENML_API_KEY_PREFIX):
encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
try:
json_key = b64_decode(encoded_key)
return cls.model_validate_json(json_key)
except Exception:
raise ValueError("Invalid API key.")
def encode(self) -> str:
"""Encodes the API key in a base64 string that includes the key ID and prefix.
Returns:
The encoded API key.
"""
encoded_key = b64_encode(self.model_dump_json())
return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
decode_api_key(encoded_key)
classmethod
Decodes an API key from a base64 string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
encoded_key |
str |
The encoded API key. |
required |
Returns:
Type | Description |
---|---|
APIKey |
The decoded API key. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the key is not valid. |
Source code in zenml/models/v2/core/api_key.py
@classmethod
def decode_api_key(cls, encoded_key: str) -> "APIKey":
"""Decodes an API key from a base64 string.
Args:
encoded_key: The encoded API key.
Returns:
The decoded API key.
Raises:
ValueError: If the key is not valid.
"""
if encoded_key.startswith(ZENML_API_KEY_PREFIX):
encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
try:
json_key = b64_decode(encoded_key)
return cls.model_validate_json(json_key)
except Exception:
raise ValueError("Invalid API key.")
encode(self)
Encodes the API key in a base64 string that includes the key ID and prefix.
Returns:
Type | Description |
---|---|
str |
The encoded API key. |
Source code in zenml/models/v2/core/api_key.py
def encode(self) -> str:
"""Encodes the API key in a base64 string that includes the key ID and prefix.
Returns:
The encoded API key.
"""
encoded_key = b64_encode(self.model_dump_json())
return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
APIKeyFilter (BaseFilter)
Filter model for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyFilter(BaseFilter):
"""Filter model for API keys."""
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.FILTER_EXCLUDE_FIELDS,
"service_account",
]
CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*BaseFilter.CLI_EXCLUDE_FIELDS,
"service_account",
]
service_account: Optional[UUID] = Field(
default=None,
description="The service account to scope this query to.",
)
name: Optional[str] = Field(
default=None,
description="Name of the API key",
)
description: Optional[str] = Field(
default=None,
title="Filter by the API key description.",
)
active: Optional[Union[bool, str]] = Field(
default=None,
title="Whether the API key is active.",
union_mode="left_to_right",
)
last_login: Optional[Union[datetime, str]] = Field(
default=None,
title="Time when the API key was last used to log in.",
union_mode="left_to_right",
)
last_rotated: Optional[Union[datetime, str]] = Field(
default=None,
title="Time when the API key was last rotated.",
union_mode="left_to_right",
)
def set_service_account(self, service_account_id: UUID) -> None:
"""Set the service account by which to scope this query.
Args:
service_account_id: The service account ID.
"""
self.service_account = service_account_id
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Override to apply the service account scope as an additional filter.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
query = super().apply_filter(query=query, table=table)
if self.service_account:
scope_filter = (
getattr(table, "service_account_id") == self.service_account
)
query = query.where(scope_filter)
return query
apply_filter(self, query, table)
Override to apply the service account scope as an additional filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
~AnyQuery |
The query to which to apply the filter. |
required |
table |
Type[AnySchema] |
The query table. |
required |
Returns:
Type | Description |
---|---|
~AnyQuery |
The query with filter applied. |
Source code in zenml/models/v2/core/api_key.py
def apply_filter(
self,
query: AnyQuery,
table: Type["AnySchema"],
) -> AnyQuery:
"""Override to apply the service account scope as an additional filter.
Args:
query: The query to which to apply the filter.
table: The query table.
Returns:
The query with filter applied.
"""
query = super().apply_filter(query=query, table=table)
if self.service_account:
scope_filter = (
getattr(table, "service_account_id") == self.service_account
)
query = query.where(scope_filter)
return query
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
set_service_account(self, service_account_id)
Set the service account by which to scope this query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_account_id |
UUID |
The service account ID. |
required |
Source code in zenml/models/v2/core/api_key.py
def set_service_account(self, service_account_id: UUID) -> None:
"""Set the service account by which to scope this query.
Args:
service_account_id: The service account ID.
"""
self.service_account = service_account_id
APIKeyInternalResponse (APIKeyResponse)
Response model for API keys used internally.
Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalResponse(APIKeyResponse):
"""Response model for API keys used internally."""
previous_key: Optional[str] = Field(
default=None,
title="The previous API key. Only set if the key was rotated.",
)
def verify_key(
self,
key: str,
) -> bool:
"""Verifies a given key against the stored (hashed) key(s).
Args:
key: Input key to be verified.
Returns:
True if the keys match.
"""
# even when the hashed key is not set, we still want to execute
# the hash verification to protect against response discrepancy
# attacks (https://cwe.mitre.org/data/definitions/204.html)
key_hash: Optional[str] = None
context = CryptContext(schemes=["bcrypt"], deprecated="auto")
if self.key is not None and self.active:
key_hash = self.key
result = context.verify(key, key_hash)
# same for the previous key, if set and if it's still valid
key_hash = None
if (
self.previous_key is not None
and self.last_rotated is not None
and self.active
and self.retain_period_minutes > 0
):
# check if the previous key is still valid
if datetime.utcnow() - self.last_rotated < timedelta(
minutes=self.retain_period_minutes
):
key_hash = self.previous_key
previous_result = context.verify(key, key_hash)
return result or previous_result
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
verify_key(self, key)
Verifies a given key against the stored (hashed) key(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Input key to be verified. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the keys match. |
Source code in zenml/models/v2/core/api_key.py
def verify_key(
self,
key: str,
) -> bool:
"""Verifies a given key against the stored (hashed) key(s).
Args:
key: Input key to be verified.
Returns:
True if the keys match.
"""
# even when the hashed key is not set, we still want to execute
# the hash verification to protect against response discrepancy
# attacks (https://cwe.mitre.org/data/definitions/204.html)
key_hash: Optional[str] = None
context = CryptContext(schemes=["bcrypt"], deprecated="auto")
if self.key is not None and self.active:
key_hash = self.key
result = context.verify(key, key_hash)
# same for the previous key, if set and if it's still valid
key_hash = None
if (
self.previous_key is not None
and self.last_rotated is not None
and self.active
and self.retain_period_minutes > 0
):
# check if the previous key is still valid
if datetime.utcnow() - self.last_rotated < timedelta(
minutes=self.retain_period_minutes
):
key_hash = self.previous_key
previous_result = context.verify(key, key_hash)
return result or previous_result
APIKeyInternalUpdate (APIKeyUpdate)
Update model for API keys used internally.
Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalUpdate(APIKeyUpdate):
"""Update model for API keys used internally."""
update_last_login: bool = Field(
default=False,
title="Whether to update the last login timestamp.",
)
APIKeyRequest (BaseRequest)
Request model for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyRequest(BaseRequest):
"""Request model for API keys."""
name: str = Field(
title="The name of the API Key.",
max_length=STR_FIELD_MAX_LENGTH,
)
description: Optional[str] = Field(
default=None,
title="The description of the API Key.",
max_length=TEXT_FIELD_MAX_LENGTH,
)
APIKeyResponse (BaseIdentifiedResponse[APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources])
Response model for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyResponse(
BaseIdentifiedResponse[
APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources
]
):
"""Response model for API keys."""
name: str = Field(
title="The name of the API Key.",
max_length=STR_FIELD_MAX_LENGTH,
)
_warn_on_response_updates = False
def get_hydrated_version(self) -> "APIKeyResponse":
"""Get the hydrated version of this API key.
Returns:
an instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_api_key(
service_account_id=self.service_account.id,
api_key_name_or_id=self.id,
)
# Helper functions
def set_key(self, key: str) -> None:
"""Sets the API key and encodes it.
Args:
key: The API key value to be set.
"""
self.get_body().key = APIKey(id=self.id, key=key).encode()
# Body and metadata properties
@property
def key(self) -> Optional[str]:
"""The `key` property.
Returns:
the value of the property.
"""
return self.get_body().key
@property
def active(self) -> bool:
"""The `active` property.
Returns:
the value of the property.
"""
return self.get_body().active
@property
def service_account(self) -> "ServiceAccountResponse":
"""The `service_account` property.
Returns:
the value of the property.
"""
return self.get_body().service_account
@property
def description(self) -> str:
"""The `description` property.
Returns:
the value of the property.
"""
return self.get_metadata().description
@property
def retain_period_minutes(self) -> int:
"""The `retain_period_minutes` property.
Returns:
the value of the property.
"""
return self.get_metadata().retain_period_minutes
@property
def last_login(self) -> Optional[datetime]:
"""The `last_login` property.
Returns:
the value of the property.
"""
return self.get_metadata().last_login
@property
def last_rotated(self) -> Optional[datetime]:
"""The `last_rotated` property.
Returns:
the value of the property.
"""
return self.get_metadata().last_rotated
active: bool
property
readonly
The active
property.
Returns:
Type | Description |
---|---|
bool |
the value of the property. |
description: str
property
readonly
The description
property.
Returns:
Type | Description |
---|---|
str |
the value of the property. |
key: Optional[str]
property
readonly
The key
property.
Returns:
Type | Description |
---|---|
Optional[str] |
the value of the property. |
last_login: Optional[datetime.datetime]
property
readonly
The last_login
property.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
the value of the property. |
last_rotated: Optional[datetime.datetime]
property
readonly
The last_rotated
property.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
the value of the property. |
retain_period_minutes: int
property
readonly
The retain_period_minutes
property.
Returns:
Type | Description |
---|---|
int |
the value of the property. |
service_account: ServiceAccountResponse
property
readonly
The service_account
property.
Returns:
Type | Description |
---|---|
ServiceAccountResponse |
the value of the property. |
get_hydrated_version(self)
Get the hydrated version of this API key.
Returns:
Type | Description |
---|---|
APIKeyResponse |
an instance of the same entity with the metadata field attached. |
Source code in zenml/models/v2/core/api_key.py
def get_hydrated_version(self) -> "APIKeyResponse":
"""Get the hydrated version of this API key.
Returns:
an instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_api_key(
service_account_id=self.service_account.id,
api_key_name_or_id=self.id,
)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
set_key(self, key)
Sets the API key and encodes it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The API key value to be set. |
required |
Source code in zenml/models/v2/core/api_key.py
def set_key(self, key: str) -> None:
"""Sets the API key and encodes it.
Args:
key: The API key value to be set.
"""
self.get_body().key = APIKey(id=self.id, key=key).encode()
APIKeyResponseBody (BaseDatedResponseBody)
Response body for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseBody(BaseDatedResponseBody):
"""Response body for API keys."""
key: Optional[str] = Field(
default=None,
title="The API key. Only set immediately after creation or rotation.",
)
active: bool = Field(
default=True,
title="Whether the API key is active.",
)
service_account: "ServiceAccountResponse" = Field(
title="The service account associated with this API key."
)
APIKeyResponseMetadata (BaseResponseMetadata)
Response metadata for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseMetadata(BaseResponseMetadata):
"""Response metadata for API keys."""
description: str = Field(
default="",
title="The description of the API Key.",
max_length=TEXT_FIELD_MAX_LENGTH,
)
retain_period_minutes: int = Field(
title="Number of minutes for which the previous key is still valid "
"after it has been rotated.",
)
last_login: Optional[datetime] = Field(
default=None, title="Time when the API key was last used to log in."
)
last_rotated: Optional[datetime] = Field(
default=None, title="Time when the API key was last rotated."
)
APIKeyResponseResources (BaseResponseResources)
Class for all resource models associated with the APIKey entity.
Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseResources(BaseResponseResources):
"""Class for all resource models associated with the APIKey entity."""
APIKeyRotateRequest (BaseModel)
Request model for API key rotation.
Source code in zenml/models/v2/core/api_key.py
class APIKeyRotateRequest(BaseModel):
"""Request model for API key rotation."""
retain_period_minutes: int = Field(
default=0,
title="Number of minutes for which the previous key is still valid "
"after it has been rotated.",
)
APIKeyUpdate (BaseUpdate)
Update model for API keys.
Source code in zenml/models/v2/core/api_key.py
class APIKeyUpdate(BaseUpdate):
"""Update model for API keys."""
name: Optional[str] = Field(
title="The name of the API Key.",
max_length=STR_FIELD_MAX_LENGTH,
default=None,
)
description: Optional[str] = Field(
title="The description of the API Key.",
max_length=TEXT_FIELD_MAX_LENGTH,
default=None,
)
active: Optional[bool] = Field(
title="Whether the API key is active.",
default=None,
)
artifact
Models representing artifacts.
ArtifactFilter (WorkspaceScopedTaggableFilter)
Model to enable advanced filtering of artifacts.
Source code in zenml/models/v2/core/artifact.py
class ArtifactFilter(WorkspaceScopedTaggableFilter):
"""Model to enable advanced filtering of artifacts."""
name: Optional[str] = None
has_custom_name: Optional[bool] = None
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/artifact.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
ArtifactRequest (BaseRequest)
Artifact request model.
Source code in zenml/models/v2/core/artifact.py
class ArtifactRequest(BaseRequest):
"""Artifact request model."""
name: str = Field(
title="Name of the artifact.",
max_length=STR_FIELD_MAX_LENGTH,
)
has_custom_name: bool = Field(
title="Whether the name is custom (True) or auto-generated (False).",
default=False,
)
tags: Optional[List[str]] = Field(
title="Artifact tags.",
description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
default=None,
)
ArtifactResponse (BaseIdentifiedResponse[ArtifactResponseBody, ArtifactResponseMetadata, ArtifactResponseResources])
Artifact response model.
Source code in zenml/models/v2/core/artifact.py
class ArtifactResponse(
BaseIdentifiedResponse[
ArtifactResponseBody,
ArtifactResponseMetadata,
ArtifactResponseResources,
]
):
"""Artifact response model."""
def get_hydrated_version(self) -> "ArtifactResponse":
"""Get the hydrated version of this artifact.
Returns:
an instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_artifact(self.id)
name: str = Field(
title="Name of the output in the parent step.",
max_length=STR_FIELD_MAX_LENGTH,
)
# Body and metadata properties
@property
def tags(self) -> List[TagResponse]:
"""The `tags` property.
Returns:
the value of the property.
"""
return self.get_body().tags
@property
def latest_version_name(self) -> Optional[str]:
"""The `latest_version_name` property.
Returns:
the value of the property.
"""
return self.get_body().latest_version_name
@property
def latest_version_id(self) -> Optional[UUID]:
"""The `latest_version_id` property.
Returns:
the value of the property.
"""
return self.get_body().latest_version_id
@property
def has_custom_name(self) -> bool:
"""The `has_custom_name` property.
Returns:
the value of the property.
"""
return self.get_metadata().has_custom_name
# Helper methods
@property
def versions(self) -> Dict[str, "ArtifactVersionResponse"]:
"""Get a list of all versions of this artifact.
Returns:
A list of all versions of this artifact.
"""
from zenml.client import Client
responses = Client().list_artifact_versions(name=self.name)
return {str(response.version): response for response in responses}
has_custom_name: bool
property
readonly
The has_custom_name
property.
Returns:
Type | Description |
---|---|
bool |
the value of the property. |
latest_version_id: Optional[uuid.UUID]
property
readonly
The latest_version_id
property.
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] |
the value of the property. |
latest_version_name: Optional[str]
property
readonly
The latest_version_name
property.
Returns:
Type | Description |
---|---|
Optional[str] |
the value of the property. |
tags: List[zenml.models.v2.core.tag.TagResponse]
property
readonly
The tags
property.
Returns:
Type | Description |
---|---|
List[zenml.models.v2.core.tag.TagResponse] |
the value of the property. |
versions: Dict[str, ArtifactVersionResponse]
property
readonly
Get a list of all versions of this artifact.
Returns:
Type | Description |
---|---|
Dict[str, ArtifactVersionResponse] |
A list of all versions of this artifact. |
get_hydrated_version(self)
Get the hydrated version of this artifact.
Returns:
Type | Description |
---|---|
ArtifactResponse |
an instance of the same entity with the metadata field attached. |
Source code in zenml/models/v2/core/artifact.py
def get_hydrated_version(self) -> "ArtifactResponse":
"""Get the hydrated version of this artifact.
Returns:
an instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_artifact(self.id)
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/artifact.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
ArtifactResponseBody (BaseDatedResponseBody)
Response body for artifacts.
Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseBody(BaseDatedResponseBody):
"""Response body for artifacts."""
tags: List[TagResponse] = Field(
title="Tags associated with the model",
)
latest_version_name: Optional[str] = None
latest_version_id: Optional[UUID] = None
ArtifactResponseMetadata (BaseResponseMetadata)
Response metadata for artifacts.
Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseMetadata(BaseResponseMetadata):
"""Response metadata for artifacts."""
has_custom_name: bool = Field(
title="Whether the name is custom (True) or auto-generated (False).",
default=False,
)
ArtifactResponseResources (BaseResponseResources)
Class for all resource models associated with the Artifact Entity.
Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseResources(BaseResponseResources):
"""Class for all resource models associated with the Artifact Entity."""
ArtifactUpdate (BaseModel)
Artifact update model.
Source code in zenml/models/v2/core/artifact.py
class ArtifactUpdate(BaseModel):
"""Artifact update model."""
name: Optional[str] = None
add_tags: Optional[List[str]] = None
remove_tags: Optional[List[str]] = None
has_custom_name: Optional[bool] = None
artifact_version
Models representing artifact versions.
ArtifactVersionFilter (WorkspaceScopedTaggableFilter)
Model to enable advanced filtering of artifact versions.
Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionFilter(WorkspaceScopedTaggableFilter):
"""Model to enable advanced filtering of artifact versions."""
# `name` and `only_unused` refer to properties related to other entities
# rather than a field in the db, hence they need to be handled
# explicitly
FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
*WorkspaceScopedTaggableFilter.FILTER_EXCLUDE_FIELDS,
"name",
"only_unused",
"has_custom_name",
]
artifact_id: Optional[Union[UUID, str]] = Field(
default=None,
description="ID of the artifact to which this version belongs.",
union_mode="left_to_right",
)
name: Optional[str] = Field(
default=None,
description="Name of the artifact to which this version belongs.",
)
version: Optional[str] = Field(
default=None,
description="Version of the artifact",
)
version_number: Optional[Union[int, str]] = Field(
default=None,
description="Version of the artifact if it is an integer",
union_mode="left_to_right",
)
uri: Optional[str] = Field(
default=None,
description="Uri of the artifact",
)
materializer: Optional[str] = Field(
default=None,
description="Materializer used to produce the artifact",
)
type: Optional[str] = Field(
default=None,
description="Type of the artifact",
)
data_type: Optional[str] = Field(
default=None,
description="Datatype of the artifact",
)
artifact_store_id: Optional[Union[UUID, str]] = Field(
default=None,
description="Artifact store for this artifact",
union_mode="left_to_right",
)
workspace_id: Optional[Union[UUID, str]] = Field(
default=None,
description="Workspace for this artifact",
union_mode="left_to_right",
)
user_id: Optional[Union[UUID, str]] = Field(
default=None,
description="User that produced this artifact",
union_mode="left_to_right",
)
only_unused: Optional[bool] = Field(
default=False, description="Filter only for unused artifacts"
)
has_custom_name: Optional[bool] = Field(
default=None,
description="Filter only artifacts with/without custom names.",
)
def get_custom_filters(self) -> List[Union["ColumnElement[bool]"]]:
"""Get custom filters.
Returns:
A list of custom filters.
"""
custom_filters = super().get_custom_filters()
from sqlmodel import and_, select
from zenml.zen_stores.schemas.artifact_schemas import (
ArtifactSchema,
ArtifactVersionSchema,
)
from zenml.zen_stores.schemas.step_run_schemas import (
StepRunInputArtifactSchema,
StepRunOutputArtifactSchema,
)
if self.name:
value, filter_operator = self._resolve_operator(self.name)
filter_ = StrFilter(
operation=GenericFilterOps(filter_operator),
column="name",
value=value,
)
artifact_name_filter = and_(
ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
filter_.generate_query_conditions(ArtifactSchema),
)
custom_filters.append(artifact_name_filter)
if self.only_unused:
unused_filter = and_(
ArtifactVersionSchema.id.notin_( # type: ignore[attr-defined]
select(StepRunOutputArtifactSchema.artifact_id)
),
ArtifactVersionSchema.id.notin_( # type: ignore[attr-defined]
select(StepRunInputArtifactSchema.artifact_id)
),
)
custom_filters.append(unused_filter)
if self.has_custom_name is not None:
custom_name_filter = and_(
ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
ArtifactSchema.has_custom_name == self.has_custom_name,
)
custom_filters.append(custom_name_filter)
return custom_filters
get_custom_filters(self)
Get custom filters.
Returns:
Type | Description |
---|---|
List[ColumnElement[bool]] |
A list of custom filters. |
Source code in zenml/models/v2/core/artifact_version.py
def get_custom_filters(self) -> List[Union["ColumnElement[bool]"]]:
"""Get custom filters.
Returns:
A list of custom filters.
"""
custom_filters = super().get_custom_filters()
from sqlmodel import and_, select
from zenml.zen_stores.schemas.artifact_schemas import (
ArtifactSchema,
ArtifactVersionSchema,
)
from zenml.zen_stores.schemas.step_run_schemas import (
StepRunInputArtifactSchema,
StepRunOutputArtifactSchema,
)
if self.name:
value, filter_operator = self._resolve_operator(self.name)
filter_ = StrFilter(
operation=GenericFilterOps(filter_operator),
column="name",
value=value,
)
artifact_name_filter = and_(
ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
filter_.generate_query_conditions(ArtifactSchema),
)
custom_filters.append(artifact_name_filter)
if self.only_unused:
unused_filter = and_(
ArtifactVersionSchema.id.notin_( # type: ignore[attr-defined]
select(StepRunOutputArtifactSchema.artifact_id)
),
ArtifactVersionSchema.id.notin_( # type: ignore[attr-defined]
select(StepRunInputArtifactSchema.artifact_id)
),
)
custom_filters.append(unused_filter)
if self.has_custom_name is not None:
custom_name_filter = and_(
ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
ArtifactSchema.has_custom_name == self.has_custom_name,
)
custom_filters.append(custom_name_filter)
return custom_filters
model_post_init(self, _ModelMetaclass__context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/models/v2/core/artifact_version.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, __context)
original_model_post_init(self, __context)
ArtifactVersionRequest (WorkspaceScopedRequest)
Request model for artifact versions.
Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionRequest(WorkspaceScopedRequest):
"""Request model for artifact versions."""
artifact_id: UUID = Field(
title="ID of the artifact to which this version belongs.",
)
version: Union[str, int] = Field(
title="Version of the artifact.", union_mode="left_to_right"
)
has_custom_name: bool = Field(
title="Whether the name is custom (True) or auto-generated (False).",
default=False,
)
type: ArtifactType = Field(title="Type of the artifact.")
artifact_store_id: Optional[UUID] = Field(
title="ID of the artifact store in which this artifact is stored.",
default=None,
)
uri: str = Field(
title="URI of the artifact.", max_length=TEXT_FIELD_MAX_LENGTH
)
materializer: SourceWithValidator = Field(
title="Materializer class to use for this artifact.",
)
data_type: SourceWithValidator = Field(
title="Data type of the artifact.",
)
tags: Optional[List[str]] = Field(
title="Tags of the artifact.",
description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
default=None,
)
visualizations: Optional[List["ArtifactVisualizationRequest"]] = Field(
default=None, title="Visualizations of the artifact."
)
@field_validator("version")
@classmethod
def str_field_max_length_check(cls, value: Any) -> Any:
"""Checks if the length of the value exceeds the maximum str length.
Args:
value: the value set in the field
Returns:
the value itself.
Raises:
AssertionError: if the length of the field is longer than the
maximum threshold.
"""
assert len(str(value)) < STR_FIELD_MAX_LENGTH, (
"The length of the value for this field can not "
f"exceed {STR_FIELD_MAX_LENGTH}"
)
return value
str_field_max_length_check(value)
classmethod
Checks if the length of the value exceeds the maximum str length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
the value set in the field |
required |
Returns:
Type | Description |
---|---|
Any |
the value itself. |
Exceptions:
Type | Description |
---|---|
AssertionError |
if the length of the field is longer than the maximum threshold. |
Source code in zenml/models/v2/core/artifact_version.py
@field_validator("version")
@classmethod
def str_field_max_length_check(cls, value: Any) -> Any:
"""Checks if the length of the value exceeds the maximum str length.
Args:
value: the value set in the field
Returns:
the value itself.
Raises:
AssertionError: if the length of the field is longer than the
maximum threshold.
"""
assert len(str(value)) < STR_FIELD_MAX_LENGTH, (
"The length of the value for this field can not "
f"exceed {STR_FIELD_MAX_LENGTH}"
)
return value
ArtifactVersionResponse (WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata, ArtifactVersionResponseResources])
Response model for artifact versions.
Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionResponse(
WorkspaceScopedResponse[
ArtifactVersionResponseBody,
ArtifactVersionResponseMetadata,
ArtifactVersionResponseResources,
]
):
"""Response model for artifact versions."""
def get_hydrated_version(self) -> "ArtifactVersionResponse":
"""Get the hydrated version of this artifact version.
Returns:
an instance of the same entity with the metadata field attached.
"""
from zenml.client import Client
return Client().zen_store.get_artifact_version(self.id)
# Body and metadata properties
@property
def artifact(self) -> "ArtifactResponse":
"""The `artifact` property.
Returns:
the value of the property.
"""
return self.get_body().artifact
@property
def version(self) -> Union[str, int]:
"""The `version` property.
Returns:
the value of the property.
"""
return self.get_body().version
@property
def uri(self) -> str:
"""The `uri` property.
Returns:
the value of the property.
"""
return self.get_body().uri
@property
def type(self) -> ArtifactType:
"""The `type` property.
Returns:
the value of the property.
"""
return self.get_body().type
@property
def tags(self) -> List[TagResponse]:
"""The `tags` property.
Returns:
the value of the property.
"""
return self.get_body().tags
@property
def producer_pipeline_run_id(self) -> Optional[UUID]:
"""The `producer_pipeline_run_id` property.
Returns:
the value of the property.
"""
return self.get_body().producer_pipeline_run_id
@property
def artifact_store_id(self) -> Optional[UUID]:
"""The `artifact_store_id` property.
Returns:
the value of the property.
"""
return self.get_metadata().artifact_store_id
@property
def producer_step_run_id(self) -> Optional[UUID]:
"""The `producer_step_run_id` property.
Returns:
the value of the property.
"""
return self.get_metadata().producer_step_run_id
@property
def visualizations(
self,
) -> Optional[List["ArtifactVisualizationResponse"]]:
"""The `visualizations` property.
Returns:
the value of the property.
"""
return self.get_metadata().visualizations
@property
def run_metadata(self) -> Dict[str, "RunMetadataResponse"]:
"""The `metadata` property.
Returns:
the value of the property.
"""
return self.get_metadata().run_metadata
@property
def materializer(self) -> Source:
"""The `materializer` property.
Returns:
the value of the property.
"""
return self.get_body().materializer
@property
def data_type(self) -> Source:
"""The `data_type` property.
Returns:
the value of the property.
"""
return self.get_body().data_type
# Helper methods
@property
def name(self) -> str:
"""The `name` property.
Returns:
the value of the property.
"""
return self.artifact.name
@property
def step(self) -> "StepRunResponse":
"""Get the step that produced this artifact.
Returns:
The step that produced this artifact.
"""
from zenml.artifacts.utils import get_producer_step_of_artifact
return get_producer_step_of_artifact(self)
@property
def run(self) -> "PipelineRunResponse":
"""Get the pipeline run that produced this artifact.
Returns:
The pipeline run that produced this artifact.
"""
from zenml.client import Client
return Client().get_pipeline_run(self.step.pipeline_run_id)
def load(self) -> Any:
"""Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
from zenml.artifacts.utils import load_artifact_from_response
return load_artifact_from_response(self)
def download_files(self, path: str, overwrite: bool = False) -> None:
"""Downloads data for an artifact with no materializing.
Any artifacts will be saved as a zip file to the given path.
Args:
path: The path to save the binary data to.
overwrite: Whether to overwrite the file if it already exists.
Raises:
ValueError: If the path does not end with '.zip'.
"""
if not path.endswith(".zip"):
raise ValueError(
"The path should end with '.zip' to save the binary data."
)
from zenml.artifacts.utils import (
download_artifact_files_from_response,
)
download_artifact_files_from_response(
self,
path=path,
overwrite=overwrite,
)
def read(self) -> Any:
"""(Deprecated) Materializes (loads) the data stored in this artifact.
Returns:
The materialized data.
"""
logger.warning(
"`artifact.read()` is deprecated and will be removed in a future "
"release. Please use `artifact.load()` instead."
)
return self.load()
def visualize(self, title: Optional[str] = None) -> None:
"""Visualize the artifact in notebook environments.
Args:
title: Optional title to show before the visualizations.
"""
from zenml.utils.visualization_utils import visualize_artifact
visualize_artifact(self, title=title)
artifact: ArtifactResponse
property
readonly
The artifact
property.
Returns:
Type | Description |
---|---|
ArtifactResponse |
the value of the property. |
artifact_store_id: Optional[uuid.UUID]
property
readonly
The artifact_store_id
property.
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] |
the value of the property. |
data_type: Source
property
readonly
The data_type
property.
Returns:
Type | Description |
---|---|
Source |
the value of the property. |
materializer: Source
property
readonly
The materializer
property.
Returns:
Type | Description |
---|---|
Source |
the value of the property. |
name: str
property
readonly
The name
property.
Returns:
Type | Description |
---|---|
str |
the value of the property. |
producer_pipeline_run_id: Optional[uuid.UUID]
property
readonly
The producer_pipeline_run_id
property.
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] |
the value of the property. |
producer_step_run_id: Optional[uuid.UUID]
property
readonly
The producer_step_run_id
property.
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] |
the value of the property. |
run: PipelineRunResponse
property
readonly
Get the pipeline run that produced this artifact.
Returns:
Type | Description |
---|---|
PipelineRunResponse |
The pipeline run that produced this artifact. |
run_metadata: Dict[str, RunMetadataResponse]
property
readonly
The metadata
property.
Returns:
Type | Description |
---|---|
Dict[str, RunMetadataResponse] |
the value of the property. |
step: StepRunResponse
property
readonly
Get the step that produced this artifact.
Returns:
Type | Description |
---|---|
StepRunResponse |
The step that produced this artifact. |
tags: List[zenml.models.v2.core.tag.TagResponse]
property
readonly
The tags
property.
Returns:
Type | Description |
---|---|
List[zenml.models.v2.core.tag.TagResponse] |
the value of the property. |
type: ArtifactType
property
readonly
The type
property.
Returns:
Type | Description |
---|---|
ArtifactType |
the value of the property. |
uri: str
property
readonly
The uri
property.
Returns:
Type | Description |
---|---|
str |
the value of the property. |
version: Union[str, int]
property
readonly
The version
property.
Returns:
Type | Description |
---|---|
Union[str, int] |
the value of the property. |
visualizations: Optional[List[ArtifactVisualizationResponse]]
property
readonly
The visualizations
property.
Returns:
Type | Description |
---|---|
Optional[List[ArtifactVisualizationResponse]] |
the value of the property. |
download_files(self,