Skip to content

Models

zenml.models special

Pydantic models for the various concepts in ZenML.

v2 special

base special

base

Base model definitions.

BaseDatedResponseBody (BaseResponseBody)

Base body model for entities that track a creation and update timestamp.

Used as a base class for all body models associated with responses. Features a creation and update timestamp.

Source code in zenml/models/v2/base/base.py
class BaseDatedResponseBody(BaseResponseBody):
    """Base body model for entities that track a creation and update timestamp.

    Used as a base class for all body models associated with responses.
    Features a creation and update timestamp.
    """

    created: datetime = Field(
        title="The timestamp when this resource was created."
    )
    updated: datetime = Field(
        title="The timestamp when this resource was last updated."
    )
BaseIdentifiedResponse (BaseResponse[~AnyDatedBody, ~AnyMetadata, ~AnyResources], Generic)

Base domain model for resources with DB representation.

Source code in zenml/models/v2/base/base.py
class BaseIdentifiedResponse(
    BaseResponse[AnyDatedBody, AnyMetadata, AnyResources],
    Generic[AnyDatedBody, AnyMetadata, AnyResources],
):
    """Base domain model for resources with DB representation."""

    id: UUID = Field(title="The unique resource id.")

    permission_denied: bool = False

    # Helper functions
    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Returns:
            Hash of the UUID.
        """
        return hash((type(self),) + tuple([self.id]))

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is of the same type and has the same UUID.
        """
        if isinstance(other, type(self)):
            return self.id == other.id
        else:
            return False

    def _validate_hydrated_version(
        self,
        hydrated_model: "BaseResponse[AnyDatedBody, AnyMetadata, AnyResources]",
    ) -> None:
        """Helper method to validate the values within the hydrated version.

        Args:
            hydrated_model: the hydrated version of the model.

        Raises:
            HydrationError: if the hydrated version has different values set
                for either the name of the body fields and the
                _method_body_mutation is set to ResponseBodyUpdate.DENY.
        """
        super()._validate_hydrated_version(hydrated_model)

        assert isinstance(hydrated_model, type(self))

        # Check if the ID is the same
        if self.id != hydrated_model.id:
            raise HydrationError(
                "The hydrated version of the model does not have the same id."
            )

    def get_hydrated_version(
        self,
    ) -> "BaseIdentifiedResponse[AnyDatedBody, AnyMetadata, AnyResources]":
        """Abstract method to fetch the hydrated version of the model.

        Raises:
            NotImplementedError: in case the method is not implemented.
        """
        raise NotImplementedError(
            "Please implement a `get_hydrated_version` method before "
            "using/hydrating the model."
        )

    def get_body(self) -> "AnyDatedBody":
        """Fetch the body of the entity.

        Returns:
            The body field of the response.

        Raises:
            IllegalOperationError: If the user lacks permission to access the
                entity represented by this response.
        """
        if self.permission_denied:
            raise IllegalOperationError(
                f"Missing permissions to access {type(self).__name__} with "
                f"ID {self.id}."
            )

        return super().get_body()

    def get_metadata(self) -> "AnyMetadata":
        """Fetch the metadata of the entity.

        Returns:
            The metadata field of the response.

        Raises:
            IllegalOperationError: If the user lacks permission to access this
                entity represented by this response.
        """
        if self.permission_denied:
            raise IllegalOperationError(
                f"Missing permissions to access {type(self).__name__} with "
                f"ID {self.id}."
            )

        return super().get_metadata()

    # Analytics
    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata

    # Body and metadata properties
    @property
    def created(self) -> datetime:
        """The `created` property.

        Returns:
            the value of the property.
        """
        return self.get_body().created

    @property
    def updated(self) -> datetime:
        """The `updated` property.

        Returns:
            the value of the property.
        """
        return self.get_body().updated
created: datetime property readonly

The created property.

Returns:

Type Description
datetime

the value of the property.

updated: datetime property readonly

The updated property.

Returns:

Type Description
datetime

the value of the property.

__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is of the same type and has the same UUID.

Source code in zenml/models/v2/base/base.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is of the same type and has the same UUID.
    """
    if isinstance(other, type(self)):
        return self.id == other.id
    else:
        return False
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/models/v2/base/base.py
def __hash__(self) -> int:
    """Implementation of hash magic method.

    Returns:
        Hash of the UUID.
    """
    return hash((type(self),) + tuple([self.id]))
get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/base.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for base response models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["entity_id"] = self.id
    return metadata
get_body(self)

Fetch the body of the entity.

Returns:

Type Description
AnyDatedBody

The body field of the response.

Exceptions:

Type Description
IllegalOperationError

If the user lacks permission to access the entity represented by this response.

Source code in zenml/models/v2/base/base.py
def get_body(self) -> "AnyDatedBody":
    """Fetch the body of the entity.

    Returns:
        The body field of the response.

    Raises:
        IllegalOperationError: If the user lacks permission to access the
            entity represented by this response.
    """
    if self.permission_denied:
        raise IllegalOperationError(
            f"Missing permissions to access {type(self).__name__} with "
            f"ID {self.id}."
        )

    return super().get_body()
get_hydrated_version(self)

Abstract method to fetch the hydrated version of the model.

Exceptions:

Type Description
NotImplementedError

in case the method is not implemented.

Source code in zenml/models/v2/base/base.py
def get_hydrated_version(
    self,
) -> "BaseIdentifiedResponse[AnyDatedBody, AnyMetadata, AnyResources]":
    """Abstract method to fetch the hydrated version of the model.

    Raises:
        NotImplementedError: in case the method is not implemented.
    """
    raise NotImplementedError(
        "Please implement a `get_hydrated_version` method before "
        "using/hydrating the model."
    )
get_metadata(self)

Fetch the metadata of the entity.

Returns:

Type Description
AnyMetadata

The metadata field of the response.

Exceptions:

Type Description
IllegalOperationError

If the user lacks permission to access this entity represented by this response.

Source code in zenml/models/v2/base/base.py
def get_metadata(self) -> "AnyMetadata":
    """Fetch the metadata of the entity.

    Returns:
        The metadata field of the response.

    Raises:
        IllegalOperationError: If the user lacks permission to access this
            entity represented by this response.
    """
    if self.permission_denied:
        raise IllegalOperationError(
            f"Missing permissions to access {type(self).__name__} with "
            f"ID {self.id}."
        )

    return super().get_metadata()
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[ArtifactResponseBody, ArtifactResponseMetadata, ArtifactResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[ArtifactVisualizationResponseBody, ArtifactVisualizationResponseMetadata, ArtifactVisualizationResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[CodeReferenceResponseBody, CodeReferenceResponseMetadata, CodeReferenceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[LogsResponseBody, LogsResponseMetadata, LogsResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[ModelVersionArtifactResponseBody, BaseResponseMetadata, ModelVersionArtifactResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[ModelVersionPipelineRunResponseBody, BaseResponseMetadata, ModelVersionPipelineRunResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[ServiceAccountResponseBody, ServiceAccountResponseMetadata, ServiceAccountResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[TagResourceResponseBody, BaseResponseMetadata, TagResourceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[TagResponseBody, BaseResponseMetadata, TagResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[TriggerExecutionResponseBody, TriggerExecutionResponseMetadata, TriggerExecutionResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[UserResponseBody, UserResponseMetadata, UserResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[WorkspaceResponseBody, WorkspaceResponseMetadata, WorkspaceResponseResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseIdentifiedResponse[~UserBody, ~UserMetadata, ~UserResources] (BaseIdentifiedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseRequest (BaseZenModel)

Base request model.

Used as a base class for all request models.

Source code in zenml/models/v2/base/base.py
class BaseRequest(BaseZenModel):
    """Base request model.

    Used as a base class for all request models.
    """
BaseResponse (BaseZenModel, Generic)

Base domain model for all responses.

Source code in zenml/models/v2/base/base.py
class BaseResponse(BaseZenModel, Generic[AnyBody, AnyMetadata, AnyResources]):
    """Base domain model for all responses."""

    # Body and metadata pair
    body: Optional["AnyBody"] = Field(
        default=None, title="The body of the resource."
    )
    metadata: Optional["AnyMetadata"] = Field(
        default=None, title="The metadata related to this resource."
    )
    resources: Optional["AnyResources"] = Field(
        default=None, title="The resources related to this resource."
    )

    _response_update_strategy: ResponseUpdateStrategy = (
        ResponseUpdateStrategy.ALLOW
    )
    _warn_on_response_updates: bool = True

    def _validate_hydrated_version(
        self,
        hydrated_model: "BaseResponse[AnyBody, AnyMetadata, AnyResources]",
    ) -> None:
        """Helper method to validate the values within the hydrated version.

        Args:
            hydrated_model: the hydrated version of the model.

        Raises:
            HydrationError: if the hydrated version has different values set
                for either the name of the body fields and the
                _method_body_mutation is set to ResponseBodyUpdate.DENY.
        """
        # Check whether the metadata exists in the hydrated version
        if hydrated_model.metadata is None:
            raise HydrationError(
                "The hydrated model does not have a metadata field."
            )

        # Check if the name has changed
        if "name" in self.model_fields:
            original_name = getattr(self, "name")
            hydrated_name = getattr(hydrated_model, "name")

            if original_name != hydrated_name:
                if (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.ALLOW
                ):
                    setattr(self, "name", hydrated_name)

                    if self._warn_on_response_updates:
                        logger.warning(
                            f"The name of the entity has changed from "
                            f"`{original_name}` to `{hydrated_name}`."
                        )

                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.IGNORE
                ):
                    if self._warn_on_response_updates:
                        logger.warning(
                            f"Ignoring the name change in the hydrated version "
                            f"of the response: `{original_name}` to "
                            f"`{hydrated_name}`."
                        )
                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.DENY
                ):
                    raise HydrationError(
                        f"Failing the hydration, because there is a change in "
                        f"the name of the entity: `{original_name}` to "
                        f"`{hydrated_name}`."
                    )

        # Check all the fields in the body
        for field in self.get_body().model_fields:
            original_value = getattr(self.get_body(), field)
            hydrated_value = getattr(hydrated_model.get_body(), field)

            if original_value != hydrated_value:
                if (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.ALLOW
                ):
                    setattr(self.get_body(), field, hydrated_value)

                    if self._warn_on_response_updates:
                        logger.warning(
                            f"The field `{field}` in the body of the response "
                            f"has changed from `{original_value}` to "
                            f"`{hydrated_value}`."
                        )

                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.IGNORE
                ):
                    if self._warn_on_response_updates:
                        logger.warning(
                            f"Ignoring the change in the hydrated version of "
                            f"the field `{field}`: `{original_value}` -> "
                            f"`{hydrated_value}`."
                        )
                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.DENY
                ):
                    raise HydrationError(
                        f"Failing the hydration, because there is a change in "
                        f"the field `{field}`: `{original_value}` -> "
                        f"`{hydrated_value}`"
                    )

    def get_hydrated_version(
        self,
    ) -> "BaseResponse[AnyBody, AnyMetadata, AnyResources]":
        """Abstract method to fetch the hydrated version of the model.

        Raises:
            NotImplementedError: in case the method is not implemented.
        """
        raise NotImplementedError(
            "Please implement a `get_hydrated_version` method before "
            "using/hydrating the model."
        )

    def get_body(self) -> "AnyBody":
        """Fetch the body of the entity.

        Returns:
            The body field of the response.

        Raises:
            RuntimeError: If the body was not included in the response.
        """
        if not self.body:
            raise RuntimeError(
                f"Missing response body for {type(self).__name__}."
            )

        return self.body

    def get_metadata(self) -> "AnyMetadata":
        """Fetch the metadata of the entity.

        Returns:
            The metadata field of the response.
        """
        if self.metadata is None:
            # If the metadata is not there, check the class first.
            metadata_annotation = self.model_fields["metadata"].annotation
            assert metadata_annotation is not None, (
                "For each response model, an annotated metadata"
                "field should exist."
            )

            # metadata is defined as:
            #   metadata: Optional[....ResponseMetadata] = Field(default=None)
            # We need to find the actual class inside the Optional annotation.
            from zenml.utils.typing_utils import get_args

            metadata_type = get_args(metadata_annotation)[0]
            assert issubclass(metadata_type, BaseResponseMetadata)

            if len(metadata_type.model_fields):
                # If the metadata class defines any fields, fetch the metadata
                # through the hydrated version.
                hydrated_version = self.get_hydrated_version()
                self._validate_hydrated_version(hydrated_version)
                self.metadata = hydrated_version.metadata
            else:
                # Otherwise, use the metadata class to create an empty metadata
                # object.
                self.metadata = metadata_type()

        assert self.metadata is not None

        return self.metadata

    def get_resources(self) -> "AnyResources":
        """Fetch the resources related to this entity.

        Returns:
            The resources field of the response.

        Raises:
            RuntimeError: If the resources field was not included in the response.
        """
        if self.resources is None:
            # If the resources are not there, check the class first.
            resources_annotation = self.model_fields["resources"].annotation
            assert resources_annotation is not None, (
                "For each response model, an annotated resources"
                "field should exist."
            )

            # metadata is defined as:
            #   metadata: Optional[....ResponseMetadata] = Field(default=None)
            # We need to find the actual class inside the Optional annotation.
            from zenml.utils.typing_utils import get_args

            resources_type = get_args(resources_annotation)[0]
            assert issubclass(resources_type, BaseResponseResources)

            if len(resources_type.model_fields):
                # If the resources class defines any fields, fetch the resources
                # through the hydrated version.
                hydrated_version = self.get_hydrated_version()
                self._validate_hydrated_version(hydrated_version)
                self.resources = hydrated_version.resources
            else:
                # Otherwise, use the resources class to create an empty
                # resources object.
                self.resources = resources_type()

        if self.resources is None:
            raise RuntimeError(
                f"Missing response resources for {type(self).__name__}."
            )

        return self.resources
get_body(self)

Fetch the body of the entity.

Returns:

Type Description
AnyBody

The body field of the response.

Exceptions:

Type Description
RuntimeError

If the body was not included in the response.

Source code in zenml/models/v2/base/base.py
def get_body(self) -> "AnyBody":
    """Fetch the body of the entity.

    Returns:
        The body field of the response.

    Raises:
        RuntimeError: If the body was not included in the response.
    """
    if not self.body:
        raise RuntimeError(
            f"Missing response body for {type(self).__name__}."
        )

    return self.body
get_hydrated_version(self)

Abstract method to fetch the hydrated version of the model.

Exceptions:

Type Description
NotImplementedError

in case the method is not implemented.

Source code in zenml/models/v2/base/base.py
def get_hydrated_version(
    self,
) -> "BaseResponse[AnyBody, AnyMetadata, AnyResources]":
    """Abstract method to fetch the hydrated version of the model.

    Raises:
        NotImplementedError: in case the method is not implemented.
    """
    raise NotImplementedError(
        "Please implement a `get_hydrated_version` method before "
        "using/hydrating the model."
    )
get_metadata(self)

Fetch the metadata of the entity.

Returns:

Type Description
AnyMetadata

The metadata field of the response.

Source code in zenml/models/v2/base/base.py
def get_metadata(self) -> "AnyMetadata":
    """Fetch the metadata of the entity.

    Returns:
        The metadata field of the response.
    """
    if self.metadata is None:
        # If the metadata is not there, check the class first.
        metadata_annotation = self.model_fields["metadata"].annotation
        assert metadata_annotation is not None, (
            "For each response model, an annotated metadata"
            "field should exist."
        )

        # metadata is defined as:
        #   metadata: Optional[....ResponseMetadata] = Field(default=None)
        # We need to find the actual class inside the Optional annotation.
        from zenml.utils.typing_utils import get_args

        metadata_type = get_args(metadata_annotation)[0]
        assert issubclass(metadata_type, BaseResponseMetadata)

        if len(metadata_type.model_fields):
            # If the metadata class defines any fields, fetch the metadata
            # through the hydrated version.
            hydrated_version = self.get_hydrated_version()
            self._validate_hydrated_version(hydrated_version)
            self.metadata = hydrated_version.metadata
        else:
            # Otherwise, use the metadata class to create an empty metadata
            # object.
            self.metadata = metadata_type()

    assert self.metadata is not None

    return self.metadata
get_resources(self)

Fetch the resources related to this entity.

Returns:

Type Description
AnyResources

The resources field of the response.

Exceptions:

Type Description
RuntimeError

If the resources field was not included in the response.

Source code in zenml/models/v2/base/base.py
def get_resources(self) -> "AnyResources":
    """Fetch the resources related to this entity.

    Returns:
        The resources field of the response.

    Raises:
        RuntimeError: If the resources field was not included in the response.
    """
    if self.resources is None:
        # If the resources are not there, check the class first.
        resources_annotation = self.model_fields["resources"].annotation
        assert resources_annotation is not None, (
            "For each response model, an annotated resources"
            "field should exist."
        )

        # metadata is defined as:
        #   metadata: Optional[....ResponseMetadata] = Field(default=None)
        # We need to find the actual class inside the Optional annotation.
        from zenml.utils.typing_utils import get_args

        resources_type = get_args(resources_annotation)[0]
        assert issubclass(resources_type, BaseResponseResources)

        if len(resources_type.model_fields):
            # If the resources class defines any fields, fetch the resources
            # through the hydrated version.
            hydrated_version = self.get_hydrated_version()
            self._validate_hydrated_version(hydrated_version)
            self.resources = hydrated_version.resources
        else:
            # Otherwise, use the resources class to create an empty
            # resources object.
            self.resources = resources_type()

    if self.resources is None:
        raise RuntimeError(
            f"Missing response resources for {type(self).__name__}."
        )

    return self.resources
model_post_init(self, __context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
__context Any

The context.

required
Source code in zenml/models/v2/base/base.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
BaseResponseBody (BaseZenModel)

Base body model.

Source code in zenml/models/v2/base/base.py
class BaseResponseBody(BaseZenModel):
    """Base body model."""
BaseResponseMetadata (BaseZenModel)

Base metadata model.

Used as a base class for all metadata models associated with responses.

Source code in zenml/models/v2/base/base.py
class BaseResponseMetadata(BaseZenModel):
    """Base metadata model.

    Used as a base class for all metadata models associated with responses.
    """
BaseResponseResources (BaseZenModel)

Base resources model.

Used as a base class for all resource models associated with responses.

Source code in zenml/models/v2/base/base.py
class BaseResponseResources(BaseZenModel):
    """Base resources model.

    Used as a base class for all resource models associated with responses.
    """

    model_config = ConfigDict(extra="allow")
BaseResponse[ServerSettingsResponseBody, ServerSettingsResponseMetadata, ServerSettingsResponseResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseResponse[~AnyDatedBody, ~AnyMetadata, ~AnyResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseResponse[~AnyPluginBody, ~AnyPluginMetadata, ~AnyPluginResources] (BaseResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BaseUpdate (BaseZenModel)

Base update model.

Used as a base class for all update models.

Source code in zenml/models/v2/base/base.py
class BaseUpdate(BaseZenModel):
    """Base update model.

    Used as a base class for all update models.
    """

    model_config = ConfigDict(
        # Ignore extras on all update models.
        extra="ignore",
    )
BaseZenModel (YAMLSerializationMixin, AnalyticsTrackedModelMixin)

Base model class for all ZenML models.

This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events.

Source code in zenml/models/v2/base/base.py
class BaseZenModel(YAMLSerializationMixin, AnalyticsTrackedModelMixin):
    """Base model class for all ZenML models.

    This class is used as a base class for all ZenML models. It provides
    functionality for tracking analytics events.
    """

    model_config = ConfigDict(
        # Ignore extras on all models to support forwards and backwards
        # compatibility (e.g. new fields in newer versions of ZenML servers
        # are allowed to be passed to older versions of ZenML clients and
        # vice versa but will be ignored).
        extra="ignore",
    )
base_plugin_flavor

Plugin flavor model definitions.

BasePluginFlavorResponse (BaseResponse[~AnyPluginBody, ~AnyPluginMetadata, ~AnyPluginResources], Generic)

Base response for all Plugin Flavors.

Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginFlavorResponse(
    BaseResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources],
    Generic[AnyPluginBody, AnyPluginMetadata, AnyPluginResources],
):
    """Base response for all Plugin Flavors."""

    name: str = Field(title="Name of the flavor.")
    type: PluginType = Field(title="Type of the plugin.")
    subtype: PluginSubType = Field(title="Subtype of the plugin.")
    model_config = ConfigDict(extra="ignore")

    def get_hydrated_version(
        self,
    ) -> "BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources]":
        """Abstract method to fetch the hydrated version of the model.

        Returns:
            Hydrated version of the PluginFlavorResponse
        """
        # TODO: shouldn't this call the Zen store ? The client should not have
        #  to know about the plugin flavor registry
        from zenml.zen_server.utils import plugin_flavor_registry

        plugin_flavor = plugin_flavor_registry().get_flavor_class(
            name=self.name, _type=self.type, subtype=self.subtype
        )
        return plugin_flavor.get_flavor_response_model(hydrate=True)
get_hydrated_version(self)

Abstract method to fetch the hydrated version of the model.

Returns:

Type Description
BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources]

Hydrated version of the PluginFlavorResponse

Source code in zenml/models/v2/base/base_plugin_flavor.py
def get_hydrated_version(
    self,
) -> "BasePluginFlavorResponse[AnyPluginBody, AnyPluginMetadata, AnyPluginResources]":
    """Abstract method to fetch the hydrated version of the model.

    Returns:
        Hydrated version of the PluginFlavorResponse
    """
    # TODO: shouldn't this call the Zen store ? The client should not have
    #  to know about the plugin flavor registry
    from zenml.zen_server.utils import plugin_flavor_registry

    plugin_flavor = plugin_flavor_registry().get_flavor_class(
        name=self.name, _type=self.type, subtype=self.subtype
    )
    return plugin_flavor.get_flavor_response_model(hydrate=True)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BasePluginFlavorResponse[ActionFlavorResponseBody, ActionFlavorResponseMetadata, ActionFlavorResponseResources] (BasePluginFlavorResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BasePluginFlavorResponse[EventSourceFlavorResponseBody, EventSourceFlavorResponseMetadata, EventSourceFlavorResponseResources] (BasePluginFlavorResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/base_plugin_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
BasePluginResponseBody (BaseResponseBody)

Response body for plugins.

Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseBody(BaseResponseBody):
    """Response body for plugins."""
BasePluginResponseMetadata (BaseResponseMetadata)

Response metadata for plugins.

Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseMetadata(BaseResponseMetadata):
    """Response metadata for plugins."""
BasePluginResponseResources (BaseResponseResources)

Response resources for plugins.

Source code in zenml/models/v2/base/base_plugin_flavor.py
class BasePluginResponseResources(BaseResponseResources):
    """Response resources for plugins."""
filter

Base filter model definitions.

BaseFilter (BaseModel)

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=20
)
Source code in zenml/models/v2/base/filter.py
class BaseFilter(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=20
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]
    CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = []

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # List of fields that are wrapped with `fastapi.Query(default)` in API.
    API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Id for this resource",
        union_mode="left_to_right",
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created", union_mode="left_to_right"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated", union_mode="left_to_right"
    )

    _rbac_configuration: Optional[
        Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
    ] = None

    @field_validator("sort_by", mode="before")
    @classmethod
    def validate_sort_by(cls, value: Any) -> Any:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            value: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(value, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(value)}"
            )
        column = value
        split_value = value.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                value = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{value}'"
            )
        elif column in cls.model_fields:
            return value
        elif column in cls.CUSTOM_SORTING_OPTIONS:
            return value
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def filter_ops(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            data: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(data)
        return data

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.model_fields}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    def configure_rbac(
        self,
        authenticated_user_id: UUID,
        **column_allowed_ids: Optional[Set[UUID]],
    ) -> None:
        """Configure RBAC allowed column values.

        Args:
            authenticated_user_id: ID of the authenticated user. All entities
                owned by this user will be included.
            column_allowed_ids: Set of IDs per column to limit the query to.
                If given, the remaining filters will be applied to entities
                within this set only. If `None`, the remaining filters will
                be applied to all entries in the table.
        """
        self._rbac_configuration = (authenticated_user_id, column_allowed_ids)

    def generate_rbac_filter(
        self,
        table: Type["AnySchema"],
    ) -> Optional["ColumnElement[bool]"]:
        """Generates an optional RBAC filter.

        Args:
            table: The query table.

        Returns:
            The RBAC filter.
        """
        from sqlmodel import or_

        if not self._rbac_configuration:
            return None

        expressions = []

        for column_name, allowed_ids in self._rbac_configuration[1].items():
            if allowed_ids is not None:
                expression = getattr(table, column_name).in_(allowed_ids)
                expressions.append(expression)

        if expressions and hasattr(table, "user_id"):
            # If `expressions` is not empty, we do not have full access to all
            # rows of the table. In this case, we also include rows which the
            # user owns.

            # Unowned entities are considered server-owned and can be seen
            # by anyone
            expressions.append(getattr(table, "user_id").is_(None))
            # The authenticated user owns this entity
            expressions.append(
                getattr(table, "user_id") == self._rbac_configuration[0]
            )

        if expressions:
            return or_(*expressions)
        else:
            return None

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.model_fields[column].annotation} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def check_field_annotation(cls, k: str, type_: Any) -> bool:
        """Checks whether a model field has a certain annotation.

        Args:
            k: The name of the field.
            type_: The type to check.

        Raises:
            ValueError: if the model field within does not have an annotation.

        Returns:
            True if the annotation of the field matches the given type, False
            otherwise.
        """
        try:
            annotation = cls.model_fields[k].annotation

            if annotation is not None:
                return (
                    issubclass(type_, get_args(annotation))
                    or annotation is type_
                )
            else:
                raise ValueError(
                    f"The field '{k}' inside the model {cls.__name__} "
                    "does not have an annotation."
                )
        except TypeError:
            return False

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=datetime)

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a UUID field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a UUID field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=UUID)

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's an int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is an int field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=int)

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=bool)

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=str)

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["ColumnElement[bool]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlmodel import and_, or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        for custom_filter in self.get_custom_filters():
            filters.append(custom_filter)
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def get_custom_filters(self) -> List["ColumnElement[bool]"]:
        """Get custom filters.

        This can be overridden by subclasses to define custom filters that are
        not based on the columns of the underlying table.

        Returns:
            A list of custom filters.
        """
        return []

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        rbac_filter = self.generate_rbac_filter(table=table)

        if rbac_filter is not None:
            query = query.where(rbac_filter)

        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query

    def apply_sorting(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Apply sorting to the query.

        Args:
            query: The query to which to apply the sorting.
            table: The query table.

        Returns:
            The query with sorting applied.
        """
        column, operand = self.sorting_params

        if operand == SorterOps.DESCENDING:
            sort_clause = desc(getattr(table, column))  # type: ignore[var-annotated]
        else:
            sort_clause = asc(getattr(table, column))

        # We always add the `id` column as a tiebreaker to ensure a stable,
        # repeatable order of items, otherwise subsequent pages might contain
        # the same items.
        query = query.order_by(sort_clause, asc(table.id))  # type: ignore[arg-type]

        return query
list_of_filters: List[zenml.models.v2.base.filter.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.v2.base.filter.Filter]

A list of Filter models.

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/filter.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    rbac_filter = self.generate_rbac_filter(table=table)

    if rbac_filter is not None:
        query = query.where(rbac_filter)

    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
apply_sorting(self, query, table)

Apply sorting to the query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the sorting.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with sorting applied.

Source code in zenml/models/v2/base/filter.py
def apply_sorting(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Apply sorting to the query.

    Args:
        query: The query to which to apply the sorting.
        table: The query table.

    Returns:
        The query with sorting applied.
    """
    column, operand = self.sorting_params

    if operand == SorterOps.DESCENDING:
        sort_clause = desc(getattr(table, column))  # type: ignore[var-annotated]
    else:
        sort_clause = asc(getattr(table, column))

    # We always add the `id` column as a tiebreaker to ensure a stable,
    # repeatable order of items, otherwise subsequent pages might contain
    # the same items.
    query = query.order_by(sort_clause, asc(table.id))  # type: ignore[arg-type]

    return query
check_field_annotation(k, type_) classmethod

Checks whether a model field has a certain annotation.

Parameters:

Name Type Description Default
k str

The name of the field.

required
type_ Any

The type to check.

required

Exceptions:

Type Description
ValueError

if the model field within does not have an annotation.

Returns:

Type Description
bool

True if the annotation of the field matches the given type, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
    """Checks whether a model field has a certain annotation.

    Args:
        k: The name of the field.
        type_: The type to check.

    Raises:
        ValueError: if the model field within does not have an annotation.

    Returns:
        True if the annotation of the field matches the given type, False
        otherwise.
    """
    try:
        annotation = cls.model_fields[k].annotation

        if annotation is not None:
            return (
                issubclass(type_, get_args(annotation))
                or annotation is type_
            )
        else:
            raise ValueError(
                f"The field '{k}' inside the model {cls.__name__} "
                "does not have an annotation."
            )
    except TypeError:
        return False
configure_rbac(self, authenticated_user_id, **column_allowed_ids)

Configure RBAC allowed column values.

Parameters:

Name Type Description Default
authenticated_user_id UUID

ID of the authenticated user. All entities owned by this user will be included.

required
column_allowed_ids Optional[Set[uuid.UUID]]

Set of IDs per column to limit the query to. If given, the remaining filters will be applied to entities within this set only. If None, the remaining filters will be applied to all entries in the table.

{}
Source code in zenml/models/v2/base/filter.py
def configure_rbac(
    self,
    authenticated_user_id: UUID,
    **column_allowed_ids: Optional[Set[UUID]],
) -> None:
    """Configure RBAC allowed column values.

    Args:
        authenticated_user_id: ID of the authenticated user. All entities
            owned by this user will be included.
        column_allowed_ids: Set of IDs per column to limit the query to.
            If given, the remaining filters will be applied to entities
            within this set only. If `None`, the remaining filters will
            be applied to all entries in the table.
    """
    self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(data, validation_info) classmethod

Wrapper method to handle the raw data.

Parameters:

Name Type Description Default
cls

the class handler

required
data Any

the raw input data

required
validation_info ValidationInfo

the context of the validation.

required

Returns:

Type Description
Any

the validated data

Source code in zenml/models/v2/base/filter.py
def before_validator(
    cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
) -> Any:
    """Wrapper method to handle the raw data.

    Args:
        cls: the class handler
        data: the raw input data
        validation_info: the context of the validation.

    Returns:
        the validated data
    """
    data = model_validator_data_handler(
        raw_data=data, base_class=cls, validation_info=validation_info
    )
    return method(cls=cls, data=data)
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
ColumnElement[bool]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/v2/base/filter.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlmodel import and_, or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    for custom_filter in self.get_custom_filters():
        filters.append(custom_filter)
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)

Generates an optional RBAC filter.

Parameters:

Name Type Description Default
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Optional[ColumnElement[bool]]

The RBAC filter.

Source code in zenml/models/v2/base/filter.py
def generate_rbac_filter(
    self,
    table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
    """Generates an optional RBAC filter.

    Args:
        table: The query table.

    Returns:
        The RBAC filter.
    """
    from sqlmodel import or_

    if not self._rbac_configuration:
        return None

    expressions = []

    for column_name, allowed_ids in self._rbac_configuration[1].items():
        if allowed_ids is not None:
            expression = getattr(table, column_name).in_(allowed_ids)
            expressions.append(expression)

    if expressions and hasattr(table, "user_id"):
        # If `expressions` is not empty, we do not have full access to all
        # rows of the table. In this case, we also include rows which the
        # user owns.

        # Unowned entities are considered server-owned and can be seen
        # by anyone
        expressions.append(getattr(table, "user_id").is_(None))
        # The authenticated user owns this entity
        expressions.append(
            getattr(table, "user_id") == self._rbac_configuration[0]
        )

    if expressions:
        return or_(*expressions)
    else:
        return None
get_custom_filters(self)

Get custom filters.

This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.

Returns:

Type Description
List[ColumnElement[bool]]

A list of custom filters.

Source code in zenml/models/v2/base/filter.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
    """Get custom filters.

    This can be overridden by subclasses to define custom filters that are
    not based on the columns of the underlying table.

    Returns:
        A list of custom filters.
    """
    return []
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=bool)
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=datetime)
is_int_field(k) classmethod

Checks if it's an int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is an int field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's an int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is an int field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=int)
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=str)
is_uuid_field(k) classmethod

Checks if it's a UUID field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a UUID field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a UUID field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a UUID field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=UUID)
model_post_init(self, __context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
__context Any

The context.

required
Source code in zenml/models/v2/base/filter.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
validate_sort_by(value) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
value Any

The sort_by field value.

required

Returns:

Type Description
Any

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/v2/base/filter.py
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        value: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(value, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(value)}"
        )
    column = value
    split_value = value.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            value = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{value}'"
        )
    elif column in cls.model_fields:
        return value
    elif column in cls.CUSTOM_SORTING_OPTIONS:
        return value
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
BoolFilter (Filter)

Filter for all Boolean fields.

Source code in zenml/models/v2/base/filter.py
class BoolFilter(Filter):
    """Filter for all Boolean fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a boolean column.

        Args:
            column: The boolean column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a boolean column.

Parameters:

Name Type Description Default
column Any

The boolean column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a boolean column.

    Args:
        column: The boolean column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    return column == self.value
Filter (BaseModel, ABC)

Filter for all fields.

A Filter is a combination of a column, a value that the user uses to filter on this column and an operation to use. The easiest example would be user equals aria with column=user, value=aria and the operation=equals.

All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.

Source code in zenml/models/v2/base/filter.py
class Filter(BaseModel, ABC):
    """Filter for all fields.

    A Filter is a combination of a column, a value that the user uses to
    filter on this column and an operation to use. The easiest example
    would be `user equals aria` with column=`user`, value=`aria` and the
    operation=`equals`.

    All subclasses of this class will support different sets of operations.
    This operation set is defined in the ALLOWED_OPS class variable.
    """

    ALLOWED_OPS: ClassVar[List[str]] = []

    operation: GenericFilterOps
    column: str
    value: Optional[Any] = None

    @field_validator("operation", mode="before")
    @classmethod
    def validate_operation(cls, value: Any) -> Any:
        """Validate that the operation is a valid op for the field type.

        Args:
            value: The operation of this filter.

        Returns:
            The operation if it is valid.

        Raises:
            ValueError: If the operation is not valid for this field type.
        """
        if value not in cls.ALLOWED_OPS:
            raise ValueError(
                f"This datatype can not be filtered using this operation: "
                f"'{value}'. The allowed operations are: {cls.ALLOWED_OPS}"
            )
        else:
            return value

    def generate_query_conditions(
        self,
        table: Type[SQLModel],
    ) -> Union["ColumnElement[bool]"]:
        """Generate the query conditions for the database.

        This method converts the Filter class into an appropriate SQLModel
        query condition, to be used when filtering on the Database.

        Args:
            table: The SQLModel table to use for the query creation

        Returns:
            A list of conditions that will be combined using the `and` operation
        """
        column = getattr(table, self.column)
        conditions = self.generate_query_conditions_from_column(column)
        return conditions  # type:ignore[no-any-return]

    @abstractmethod
    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions given the corresponding database column.

        This method should be overridden by subclasses to define how each
        supported operation in `self.ALLOWED_OPS` can be used to filter the
        given column by `self.value`.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
generate_query_conditions(self, table)

Generate the query conditions for the database.

This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The SQLModel table to use for the query creation

required

Returns:

Type Description
ColumnElement[bool]

A list of conditions that will be combined using the and operation

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions(
    self,
    table: Type[SQLModel],
) -> Union["ColumnElement[bool]"]:
    """Generate the query conditions for the database.

    This method converts the Filter class into an appropriate SQLModel
    query condition, to be used when filtering on the Database.

    Args:
        table: The SQLModel table to use for the query creation

    Returns:
        A list of conditions that will be combined using the `and` operation
    """
    column = getattr(table, self.column)
    conditions = self.generate_query_conditions_from_column(column)
    return conditions  # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)

Generate query conditions given the corresponding database column.

This method should be overridden by subclasses to define how each supported operation in self.ALLOWED_OPS can be used to filter the given column by self.value.

Parameters:

Name Type Description Default
column Any

The column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions given the corresponding database column.

    This method should be overridden by subclasses to define how each
    supported operation in `self.ALLOWED_OPS` can be used to filter the
    given column by `self.value`.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
validate_operation(value) classmethod

Validate that the operation is a valid op for the field type.

Parameters:

Name Type Description Default
value Any

The operation of this filter.

required

Returns:

Type Description
Any

The operation if it is valid.

Exceptions:

Type Description
ValueError

If the operation is not valid for this field type.

Source code in zenml/models/v2/base/filter.py
@field_validator("operation", mode="before")
@classmethod
def validate_operation(cls, value: Any) -> Any:
    """Validate that the operation is a valid op for the field type.

    Args:
        value: The operation of this filter.

    Returns:
        The operation if it is valid.

    Raises:
        ValueError: If the operation is not valid for this field type.
    """
    if value not in cls.ALLOWED_OPS:
        raise ValueError(
            f"This datatype can not be filtered using this operation: "
            f"'{value}'. The allowed operations are: {cls.ALLOWED_OPS}"
        )
    else:
        return value
NumericFilter (Filter)

Filter for all numeric fields.

Source code in zenml/models/v2/base/filter.py
class NumericFilter(Filter):
    """Filter for all numeric fields."""

    value: Union[float, datetime] = Field(union_mode="left_to_right")

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.GT,
        GenericFilterOps.GTE,
        GenericFilterOps.LT,
        GenericFilterOps.LTE,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.GTE:
            return column >= self.value
        if self.operation == GenericFilterOps.GT:
            return column > self.value
        if self.operation == GenericFilterOps.LTE:
            return column <= self.value
        if self.operation == GenericFilterOps.LT:
            return column < self.value
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.GTE:
        return column >= self.value
    if self.operation == GenericFilterOps.GT:
        return column > self.value
    if self.operation == GenericFilterOps.LTE:
        return column <= self.value
    if self.operation == GenericFilterOps.LT:
        return column < self.value
    return column == self.value
StrFilter (Filter)

Filter for all string fields.

Source code in zenml/models/v2/base/filter.py
class StrFilter(Filter):
    """Filter for all string fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.STARTSWITH,
        GenericFilterOps.CONTAINS,
        GenericFilterOps.ENDSWITH,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a string column.

        Args:
            column: The string column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.CONTAINS:
            return column.like(f"%{self.value}%")
        if self.operation == GenericFilterOps.STARTSWITH:
            return column.startswith(f"{self.value}")
        if self.operation == GenericFilterOps.ENDSWITH:
            return column.endswith(f"{self.value}")
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a string column.

Parameters:

Name Type Description Default
column Any

The string column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a string column.

    Args:
        column: The string column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.CONTAINS:
        return column.like(f"%{self.value}%")
    if self.operation == GenericFilterOps.STARTSWITH:
        return column.startswith(f"{self.value}")
    if self.operation == GenericFilterOps.ENDSWITH:
        return column.endswith(f"{self.value}")
    return column == self.value
UUIDFilter (StrFilter)

Filter for all uuid fields which are mostly treated like strings.

Source code in zenml/models/v2/base/filter.py
class UUIDFilter(StrFilter):
    """Filter for all uuid fields which are mostly treated like strings."""

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        import sqlalchemy
        from sqlalchemy_utils.functions import cast_if

        # For equality checks, compare the UUID directly
        if self.operation == GenericFilterOps.EQUALS:
            return column == self.value

        # For all other operations, cast and handle the column as string
        return super().generate_query_conditions_from_column(
            column=cast_if(column, sqlalchemy.String)
        )
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    import sqlalchemy
    from sqlalchemy_utils.functions import cast_if

    # For equality checks, compare the UUID directly
    if self.operation == GenericFilterOps.EQUALS:
        return column == self.value

    # For all other operations, cast and handle the column as string
    return super().generate_query_conditions_from_column(
        column=cast_if(column, sqlalchemy.String)
    )
page

Page model definitions.

Page (BaseModel, Generic)

Return Model for List Models to accommodate pagination.

Source code in zenml/models/v2/base/page.py
class Page(BaseModel, Generic[B]):
    """Return Model for List Models to accommodate pagination."""

    index: PositiveInt
    max_size: PositiveInt
    total_pages: NonNegativeInt
    total: NonNegativeInt
    items: List[B]

    __params_type__ = BaseFilter

    @property
    def size(self) -> int:
        """Return the item count of the page.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __len__(self) -> int:
        """Return the item count of the page.

        This enables `len(page)`.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __getitem__(self, index: int) -> B:
        """Return the item at the given index.

        This enables `page[index]`.

        Args:
            index: The index to get the item from.

        Returns:
            The item at the given index.
        """
        return self.items[index]

    def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
        """Return an iterator over the items in the page.

        This enables `for item in page` loops, but breaks `dict(page)`.

        Yields:
            An iterator over the items in the page.
        """
        for item in self.items.__iter__():
            yield item

    def __contains__(self, item: B) -> bool:
        """Returns whether the page contains a specific item.

        This enables `item in page` checks.

        Args:
            item: The item to check for.

        Returns:
            Whether the item is in the page.
        """
        return item in self.items
size: int property readonly

Return the item count of the page.

Returns:

Type Description
int

The amount of items in the page.

__params_type__ (BaseModel)

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=20
)
Source code in zenml/models/v2/base/page.py
class BaseFilter(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=20
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]
    CUSTOM_SORTING_OPTIONS: ClassVar[List[str]] = []

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # List of fields that are wrapped with `fastapi.Query(default)` in API.
    API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Id for this resource",
        union_mode="left_to_right",
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created", union_mode="left_to_right"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated", union_mode="left_to_right"
    )

    _rbac_configuration: Optional[
        Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
    ] = None

    @field_validator("sort_by", mode="before")
    @classmethod
    def validate_sort_by(cls, value: Any) -> Any:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            value: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(value, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(value)}"
            )
        column = value
        split_value = value.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                value = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{value}'"
            )
        elif column in cls.model_fields:
            return value
        elif column in cls.CUSTOM_SORTING_OPTIONS:
            return value
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def filter_ops(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            data: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(data)
        return data

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.model_fields}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    def configure_rbac(
        self,
        authenticated_user_id: UUID,
        **column_allowed_ids: Optional[Set[UUID]],
    ) -> None:
        """Configure RBAC allowed column values.

        Args:
            authenticated_user_id: ID of the authenticated user. All entities
                owned by this user will be included.
            column_allowed_ids: Set of IDs per column to limit the query to.
                If given, the remaining filters will be applied to entities
                within this set only. If `None`, the remaining filters will
                be applied to all entries in the table.
        """
        self._rbac_configuration = (authenticated_user_id, column_allowed_ids)

    def generate_rbac_filter(
        self,
        table: Type["AnySchema"],
    ) -> Optional["ColumnElement[bool]"]:
        """Generates an optional RBAC filter.

        Args:
            table: The query table.

        Returns:
            The RBAC filter.
        """
        from sqlmodel import or_

        if not self._rbac_configuration:
            return None

        expressions = []

        for column_name, allowed_ids in self._rbac_configuration[1].items():
            if allowed_ids is not None:
                expression = getattr(table, column_name).in_(allowed_ids)
                expressions.append(expression)

        if expressions and hasattr(table, "user_id"):
            # If `expressions` is not empty, we do not have full access to all
            # rows of the table. In this case, we also include rows which the
            # user owns.

            # Unowned entities are considered server-owned and can be seen
            # by anyone
            expressions.append(getattr(table, "user_id").is_(None))
            # The authenticated user owns this entity
            expressions.append(
                getattr(table, "user_id") == self._rbac_configuration[0]
            )

        if expressions:
            return or_(*expressions)
        else:
            return None

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.model_fields[column].annotation} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def check_field_annotation(cls, k: str, type_: Any) -> bool:
        """Checks whether a model field has a certain annotation.

        Args:
            k: The name of the field.
            type_: The type to check.

        Raises:
            ValueError: if the model field within does not have an annotation.

        Returns:
            True if the annotation of the field matches the given type, False
            otherwise.
        """
        try:
            annotation = cls.model_fields[k].annotation

            if annotation is not None:
                return (
                    issubclass(type_, get_args(annotation))
                    or annotation is type_
                )
            else:
                raise ValueError(
                    f"The field '{k}' inside the model {cls.__name__} "
                    "does not have an annotation."
                )
        except TypeError:
            return False

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=datetime)

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a UUID field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a UUID field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=UUID)

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's an int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is an int field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=int)

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=bool)

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=str)

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["ColumnElement[bool]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlmodel import and_, or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        for custom_filter in self.get_custom_filters():
            filters.append(custom_filter)
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def get_custom_filters(self) -> List["ColumnElement[bool]"]:
        """Get custom filters.

        This can be overridden by subclasses to define custom filters that are
        not based on the columns of the underlying table.

        Returns:
            A list of custom filters.
        """
        return []

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        rbac_filter = self.generate_rbac_filter(table=table)

        if rbac_filter is not None:
            query = query.where(rbac_filter)

        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query

    def apply_sorting(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Apply sorting to the query.

        Args:
            query: The query to which to apply the sorting.
            table: The query table.

        Returns:
            The query with sorting applied.
        """
        column, operand = self.sorting_params

        if operand == SorterOps.DESCENDING:
            sort_clause = desc(getattr(table, column))  # type: ignore[var-annotated]
        else:
            sort_clause = asc(getattr(table, column))

        # We always add the `id` column as a tiebreaker to ensure a stable,
        # repeatable order of items, otherwise subsequent pages might contain
        # the same items.
        query = query.order_by(sort_clause, asc(table.id))  # type: ignore[arg-type]

        return query
list_of_filters: List[zenml.models.v2.base.filter.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.v2.base.filter.Filter]

A list of Filter models.

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/page.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    rbac_filter = self.generate_rbac_filter(table=table)

    if rbac_filter is not None:
        query = query.where(rbac_filter)

    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
apply_sorting(self, query, table)

Apply sorting to the query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the sorting.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with sorting applied.

Source code in zenml/models/v2/base/page.py
def apply_sorting(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Apply sorting to the query.

    Args:
        query: The query to which to apply the sorting.
        table: The query table.

    Returns:
        The query with sorting applied.
    """
    column, operand = self.sorting_params

    if operand == SorterOps.DESCENDING:
        sort_clause = desc(getattr(table, column))  # type: ignore[var-annotated]
    else:
        sort_clause = asc(getattr(table, column))

    # We always add the `id` column as a tiebreaker to ensure a stable,
    # repeatable order of items, otherwise subsequent pages might contain
    # the same items.
    query = query.order_by(sort_clause, asc(table.id))  # type: ignore[arg-type]

    return query
check_field_annotation(k, type_) classmethod

Checks whether a model field has a certain annotation.

Parameters:

Name Type Description Default
k str

The name of the field.

required
type_ Any

The type to check.

required

Exceptions:

Type Description
ValueError

if the model field within does not have an annotation.

Returns:

Type Description
bool

True if the annotation of the field matches the given type, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def check_field_annotation(cls, k: str, type_: Any) -> bool:
    """Checks whether a model field has a certain annotation.

    Args:
        k: The name of the field.
        type_: The type to check.

    Raises:
        ValueError: if the model field within does not have an annotation.

    Returns:
        True if the annotation of the field matches the given type, False
        otherwise.
    """
    try:
        annotation = cls.model_fields[k].annotation

        if annotation is not None:
            return (
                issubclass(type_, get_args(annotation))
                or annotation is type_
            )
        else:
            raise ValueError(
                f"The field '{k}' inside the model {cls.__name__} "
                "does not have an annotation."
            )
    except TypeError:
        return False
configure_rbac(self, authenticated_user_id, **column_allowed_ids)

Configure RBAC allowed column values.

Parameters:

Name Type Description Default
authenticated_user_id UUID

ID of the authenticated user. All entities owned by this user will be included.

required
column_allowed_ids Optional[Set[uuid.UUID]]

Set of IDs per column to limit the query to. If given, the remaining filters will be applied to entities within this set only. If None, the remaining filters will be applied to all entries in the table.

{}
Source code in zenml/models/v2/base/page.py
def configure_rbac(
    self,
    authenticated_user_id: UUID,
    **column_allowed_ids: Optional[Set[UUID]],
) -> None:
    """Configure RBAC allowed column values.

    Args:
        authenticated_user_id: ID of the authenticated user. All entities
            owned by this user will be included.
        column_allowed_ids: Set of IDs per column to limit the query to.
            If given, the remaining filters will be applied to entities
            within this set only. If `None`, the remaining filters will
            be applied to all entries in the table.
    """
    self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(data, validation_info) classmethod

Wrapper method to handle the raw data.

Parameters:

Name Type Description Default
cls

the class handler

required
data Any

the raw input data

required
validation_info ValidationInfo

the context of the validation.

required

Returns:

Type Description
Any

the validated data

Source code in zenml/models/v2/base/page.py
def before_validator(
    cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
) -> Any:
    """Wrapper method to handle the raw data.

    Args:
        cls: the class handler
        data: the raw input data
        validation_info: the context of the validation.

    Returns:
        the validated data
    """
    data = model_validator_data_handler(
        raw_data=data, base_class=cls, validation_info=validation_info
    )
    return method(cls=cls, data=data)
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
ColumnElement[bool]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/v2/base/page.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["ColumnElement[bool]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlmodel import and_, or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    for custom_filter in self.get_custom_filters():
        filters.append(custom_filter)
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)

Generates an optional RBAC filter.

Parameters:

Name Type Description Default
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Optional[ColumnElement[bool]]

The RBAC filter.

Source code in zenml/models/v2/base/page.py
def generate_rbac_filter(
    self,
    table: Type["AnySchema"],
) -> Optional["ColumnElement[bool]"]:
    """Generates an optional RBAC filter.

    Args:
        table: The query table.

    Returns:
        The RBAC filter.
    """
    from sqlmodel import or_

    if not self._rbac_configuration:
        return None

    expressions = []

    for column_name, allowed_ids in self._rbac_configuration[1].items():
        if allowed_ids is not None:
            expression = getattr(table, column_name).in_(allowed_ids)
            expressions.append(expression)

    if expressions and hasattr(table, "user_id"):
        # If `expressions` is not empty, we do not have full access to all
        # rows of the table. In this case, we also include rows which the
        # user owns.

        # Unowned entities are considered server-owned and can be seen
        # by anyone
        expressions.append(getattr(table, "user_id").is_(None))
        # The authenticated user owns this entity
        expressions.append(
            getattr(table, "user_id") == self._rbac_configuration[0]
        )

    if expressions:
        return or_(*expressions)
    else:
        return None
get_custom_filters(self)

Get custom filters.

This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.

Returns:

Type Description
List[ColumnElement[bool]]

A list of custom filters.

Source code in zenml/models/v2/base/page.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
    """Get custom filters.

    This can be overridden by subclasses to define custom filters that are
    not based on the columns of the underlying table.

    Returns:
        A list of custom filters.
    """
    return []
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=bool)
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=datetime)
is_int_field(k) classmethod

Checks if it's an int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is an int field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's an int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is an int field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=int)
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=str) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=str)
is_uuid_field(k) classmethod

Checks if it's a UUID field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a UUID field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a UUID field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a UUID field, False otherwise.
    """
    return cls.check_field_annotation(k=k, type_=UUID)
model_post_init(self, __context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
__context Any

The context.

required
Source code in zenml/models/v2/base/page.py
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
validate_sort_by(value) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
value Any

The sort_by field value.

required

Returns:

Type Description
Any

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/v2/base/page.py
@field_validator("sort_by", mode="before")
@classmethod
def validate_sort_by(cls, value: Any) -> Any:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        value: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(value, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(value)}"
        )
    column = value
    split_value = value.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            value = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{value}'"
        )
    elif column in cls.model_fields:
        return value
    elif column in cls.CUSTOM_SORTING_OPTIONS:
        return value
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
__contains__(self, item) special

Returns whether the page contains a specific item.

This enables item in page checks.

Parameters:

Name Type Description Default
item ~B

The item to check for.

required

Returns:

Type Description
bool

Whether the item is in the page.

Source code in zenml/models/v2/base/page.py
def __contains__(self, item: B) -> bool:
    """Returns whether the page contains a specific item.

    This enables `item in page` checks.

    Args:
        item: The item to check for.

    Returns:
        Whether the item is in the page.
    """
    return item in self.items
__getitem__(self, index) special

Return the item at the given index.

This enables page[index].

Parameters:

Name Type Description Default
index int

The index to get the item from.

required

Returns:

Type Description
~B

The item at the given index.

Source code in zenml/models/v2/base/page.py
def __getitem__(self, index: int) -> B:
    """Return the item at the given index.

    This enables `page[index]`.

    Args:
        index: The index to get the item from.

    Returns:
        The item at the given index.
    """
    return self.items[index]
__iter__(self) special

Return an iterator over the items in the page.

This enables for item in page loops, but breaks dict(page).

Yields:

Type Description
Generator[~B, NoneType, NoneType]

An iterator over the items in the page.

Source code in zenml/models/v2/base/page.py
def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
    """Return an iterator over the items in the page.

    This enables `for item in page` loops, but breaks `dict(page)`.

    Yields:
        An iterator over the items in the page.
    """
    for item in self.items.__iter__():
        yield item
__len__(self) special

Return the item count of the page.

This enables len(page).

Returns:

Type Description
int

The amount of items in the page.

Source code in zenml/models/v2/base/page.py
def __len__(self) -> int:
    """Return the item count of the page.

    This enables `len(page)`.

    Returns:
        The amount of items in the page.
    """
    return len(self.items)
scoped

Scoped model definitions.

UserScopedFilter (BaseFilter)

Model to enable advanced user-based scoping.

Source code in zenml/models/v2/base/scoped.py
class UserScopedFilter(BaseFilter):
    """Model to enable advanced user-based scoping."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "scope_user",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "scope_user",
    ]
    scope_user: Optional[UUID] = Field(
        default=None,
        description="The user to scope this query to.",
    )

    def set_scope_user(self, user_id: UUID) -> None:
        """Set the user that is performing the filtering to scope the response.

        Args:
            user_id: The user ID to scope the response to.
        """
        self.scope_user = user_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        query = super().apply_filter(query=query, table=table)

        if self.scope_user:
            query = query.where(getattr(table, "user_id") == self.scope_user)

        return query
apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    query = super().apply_filter(query=query, table=table)

    if self.scope_user:
        query = query.where(getattr(table, "user_id") == self.scope_user)

    return query
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
set_scope_user(self, user_id)

Set the user that is performing the filtering to scope the response.

Parameters:

Name Type Description Default
user_id UUID

The user ID to scope the response to.

required
Source code in zenml/models/v2/base/scoped.py
def set_scope_user(self, user_id: UUID) -> None:
    """Set the user that is performing the filtering to scope the response.

    Args:
        user_id: The user ID to scope the response to.
    """
    self.scope_user = user_id
UserScopedRequest (BaseRequest)

Base user-owned request model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/v2/base/scoped.py
class UserScopedRequest(BaseRequest):
    """Base user-owned request model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    user: UUID = Field(title="The id of the user that created this resource.")

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["user_id"] = self.user
        return metadata
get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["user_id"] = self.user
    return metadata
UserScopedResponse (BaseIdentifiedResponse[~UserBody, ~UserMetadata, ~UserResources], Generic)

Base user-owned model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponse(
    BaseIdentifiedResponse[UserBody, UserMetadata, UserResources],
    Generic[UserBody, UserMetadata, UserResources],
):
    """Base user-owned model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    # Analytics
    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        if self.user is not None:
            metadata["user_id"] = self.user.id
        return metadata

    # Body and metadata properties
    @property
    def user(self) -> Optional["UserResponse"]:
        """The `user` property.

        Returns:
            the value of the property.
        """
        return self.get_body().user
user: Optional[UserResponse] property readonly

The user property.

Returns:

Type Description
Optional[UserResponse]

the value of the property.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    if self.user is not None:
        metadata["user_id"] = self.user.id
    return metadata
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
UserScopedResponseBody (BaseDatedResponseBody)

Base user-owned body.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseBody(BaseDatedResponseBody):
    """Base user-owned body."""

    user: Optional["UserResponse"] = Field(
        title="The user who created this resource.", default=None
    )
UserScopedResponseMetadata (BaseResponseMetadata)

Base user-owned metadata.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseMetadata(BaseResponseMetadata):
    """Base user-owned metadata."""
UserScopedResponseResources (BaseResponseResources)

Base class for all resource models associated with the user.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseResources(BaseResponseResources):
    """Base class for all resource models associated with the user."""
UserScopedResponse[FlavorResponseBody, FlavorResponseMetadata, FlavorResponseResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
UserScopedResponse[OAuthDeviceResponseBody, OAuthDeviceResponseMetadata, OAuthDeviceResponseResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
UserScopedResponse[~WorkspaceBody, ~WorkspaceMetadata, ~WorkspaceResources] (UserScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedFilter (BaseFilter)

Model to enable advanced scoping with workspace.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedFilter(BaseFilter):
    """Model to enable advanced scoping with workspace."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    scope_workspace: Optional[UUID] = Field(
        default=None,
        description="The workspace to scope this query to.",
    )

    def set_scope_workspace(self, workspace_id: UUID) -> None:
        """Set the workspace to scope this response.

        Args:
            workspace_id: The workspace to scope this response to.
        """
        self.scope_workspace = workspace_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        query = super().apply_filter(query=query, table=table)

        if self.scope_workspace:
            scope_filter = or_(
                getattr(table, "workspace_id") == self.scope_workspace,
                getattr(table, "workspace_id").is_(None),
            )
            query = query.where(scope_filter)

        return query
apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    query = super().apply_filter(query=query, table=table)

    if self.scope_workspace:
        scope_filter = or_(
            getattr(table, "workspace_id") == self.scope_workspace,
            getattr(table, "workspace_id").is_(None),
        )
        query = query.where(scope_filter)

    return query
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
set_scope_workspace(self, workspace_id)

Set the workspace to scope this response.

Parameters:

Name Type Description Default
workspace_id UUID

The workspace to scope this response to.

required
Source code in zenml/models/v2/base/scoped.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
    """Set the workspace to scope this response.

    Args:
        workspace_id: The workspace to scope this response to.
    """
    self.scope_workspace = workspace_id
WorkspaceScopedRequest (UserScopedRequest)

Base workspace-scoped request domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedRequest(UserScopedRequest):
    """Base workspace-scoped request domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    workspace: UUID = Field(
        title="The workspace to which this resource belongs."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["workspace_id"] = self.workspace
        return metadata
get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["workspace_id"] = self.workspace
    return metadata
WorkspaceScopedResponse (UserScopedResponse[~WorkspaceBody, ~WorkspaceMetadata, ~WorkspaceResources], Generic)

Base workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponse(
    UserScopedResponse[WorkspaceBody, WorkspaceMetadata, WorkspaceResources],
    Generic[WorkspaceBody, WorkspaceMetadata, WorkspaceResources],
):
    """Base workspace-scoped domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    # Body and metadata properties
    @property
    def workspace(self) -> "WorkspaceResponse":
        """The workspace property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().workspace
workspace: WorkspaceResponse property readonly

The workspace property.

Returns:

Type Description
WorkspaceResponse

the value of the property.

model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponseBody (UserScopedResponseBody)

Base workspace-scoped body.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseBody(UserScopedResponseBody):
    """Base workspace-scoped body."""
WorkspaceScopedResponseMetadata (UserScopedResponseMetadata)

Base workspace-scoped metadata.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseMetadata(UserScopedResponseMetadata):
    """Base workspace-scoped metadata."""

    workspace: "WorkspaceResponse" = Field(
        title="The workspace of this resource."
    )
WorkspaceScopedResponseResources (UserScopedResponseResources)

Base workspace-scoped resources.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseResources(UserScopedResponseResources):
    """Base workspace-scoped resources."""
WorkspaceScopedResponse[ActionResponseBody, ActionResponseMetadata, ActionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata, ArtifactVersionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[CodeRepositoryResponseBody, CodeRepositoryResponseMetadata, CodeRepositoryResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ComponentResponseBody, ComponentResponseMetadata, ComponentResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[EventSourceResponseBody, EventSourceResponseMetadata, EventSourceResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ModelResponseBody, ModelResponseMetadata, ModelResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ModelVersionResponseBody, ModelVersionResponseMetadata, ModelVersionResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineBuildResponseBody, PipelineBuildResponseMetadata, PipelineBuildResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineDeploymentResponseBody, PipelineDeploymentResponseMetadata, PipelineDeploymentResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineResponseBody, PipelineResponseMetadata, PipelineResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[PipelineRunResponseBody, PipelineRunResponseMetadata, PipelineRunResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[RunMetadataResponseBody, RunMetadataResponseMetadata, RunMetadataResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[RunTemplateResponseBody, RunTemplateResponseMetadata, RunTemplateResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ScheduleResponseBody, ScheduleResponseMetadata, ScheduleResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[SecretResponseBody, SecretResponseMetadata, SecretResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ServiceConnectorResponseBody, ServiceConnectorResponseMetadata, ServiceConnectorResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[ServiceResponseBody, ServiceResponseMetadata, ServiceResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[StackResponseBody, StackResponseMetadata, StackResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[StepRunResponseBody, StepRunResponseMetadata, StepRunResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedResponse[TriggerResponseBody, TriggerResponseMetadata, TriggerResponseResources] (WorkspaceScopedResponse)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
WorkspaceScopedTaggableFilter (WorkspaceScopedFilter)

Model to enable advanced scoping with workspace and tagging.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedTaggableFilter(WorkspaceScopedFilter):
    """Model to enable advanced scoping with workspace and tagging."""

    tag: Optional[str] = Field(
        description="Tag to apply to the filter query.", default=None
    )

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilter.FILTER_EXCLUDE_FIELDS,
        "tag",
    ]

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from zenml.zen_stores.schemas import TagResourceSchema

        query = super().apply_filter(query=query, table=table)
        if self.tag:
            query = (
                query.join(getattr(table, "tags"))
                .join(TagResourceSchema.tag)
                .distinct()
            )

        return query

    def get_custom_filters(self) -> List["ColumnElement[bool]"]:
        """Get custom tag filters.

        Returns:
            A list of custom filters.
        """
        from zenml.zen_stores.schemas import TagSchema

        custom_filters = super().get_custom_filters()
        if self.tag:
            custom_filters.append(col(TagSchema.name) == self.tag)

        return custom_filters
apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from zenml.zen_stores.schemas import TagResourceSchema

    query = super().apply_filter(query=query, table=table)
    if self.tag:
        query = (
            query.join(getattr(table, "tags"))
            .join(TagResourceSchema.tag)
            .distinct()
        )

    return query
get_custom_filters(self)

Get custom tag filters.

Returns:

Type Description
List[ColumnElement[bool]]

A list of custom filters.

Source code in zenml/models/v2/base/scoped.py
def get_custom_filters(self) -> List["ColumnElement[bool]"]:
    """Get custom tag filters.

    Returns:
        A list of custom filters.
    """
    from zenml.zen_stores.schemas import TagSchema

    custom_filters = super().get_custom_filters()
    if self.tag:
        custom_filters.append(col(TagSchema.name) == self.tag)

    return custom_filters
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/base/scoped.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)

core special

action

Collection of all models concerning actions.

ActionFilter (WorkspaceScopedFilter)

Model to enable advanced filtering of all actions.

Source code in zenml/models/v2/core/action.py
class ActionFilter(WorkspaceScopedFilter):
    """Model to enable advanced filtering of all actions."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the action.",
    )
    flavor: Optional[str] = Field(
        default=None,
        title="The flavor of the action.",
    )
    plugin_subtype: Optional[str] = Field(
        default=None,
        title="The subtype of the action.",
    )
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/action.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
ActionRequest (WorkspaceScopedRequest)

Model for creating a new action.

Source code in zenml/models/v2/core/action.py
class ActionRequest(WorkspaceScopedRequest):
    """Model for creating a new action."""

    name: str = Field(
        title="The name of the action.", max_length=STR_FIELD_MAX_LENGTH
    )
    description: str = Field(
        default="",
        title="The description of the action",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    flavor: str = Field(
        title="The flavor of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    plugin_subtype: PluginSubType = Field(
        title="The subtype of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    configuration: Dict[str, Any] = Field(
        title="The configuration for the action.",
    )
    service_account_id: UUID = Field(
        title="The service account that is used to execute the action.",
    )
    auth_window: Optional[int] = Field(
        default=None,
        title="The time window in minutes for which the service account is "
        "authorized to execute the action. Set this to 0 to authorize the "
        "service account indefinitely (not recommended). If not set, a "
        "default value defined for each individual action type is used.",
    )
ActionResponse (WorkspaceScopedResponse[ActionResponseBody, ActionResponseMetadata, ActionResponseResources])

Response model for actions.

Source code in zenml/models/v2/core/action.py
class ActionResponse(
    WorkspaceScopedResponse[
        ActionResponseBody, ActionResponseMetadata, ActionResponseResources
    ]
):
    """Response model for actions."""

    name: str = Field(
        title="The name of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    def get_hydrated_version(self) -> "ActionResponse":
        """Get the hydrated version of this action.

        Returns:
            An instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_action(self.id)

    # Body and metadata properties
    @property
    def flavor(self) -> str:
        """The `flavor` property.

        Returns:
            the value of the property.
        """
        return self.get_body().flavor

    @property
    def plugin_subtype(self) -> PluginSubType:
        """The `plugin_subtype` property.

        Returns:
            the value of the property.
        """
        return self.get_body().plugin_subtype

    @property
    def description(self) -> str:
        """The `description` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().description

    @property
    def auth_window(self) -> int:
        """The `auth_window` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().auth_window

    @property
    def configuration(self) -> Dict[str, Any]:
        """The `configuration` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().configuration

    def set_configuration(self, configuration: Dict[str, Any]) -> None:
        """Set the `configuration` property.

        Args:
            configuration: The value to set.
        """
        self.get_metadata().configuration = configuration

    # Resource properties
    @property
    def service_account(self) -> "UserResponse":
        """The `service_account` property.

        Returns:
            the value of the property.
        """
        return self.get_resources().service_account
auth_window: int property readonly

The auth_window property.

Returns:

Type Description
int

the value of the property.

configuration: Dict[str, Any] property readonly

The configuration property.

Returns:

Type Description
Dict[str, Any]

the value of the property.

description: str property readonly

The description property.

Returns:

Type Description
str

the value of the property.

flavor: str property readonly

The flavor property.

Returns:

Type Description
str

the value of the property.

plugin_subtype: PluginSubType property readonly

The plugin_subtype property.

Returns:

Type Description
PluginSubType

the value of the property.

service_account: UserResponse property readonly

The service_account property.

Returns:

Type Description
UserResponse

the value of the property.

get_hydrated_version(self)

Get the hydrated version of this action.

Returns:

Type Description
ActionResponse

An instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/action.py
def get_hydrated_version(self) -> "ActionResponse":
    """Get the hydrated version of this action.

    Returns:
        An instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_action(self.id)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/action.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
set_configuration(self, configuration)

Set the configuration property.

Parameters:

Name Type Description Default
configuration Dict[str, Any]

The value to set.

required
Source code in zenml/models/v2/core/action.py
def set_configuration(self, configuration: Dict[str, Any]) -> None:
    """Set the `configuration` property.

    Args:
        configuration: The value to set.
    """
    self.get_metadata().configuration = configuration
ActionResponseBody (WorkspaceScopedResponseBody)

Response body for actions.

Source code in zenml/models/v2/core/action.py
class ActionResponseBody(WorkspaceScopedResponseBody):
    """Response body for actions."""

    flavor: str = Field(
        title="The flavor of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    plugin_subtype: PluginSubType = Field(
        title="The subtype of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
ActionResponseMetadata (WorkspaceScopedResponseMetadata)

Response metadata for actions.

Source code in zenml/models/v2/core/action.py
class ActionResponseMetadata(WorkspaceScopedResponseMetadata):
    """Response metadata for actions."""

    description: str = Field(
        default="",
        title="The description of the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    configuration: Dict[str, Any] = Field(
        title="The configuration for the action.",
    )
    auth_window: int = Field(
        title="The time window in minutes for which the service account is "
        "authorized to execute the action."
    )
ActionResponseResources (WorkspaceScopedResponseResources)

Class for all resource models associated with the action entity.

Source code in zenml/models/v2/core/action.py
class ActionResponseResources(WorkspaceScopedResponseResources):
    """Class for all resource models associated with the action entity."""

    service_account: UserResponse = Field(
        title="The service account that is used to execute the action.",
    )
ActionUpdate (BaseUpdate)

Update model for actions.

Source code in zenml/models/v2/core/action.py
class ActionUpdate(BaseUpdate):
    """Update model for actions."""

    name: Optional[str] = Field(
        default=None,
        title="The new name for the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: Optional[str] = Field(
        default=None,
        title="The new description for the action.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    configuration: Optional[Dict[str, Any]] = Field(
        default=None,
        title="The configuration for the action.",
    )
    service_account_id: Optional[UUID] = Field(
        default=None,
        title="The service account that is used to execute the action.",
    )
    auth_window: Optional[int] = Field(
        default=None,
        title="The time window in minutes for which the service account is "
        "authorized to execute the action. Set this to 0 to authorize the "
        "service account indefinitely (not recommended). If not set, a "
        "default value defined for each individual action type is used.",
    )

    @classmethod
    def from_response(cls, response: "ActionResponse") -> "ActionUpdate":
        """Create an update model from a response model.

        Args:
            response: The response model to create the update model from.

        Returns:
            The update model.
        """
        return ActionUpdate(
            configuration=copy.deepcopy(response.configuration),
        )
from_response(response) classmethod

Create an update model from a response model.

Parameters:

Name Type Description Default
response ActionResponse

The response model to create the update model from.

required

Returns:

Type Description
ActionUpdate

The update model.

Source code in zenml/models/v2/core/action.py
@classmethod
def from_response(cls, response: "ActionResponse") -> "ActionUpdate":
    """Create an update model from a response model.

    Args:
        response: The response model to create the update model from.

    Returns:
        The update model.
    """
    return ActionUpdate(
        configuration=copy.deepcopy(response.configuration),
    )
action_flavor

Action flavor model definitions.

ActionFlavorResponse (BasePluginFlavorResponse[ActionFlavorResponseBody, ActionFlavorResponseMetadata, ActionFlavorResponseResources])

Response model for Action Flavors.

Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponse(
    BasePluginFlavorResponse[
        ActionFlavorResponseBody,
        ActionFlavorResponseMetadata,
        ActionFlavorResponseResources,
    ]
):
    """Response model for Action Flavors."""

    # Body and metadata properties
    @property
    def config_schema(self) -> Dict[str, Any]:
        """The `source_config_schema` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().config_schema
config_schema: Dict[str, Any] property readonly

The source_config_schema property.

Returns:

Type Description
Dict[str, Any]

the value of the property.

model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/action_flavor.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
ActionFlavorResponseBody (BasePluginResponseBody)

Response body for action flavors.

Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseBody(BasePluginResponseBody):
    """Response body for action flavors."""
ActionFlavorResponseMetadata (BasePluginResponseMetadata)

Response metadata for action flavors.

Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseMetadata(BasePluginResponseMetadata):
    """Response metadata for action flavors."""

    config_schema: Dict[str, Any]
ActionFlavorResponseResources (BasePluginResponseResources)

Response resources for action flavors.

Source code in zenml/models/v2/core/action_flavor.py
class ActionFlavorResponseResources(BasePluginResponseResources):
    """Response resources for action flavors."""
api_key

Models representing API keys.

APIKey (BaseModel)

Encoded model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKey(BaseModel):
    """Encoded model for API keys."""

    id: UUID
    key: str

    @classmethod
    def decode_api_key(cls, encoded_key: str) -> "APIKey":
        """Decodes an API key from a base64 string.

        Args:
            encoded_key: The encoded API key.

        Returns:
            The decoded API key.

        Raises:
            ValueError: If the key is not valid.
        """
        if encoded_key.startswith(ZENML_API_KEY_PREFIX):
            encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
        try:
            json_key = b64_decode(encoded_key)
            return cls.model_validate_json(json_key)
        except Exception:
            raise ValueError("Invalid API key.")

    def encode(self) -> str:
        """Encodes the API key in a base64 string that includes the key ID and prefix.

        Returns:
            The encoded API key.
        """
        encoded_key = b64_encode(self.model_dump_json())
        return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
decode_api_key(encoded_key) classmethod

Decodes an API key from a base64 string.

Parameters:

Name Type Description Default
encoded_key str

The encoded API key.

required

Returns:

Type Description
APIKey

The decoded API key.

Exceptions:

Type Description
ValueError

If the key is not valid.

Source code in zenml/models/v2/core/api_key.py
@classmethod
def decode_api_key(cls, encoded_key: str) -> "APIKey":
    """Decodes an API key from a base64 string.

    Args:
        encoded_key: The encoded API key.

    Returns:
        The decoded API key.

    Raises:
        ValueError: If the key is not valid.
    """
    if encoded_key.startswith(ZENML_API_KEY_PREFIX):
        encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
    try:
        json_key = b64_decode(encoded_key)
        return cls.model_validate_json(json_key)
    except Exception:
        raise ValueError("Invalid API key.")
encode(self)

Encodes the API key in a base64 string that includes the key ID and prefix.

Returns:

Type Description
str

The encoded API key.

Source code in zenml/models/v2/core/api_key.py
def encode(self) -> str:
    """Encodes the API key in a base64 string that includes the key ID and prefix.

    Returns:
        The encoded API key.
    """
    encoded_key = b64_encode(self.model_dump_json())
    return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
APIKeyFilter (BaseFilter)

Filter model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyFilter(BaseFilter):
    """Filter model for API keys."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "service_account",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "service_account",
    ]

    service_account: Optional[UUID] = Field(
        default=None,
        description="The service account to scope this query to.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the API key",
    )
    description: Optional[str] = Field(
        default=None,
        title="Filter by the API key description.",
    )
    active: Optional[Union[bool, str]] = Field(
        default=None,
        title="Whether the API key is active.",
        union_mode="left_to_right",
    )
    last_login: Optional[Union[datetime, str]] = Field(
        default=None,
        title="Time when the API key was last used to log in.",
        union_mode="left_to_right",
    )
    last_rotated: Optional[Union[datetime, str]] = Field(
        default=None,
        title="Time when the API key was last rotated.",
        union_mode="left_to_right",
    )

    def set_service_account(self, service_account_id: UUID) -> None:
        """Set the service account by which to scope this query.

        Args:
            service_account_id: The service account ID.
        """
        self.service_account = service_account_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Override to apply the service account scope as an additional filter.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        query = super().apply_filter(query=query, table=table)

        if self.service_account:
            scope_filter = (
                getattr(table, "service_account_id") == self.service_account
            )
            query = query.where(scope_filter)

        return query
apply_filter(self, query, table)

Override to apply the service account scope as an additional filter.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/core/api_key.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Override to apply the service account scope as an additional filter.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    query = super().apply_filter(query=query, table=table)

    if self.service_account:
        scope_filter = (
            getattr(table, "service_account_id") == self.service_account
        )
        query = query.where(scope_filter)

    return query
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
set_service_account(self, service_account_id)

Set the service account by which to scope this query.

Parameters:

Name Type Description Default
service_account_id UUID

The service account ID.

required
Source code in zenml/models/v2/core/api_key.py
def set_service_account(self, service_account_id: UUID) -> None:
    """Set the service account by which to scope this query.

    Args:
        service_account_id: The service account ID.
    """
    self.service_account = service_account_id
APIKeyInternalResponse (APIKeyResponse)

Response model for API keys used internally.

Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalResponse(APIKeyResponse):
    """Response model for API keys used internally."""

    previous_key: Optional[str] = Field(
        default=None,
        title="The previous API key. Only set if the key was rotated.",
    )

    def verify_key(
        self,
        key: str,
    ) -> bool:
        """Verifies a given key against the stored (hashed) key(s).

        Args:
            key: Input key to be verified.

        Returns:
            True if the keys match.
        """
        # even when the hashed key is not set, we still want to execute
        # the hash verification to protect against response discrepancy
        # attacks (https://cwe.mitre.org/data/definitions/204.html)
        key_hash: Optional[str] = None
        context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        if self.key is not None and self.active:
            key_hash = self.key
        result = context.verify(key, key_hash)

        # same for the previous key, if set and if it's still valid
        key_hash = None
        if (
            self.previous_key is not None
            and self.last_rotated is not None
            and self.active
            and self.retain_period_minutes > 0
        ):
            # check if the previous key is still valid
            if datetime.utcnow() - self.last_rotated < timedelta(
                minutes=self.retain_period_minutes
            ):
                key_hash = self.previous_key
        previous_result = context.verify(key, key_hash)

        return result or previous_result
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
verify_key(self, key)

Verifies a given key against the stored (hashed) key(s).

Parameters:

Name Type Description Default
key str

Input key to be verified.

required

Returns:

Type Description
bool

True if the keys match.

Source code in zenml/models/v2/core/api_key.py
def verify_key(
    self,
    key: str,
) -> bool:
    """Verifies a given key against the stored (hashed) key(s).

    Args:
        key: Input key to be verified.

    Returns:
        True if the keys match.
    """
    # even when the hashed key is not set, we still want to execute
    # the hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    key_hash: Optional[str] = None
    context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    if self.key is not None and self.active:
        key_hash = self.key
    result = context.verify(key, key_hash)

    # same for the previous key, if set and if it's still valid
    key_hash = None
    if (
        self.previous_key is not None
        and self.last_rotated is not None
        and self.active
        and self.retain_period_minutes > 0
    ):
        # check if the previous key is still valid
        if datetime.utcnow() - self.last_rotated < timedelta(
            minutes=self.retain_period_minutes
        ):
            key_hash = self.previous_key
    previous_result = context.verify(key, key_hash)

    return result or previous_result
APIKeyInternalUpdate (APIKeyUpdate)

Update model for API keys used internally.

Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalUpdate(APIKeyUpdate):
    """Update model for API keys used internally."""

    update_last_login: bool = Field(
        default=False,
        title="Whether to update the last login timestamp.",
    )
APIKeyRequest (BaseRequest)

Request model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyRequest(BaseRequest):
    """Request model for API keys."""

    name: str = Field(
        title="The name of the API Key.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    description: Optional[str] = Field(
        default=None,
        title="The description of the API Key.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
APIKeyResponse (BaseIdentifiedResponse[APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources])

Response model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponse(
    BaseIdentifiedResponse[
        APIKeyResponseBody, APIKeyResponseMetadata, APIKeyResponseResources
    ]
):
    """Response model for API keys."""

    name: str = Field(
        title="The name of the API Key.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    _warn_on_response_updates = False

    def get_hydrated_version(self) -> "APIKeyResponse":
        """Get the hydrated version of this API key.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_api_key(
            service_account_id=self.service_account.id,
            api_key_name_or_id=self.id,
        )

    # Helper functions
    def set_key(self, key: str) -> None:
        """Sets the API key and encodes it.

        Args:
            key: The API key value to be set.
        """
        self.get_body().key = APIKey(id=self.id, key=key).encode()

    # Body and metadata properties
    @property
    def key(self) -> Optional[str]:
        """The `key` property.

        Returns:
            the value of the property.
        """
        return self.get_body().key

    @property
    def active(self) -> bool:
        """The `active` property.

        Returns:
            the value of the property.
        """
        return self.get_body().active

    @property
    def service_account(self) -> "ServiceAccountResponse":
        """The `service_account` property.

        Returns:
            the value of the property.
        """
        return self.get_body().service_account

    @property
    def description(self) -> str:
        """The `description` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().description

    @property
    def retain_period_minutes(self) -> int:
        """The `retain_period_minutes` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().retain_period_minutes

    @property
    def last_login(self) -> Optional[datetime]:
        """The `last_login` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().last_login

    @property
    def last_rotated(self) -> Optional[datetime]:
        """The `last_rotated` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().last_rotated
active: bool property readonly

The active property.

Returns:

Type Description
bool

the value of the property.

description: str property readonly

The description property.

Returns:

Type Description
str

the value of the property.

key: Optional[str] property readonly

The key property.

Returns:

Type Description
Optional[str]

the value of the property.

last_login: Optional[datetime.datetime] property readonly

The last_login property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

last_rotated: Optional[datetime.datetime] property readonly

The last_rotated property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

retain_period_minutes: int property readonly

The retain_period_minutes property.

Returns:

Type Description
int

the value of the property.

service_account: ServiceAccountResponse property readonly

The service_account property.

Returns:

Type Description
ServiceAccountResponse

the value of the property.

get_hydrated_version(self)

Get the hydrated version of this API key.

Returns:

Type Description
APIKeyResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/api_key.py
def get_hydrated_version(self) -> "APIKeyResponse":
    """Get the hydrated version of this API key.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_api_key(
        service_account_id=self.service_account.id,
        api_key_name_or_id=self.id,
    )
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/api_key.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
set_key(self, key)

Sets the API key and encodes it.

Parameters:

Name Type Description Default
key str

The API key value to be set.

required
Source code in zenml/models/v2/core/api_key.py
def set_key(self, key: str) -> None:
    """Sets the API key and encodes it.

    Args:
        key: The API key value to be set.
    """
    self.get_body().key = APIKey(id=self.id, key=key).encode()
APIKeyResponseBody (BaseDatedResponseBody)

Response body for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseBody(BaseDatedResponseBody):
    """Response body for API keys."""

    key: Optional[str] = Field(
        default=None,
        title="The API key. Only set immediately after creation or rotation.",
    )
    active: bool = Field(
        default=True,
        title="Whether the API key is active.",
    )
    service_account: "ServiceAccountResponse" = Field(
        title="The service account associated with this API key."
    )
APIKeyResponseMetadata (BaseResponseMetadata)

Response metadata for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseMetadata(BaseResponseMetadata):
    """Response metadata for API keys."""

    description: str = Field(
        default="",
        title="The description of the API Key.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    retain_period_minutes: int = Field(
        title="Number of minutes for which the previous key is still valid "
        "after it has been rotated.",
    )
    last_login: Optional[datetime] = Field(
        default=None, title="Time when the API key was last used to log in."
    )
    last_rotated: Optional[datetime] = Field(
        default=None, title="Time when the API key was last rotated."
    )
APIKeyResponseResources (BaseResponseResources)

Class for all resource models associated with the APIKey entity.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseResources(BaseResponseResources):
    """Class for all resource models associated with the APIKey entity."""
APIKeyRotateRequest (BaseModel)

Request model for API key rotation.

Source code in zenml/models/v2/core/api_key.py
class APIKeyRotateRequest(BaseModel):
    """Request model for API key rotation."""

    retain_period_minutes: int = Field(
        default=0,
        title="Number of minutes for which the previous key is still valid "
        "after it has been rotated.",
    )
APIKeyUpdate (BaseUpdate)

Update model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyUpdate(BaseUpdate):
    """Update model for API keys."""

    name: Optional[str] = Field(
        title="The name of the API Key.",
        max_length=STR_FIELD_MAX_LENGTH,
        default=None,
    )
    description: Optional[str] = Field(
        title="The description of the API Key.",
        max_length=TEXT_FIELD_MAX_LENGTH,
        default=None,
    )
    active: Optional[bool] = Field(
        title="Whether the API key is active.",
        default=None,
    )
artifact

Models representing artifacts.

ArtifactFilter (WorkspaceScopedTaggableFilter)

Model to enable advanced filtering of artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactFilter(WorkspaceScopedTaggableFilter):
    """Model to enable advanced filtering of artifacts."""

    name: Optional[str] = None
    has_custom_name: Optional[bool] = None
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/artifact.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
ArtifactRequest (BaseRequest)

Artifact request model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactRequest(BaseRequest):
    """Artifact request model."""

    name: str = Field(
        title="Name of the artifact.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
    tags: Optional[List[str]] = Field(
        title="Artifact tags.",
        description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
        default=None,
    )
ArtifactResponse (BaseIdentifiedResponse[ArtifactResponseBody, ArtifactResponseMetadata, ArtifactResponseResources])

Artifact response model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponse(
    BaseIdentifiedResponse[
        ArtifactResponseBody,
        ArtifactResponseMetadata,
        ArtifactResponseResources,
    ]
):
    """Artifact response model."""

    def get_hydrated_version(self) -> "ArtifactResponse":
        """Get the hydrated version of this artifact.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_artifact(self.id)

    name: str = Field(
        title="Name of the output in the parent step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    # Body and metadata properties
    @property
    def tags(self) -> List[TagResponse]:
        """The `tags` property.

        Returns:
            the value of the property.
        """
        return self.get_body().tags

    @property
    def latest_version_name(self) -> Optional[str]:
        """The `latest_version_name` property.

        Returns:
            the value of the property.
        """
        return self.get_body().latest_version_name

    @property
    def latest_version_id(self) -> Optional[UUID]:
        """The `latest_version_id` property.

        Returns:
            the value of the property.
        """
        return self.get_body().latest_version_id

    @property
    def has_custom_name(self) -> bool:
        """The `has_custom_name` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().has_custom_name

    # Helper methods
    @property
    def versions(self) -> Dict[str, "ArtifactVersionResponse"]:
        """Get a list of all versions of this artifact.

        Returns:
            A list of all versions of this artifact.
        """
        from zenml.client import Client

        responses = Client().list_artifact_versions(name=self.name)
        return {str(response.version): response for response in responses}
has_custom_name: bool property readonly

The has_custom_name property.

Returns:

Type Description
bool

the value of the property.

latest_version_id: Optional[uuid.UUID] property readonly

The latest_version_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

latest_version_name: Optional[str] property readonly

The latest_version_name property.

Returns:

Type Description
Optional[str]

the value of the property.

tags: List[zenml.models.v2.core.tag.TagResponse] property readonly

The tags property.

Returns:

Type Description
List[zenml.models.v2.core.tag.TagResponse]

the value of the property.

versions: Dict[str, ArtifactVersionResponse] property readonly

Get a list of all versions of this artifact.

Returns:

Type Description
Dict[str, ArtifactVersionResponse]

A list of all versions of this artifact.

get_hydrated_version(self)

Get the hydrated version of this artifact.

Returns:

Type Description
ArtifactResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/artifact.py
def get_hydrated_version(self) -> "ArtifactResponse":
    """Get the hydrated version of this artifact.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_artifact(self.id)
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/artifact.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
ArtifactResponseBody (BaseDatedResponseBody)

Response body for artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseBody(BaseDatedResponseBody):
    """Response body for artifacts."""

    tags: List[TagResponse] = Field(
        title="Tags associated with the model",
    )
    latest_version_name: Optional[str] = None
    latest_version_id: Optional[UUID] = None
ArtifactResponseMetadata (BaseResponseMetadata)

Response metadata for artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseMetadata(BaseResponseMetadata):
    """Response metadata for artifacts."""

    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
ArtifactResponseResources (BaseResponseResources)

Class for all resource models associated with the Artifact Entity.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseResources(BaseResponseResources):
    """Class for all resource models associated with the Artifact Entity."""
ArtifactUpdate (BaseModel)

Artifact update model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactUpdate(BaseModel):
    """Artifact update model."""

    name: Optional[str] = None
    add_tags: Optional[List[str]] = None
    remove_tags: Optional[List[str]] = None
    has_custom_name: Optional[bool] = None
artifact_version

Models representing artifact versions.

ArtifactVersionFilter (WorkspaceScopedTaggableFilter)

Model to enable advanced filtering of artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionFilter(WorkspaceScopedTaggableFilter):
    """Model to enable advanced filtering of artifact versions."""

    # `name` and `only_unused` refer to properties related to other entities
    #  rather than a field in the db, hence they need to be handled
    #  explicitly
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedTaggableFilter.FILTER_EXCLUDE_FIELDS,
        "name",
        "only_unused",
        "has_custom_name",
    ]
    artifact_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="ID of the artifact to which this version belongs.",
        union_mode="left_to_right",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the artifact to which this version belongs.",
    )
    version: Optional[str] = Field(
        default=None,
        description="Version of the artifact",
    )
    version_number: Optional[Union[int, str]] = Field(
        default=None,
        description="Version of the artifact if it is an integer",
        union_mode="left_to_right",
    )
    uri: Optional[str] = Field(
        default=None,
        description="Uri of the artifact",
    )
    materializer: Optional[str] = Field(
        default=None,
        description="Materializer used to produce the artifact",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the artifact",
    )
    data_type: Optional[str] = Field(
        default=None,
        description="Datatype of the artifact",
    )
    artifact_store_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Artifact store for this artifact",
        union_mode="left_to_right",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Workspace for this artifact",
        union_mode="left_to_right",
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="User that produced this artifact",
        union_mode="left_to_right",
    )
    only_unused: Optional[bool] = Field(
        default=False, description="Filter only for unused artifacts"
    )
    has_custom_name: Optional[bool] = Field(
        default=None,
        description="Filter only artifacts with/without custom names.",
    )

    def get_custom_filters(self) -> List[Union["ColumnElement[bool]"]]:
        """Get custom filters.

        Returns:
            A list of custom filters.
        """
        custom_filters = super().get_custom_filters()

        from sqlmodel import and_, select

        from zenml.zen_stores.schemas.artifact_schemas import (
            ArtifactSchema,
            ArtifactVersionSchema,
        )
        from zenml.zen_stores.schemas.step_run_schemas import (
            StepRunInputArtifactSchema,
            StepRunOutputArtifactSchema,
        )

        if self.name:
            value, filter_operator = self._resolve_operator(self.name)
            filter_ = StrFilter(
                operation=GenericFilterOps(filter_operator),
                column="name",
                value=value,
            )
            artifact_name_filter = and_(
                ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
                filter_.generate_query_conditions(ArtifactSchema),
            )
            custom_filters.append(artifact_name_filter)

        if self.only_unused:
            unused_filter = and_(
                ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                    select(StepRunOutputArtifactSchema.artifact_id)
                ),
                ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                    select(StepRunInputArtifactSchema.artifact_id)
                ),
            )
            custom_filters.append(unused_filter)

        if self.has_custom_name is not None:
            custom_name_filter = and_(
                ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
                ArtifactSchema.has_custom_name == self.has_custom_name,
            )
            custom_filters.append(custom_name_filter)

        return custom_filters
get_custom_filters(self)

Get custom filters.

Returns:

Type Description
List[ColumnElement[bool]]

A list of custom filters.

Source code in zenml/models/v2/core/artifact_version.py
def get_custom_filters(self) -> List[Union["ColumnElement[bool]"]]:
    """Get custom filters.

    Returns:
        A list of custom filters.
    """
    custom_filters = super().get_custom_filters()

    from sqlmodel import and_, select

    from zenml.zen_stores.schemas.artifact_schemas import (
        ArtifactSchema,
        ArtifactVersionSchema,
    )
    from zenml.zen_stores.schemas.step_run_schemas import (
        StepRunInputArtifactSchema,
        StepRunOutputArtifactSchema,
    )

    if self.name:
        value, filter_operator = self._resolve_operator(self.name)
        filter_ = StrFilter(
            operation=GenericFilterOps(filter_operator),
            column="name",
            value=value,
        )
        artifact_name_filter = and_(
            ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
            filter_.generate_query_conditions(ArtifactSchema),
        )
        custom_filters.append(artifact_name_filter)

    if self.only_unused:
        unused_filter = and_(
            ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                select(StepRunOutputArtifactSchema.artifact_id)
            ),
            ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                select(StepRunInputArtifactSchema.artifact_id)
            ),
        )
        custom_filters.append(unused_filter)

    if self.has_custom_name is not None:
        custom_name_filter = and_(
            ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
            ArtifactSchema.has_custom_name == self.has_custom_name,
        )
        custom_filters.append(custom_name_filter)

    return custom_filters
model_post_init(self, _ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/models/v2/core/artifact_version.py
def wrapped_model_post_init(self: BaseModel, __context: Any) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, __context)
    original_model_post_init(self, __context)
ArtifactVersionRequest (WorkspaceScopedRequest)

Request model for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionRequest(WorkspaceScopedRequest):
    """Request model for artifact versions."""

    artifact_id: UUID = Field(
        title="ID of the artifact to which this version belongs.",
    )
    version: Union[str, int] = Field(
        title="Version of the artifact.", union_mode="left_to_right"
    )
    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
    type: ArtifactType = Field(title="Type of the artifact.")
    artifact_store_id: Optional[UUID] = Field(
        title="ID of the artifact store in which this artifact is stored.",
        default=None,
    )
    uri: str = Field(
        title="URI of the artifact.", max_length=TEXT_FIELD_MAX_LENGTH
    )
    materializer: SourceWithValidator = Field(
        title="Materializer class to use for this artifact.",
    )
    data_type: SourceWithValidator = Field(
        title="Data type of the artifact.",
    )
    tags: Optional[List[str]] = Field(
        title="Tags of the artifact.",
        description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
        default=None,
    )
    visualizations: Optional[List["ArtifactVisualizationRequest"]] = Field(
        default=None, title="Visualizations of the artifact."
    )

    @field_validator("version")
    @classmethod
    def str_field_max_length_check(cls, value: Any) -> Any:
        """Checks if the length of the value exceeds the maximum str length.

        Args:
            value: the value set in the field

        Returns:
            the value itself.

        Raises:
            AssertionError: if the length of the field is longer than the
                maximum threshold.
        """
        assert len(str(value)) < STR_FIELD_MAX_LENGTH, (
            "The length of the value for this field can not "
            f"exceed {STR_FIELD_MAX_LENGTH}"
        )
        return value
str_field_max_length_check(value) classmethod

Checks if the length of the value exceeds the maximum str length.

Parameters:

Name Type Description Default
value Any

the value set in the field

required

Returns:

Type Description
Any

the value itself.

Exceptions:

Type Description
AssertionError

if the length of the field is longer than the maximum threshold.

Source code in zenml/models/v2/core/artifact_version.py
@field_validator("version")
@classmethod
def str_field_max_length_check(cls, value: Any) -> Any:
    """Checks if the length of the value exceeds the maximum str length.

    Args:
        value: the value set in the field

    Returns:
        the value itself.

    Raises:
        AssertionError: if the length of the field is longer than the
            maximum threshold.
    """
    assert len(str(value)) < STR_FIELD_MAX_LENGTH, (
        "The length of the value for this field can not "
        f"exceed {STR_FIELD_MAX_LENGTH}"
    )
    return value
ArtifactVersionResponse (WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata, ArtifactVersionResponseResources])

Response model for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionResponse(
    WorkspaceScopedResponse[
        ArtifactVersionResponseBody,
        ArtifactVersionResponseMetadata,
        ArtifactVersionResponseResources,
    ]
):
    """Response model for artifact versions."""

    def get_hydrated_version(self) -> "ArtifactVersionResponse":
        """Get the hydrated version of this artifact version.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_artifact_version(self.id)

    # Body and metadata properties
    @property
    def artifact(self) -> "ArtifactResponse":
        """The `artifact` property.

        Returns:
            the value of the property.
        """
        return self.get_body().artifact

    @property
    def version(self) -> Union[str, int]:
        """The `version` property.

        Returns:
            the value of the property.
        """
        return self.get_body().version

    @property
    def uri(self) -> str:
        """The `uri` property.

        Returns:
            the value of the property.
        """
        return self.get_body().uri

    @property
    def type(self) -> ArtifactType:
        """The `type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().type

    @property
    def tags(self) -> List[TagResponse]:
        """The `tags` property.

        Returns:
            the value of the property.
        """
        return self.get_body().tags

    @property
    def producer_pipeline_run_id(self) -> Optional[UUID]:
        """The `producer_pipeline_run_id` property.

        Returns:
            the value of the property.
        """
        return self.get_body().producer_pipeline_run_id

    @property
    def artifact_store_id(self) -> Optional[UUID]:
        """The `artifact_store_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().artifact_store_id

    @property
    def producer_step_run_id(self) -> Optional[UUID]:
        """The `producer_step_run_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().producer_step_run_id

    @property
    def visualizations(
        self,
    ) -> Optional[List["ArtifactVisualizationResponse"]]:
        """The `visualizations` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().visualizations

    @property
    def run_metadata(self) -> Dict[str, "RunMetadataResponse"]:
        """The `metadata` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().run_metadata

    @property
    def materializer(self) -> Source:
        """The `materializer` property.

        Returns:
            the value of the property.
        """
        return self.get_body().materializer

    @property
    def data_type(self) -> Source:
        """The `data_type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().data_type

    # Helper methods
    @property
    def name(self) -> str:
        """The `name` property.

        Returns:
            the value of the property.
        """
        return self.artifact.name

    @property
    def step(self) -> "StepRunResponse":
        """Get the step that produced this artifact.

        Returns:
            The step that produced this artifact.
        """
        from zenml.artifacts.utils import get_producer_step_of_artifact

        return get_producer_step_of_artifact(self)

    @property
    def run(self) -> "PipelineRunResponse":
        """Get the pipeline run that produced this artifact.

        Returns:
            The pipeline run that produced this artifact.
        """
        from zenml.client import Client

        return Client().get_pipeline_run(self.step.pipeline_run_id)

    def load(self) -> Any:
        """Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        from zenml.artifacts.utils import load_artifact_from_response

        return load_artifact_from_response(self)

    def download_files(self, path: str, overwrite: bool = False) -> None:
        """Downloads data for an artifact with no materializing.

        Any artifacts will be saved as a zip file to the given path.

        Args:
            path: The path to save the binary data to.
            overwrite: Whether to overwrite the file if it already exists.

        Raises:
            ValueError: If the path does not end with '.zip'.
        """
        if not path.endswith(".zip"):
            raise ValueError(
                "The path should end with '.zip' to save the binary data."
            )
        from zenml.artifacts.utils import (
            download_artifact_files_from_response,
        )

        download_artifact_files_from_response(
            self,
            path=path,
            overwrite=overwrite,
        )

    def read(self) -> Any:
        """(Deprecated) Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        logger.warning(
            "`artifact.read()` is deprecated and will be removed in a future "
            "release. Please use `artifact.load()` instead."
        )
        return self.load()

    def visualize(self, title: Optional[str] = None) -> None:
        """Visualize the artifact in notebook environments.

        Args:
            title: Optional title to show before the visualizations.
        """
        from zenml.utils.visualization_utils import visualize_artifact

        visualize_artifact(self, title=title)
artifact: ArtifactResponse property readonly

The artifact property.

Returns:

Type Description
ArtifactResponse

the value of the property.

artifact_store_id: Optional[uuid.UUID] property readonly

The artifact_store_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

data_type: Source property readonly

The data_type property.

Returns:

Type Description
Source

the value of the property.

materializer: Source property readonly

The materializer property.

Returns:

Type Description
Source

the value of the property.

name: str property readonly

The name property.

Returns:

Type Description
str

the value of the property.

producer_pipeline_run_id: Optional[uuid.UUID] property readonly

The producer_pipeline_run_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

producer_step_run_id: Optional[uuid.UUID] property readonly

The producer_step_run_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

run: PipelineRunResponse property readonly

Get the pipeline run that produced this artifact.

Returns:

Type Description
PipelineRunResponse

The pipeline run that produced this artifact.

run_metadata: Dict[str, RunMetadataResponse] property readonly

The metadata property.

Returns:

Type Description
Dict[str, RunMetadataResponse]

the value of the property.

step: StepRunResponse property readonly

Get the step that produced this artifact.

Returns:

Type Description
StepRunResponse

The step that produced this artifact.

tags: List[zenml.models.v2.core.tag.TagResponse] property readonly

The tags property.

Returns:

Type Description
List[zenml.models.v2.core.tag.TagResponse]

the value of the property.

type: ArtifactType property readonly

The type property.

Returns:

Type Description
ArtifactType

the value of the property.

uri: str property readonly

The uri property.

Returns:

Type Description
str

the value of the property.

version: Union[str, int] property readonly

The version property.

Returns:

Type Description
Union[str, int]

the value of the property.

visualizations: Optional[List[ArtifactVisualizationResponse]] property readonly

The visualizations property.

Returns:

Type Description
Optional[List[ArtifactVisualizationResponse]]

the value of the property.

download_files(self,