Skip to content

Models

zenml.models special

Pydantic models for the various concepts in ZenML.

v2 special

base special

base

Base model definitions.

BaseRequest (BaseZenModel) pydantic-model

Base request model.

Used as a base class for all request models.

Source code in zenml/models/v2/base/base.py
class BaseRequest(BaseZenModel):
    """Base request model.

    Used as a base class for all request models.
    """
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseResponse (GenericModel, Generic, BaseZenModel) pydantic-model

Base domain model.

Source code in zenml/models/v2/base/base.py
class BaseResponse(GenericModel, Generic[AnyBody, AnyMetadata], BaseZenModel):
    """Base domain model."""

    id: UUID = Field(title="The unique resource id.")
    permission_denied: bool = False

    # Body and metadata pair
    body: Optional["AnyBody"] = Field(title="The body of the resource.")
    metadata: Optional["AnyMetadata"] = Field(
        title="The metadata related to this resource."
    )

    _response_update_strategy: (
        ResponseUpdateStrategy
    ) = ResponseUpdateStrategy.ALLOW
    _warn_on_response_updates: bool = True

    def get_hydrated_version(self) -> "BaseResponse[AnyBody, AnyMetadata]":
        """Abstract method to fetch the hydrated version of the model.

        Raises:
            NotImplementedError: in case the method is not implemented.
        """
        raise NotImplementedError(
            "Please implement a `get_hydrated_version` method before "
            "using/hydrating the model."
        )

    # Helper functions
    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Returns:
            Hash of the UUID.
        """
        return hash((type(self),) + tuple([self.id]))

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is of the same type and has the same UUID.
        """
        if isinstance(other, type(self)):
            return self.id == other.id
        else:
            return False

    def _validate_hydrated_version(
        self, hydrated_model: "BaseResponse[AnyBody, AnyMetadata]"
    ) -> None:
        """Helper method to validate the values within the hydrated version.

        Args:
            hydrated_model: the hydrated version of the model.

        Raises:
            HydrationError: if the hydrated version has different values set
                for either the name of the body fields and the
                _method_body_mutation is set to ResponseBodyUpdate.DENY.
        """
        # Check whether the metadata exists in the hydrated version
        if hydrated_model.metadata is None:
            raise HydrationError(
                "The hydrated model does not have a metadata field."
            )

        # Check if the ID is the same
        if self.id != hydrated_model.id:
            raise HydrationError(
                "The hydrated version of the model does not have the same id."
            )

        # Check if the name has changed
        if "name" in self.__fields__:
            original_name = getattr(self, "name")
            hydrated_name = getattr(hydrated_model, "name")

            if original_name != hydrated_name:
                if (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.ALLOW
                ):
                    setattr(self, "name", hydrated_name)

                    if self._warn_on_response_updates:
                        logger.warning(
                            f"The name of the entity has changed from "
                            f"`{original_name}` to `{hydrated_name}`."
                        )

                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.IGNORE
                ):
                    if self._warn_on_response_updates:
                        logger.warning(
                            f"Ignoring the name change in the hydrated version "
                            f"of the response: `{original_name}` to "
                            f"`{hydrated_name}`."
                        )
                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.DENY
                ):
                    raise HydrationError(
                        f"Failing the hydration, because there is a change in "
                        f"the name of the entity: `{original_name}` to "
                        f"`{hydrated_name}`."
                    )

        # Check all the fields in the body
        for field in self.get_body().__fields__:
            original_value = getattr(self.get_body(), field)
            hydrated_value = getattr(hydrated_model.get_body(), field)

            if original_value != hydrated_value:
                if (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.ALLOW
                ):
                    setattr(self.get_body(), field, hydrated_value)

                    if self._warn_on_response_updates:
                        logger.warning(
                            f"The field `{field}` in the body of the response "
                            f"has changed from `{original_value}` to "
                            f"`{hydrated_value}`."
                        )

                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.IGNORE
                ):
                    if self._warn_on_response_updates:
                        logger.warning(
                            f"Ignoring the change in the hydrated version of "
                            f"the field `{field}`: `{original_value}` -> "
                            f"`{hydrated_value}`."
                        )
                elif (
                    self._response_update_strategy
                    == ResponseUpdateStrategy.DENY
                ):
                    raise HydrationError(
                        f"Failing the hydration, because there is a change in "
                        f"the field `{field}`: `{original_value}` -> "
                        f"`{hydrated_value}`"
                    )

    def get_body(self) -> AnyBody:
        """Fetch the body of the entity.

        Returns:
            The body field of the response.

        Raises:
            IllegalOperationError: If the user lacks permission to access the
                entity represented by this response.
            RuntimeError: If the body was not included in the response.
        """
        if self.permission_denied:
            raise IllegalOperationError(
                f"Missing permissions to access {type(self).__name__} with "
                f"ID {self.id}."
            )

        if not self.body:
            raise RuntimeError(
                f"Missing response body for {type(self).__name__} with ID "
                f"{self.id}."
            )

        return self.body

    def get_metadata(self) -> "AnyMetadata":
        """Fetch the metadata of the entity.

        Returns:
            The metadata field of the response.

        Raises:
            IllegalOperationError: If the user lacks permission to access this
                entity represented by this response.
        """
        if self.permission_denied:
            raise IllegalOperationError(
                f"Missing permissions to access {type(self).__name__} with "
                f"ID {self.id}."
            )

        if self.metadata is None:
            # If the metadata is not there, check the class first.
            metadata_type = self.__fields__["metadata"].type_

            if len(metadata_type.__fields__):
                # If the metadata class defines any fields, fetch the metadata
                # through the hydrated version.
                hydrated_version = self.get_hydrated_version()
                self._validate_hydrated_version(hydrated_version)
                self.metadata = hydrated_version.metadata
            else:
                # Otherwise, use the metadata class to create an empty metadata
                # object.
                self.metadata = metadata_type()

        assert self.metadata is not None

        return self.metadata

    # Analytics
    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata

    # Body and metadata properties
    @property
    def created(self) -> Optional[datetime]:
        """The `created` property.

        Returns:
            the value of the property.
        """
        return self.get_body().created

    @property
    def updated(self) -> Optional[datetime]:
        """The `updated` property.

        Returns:
            the value of the property.
        """
        return self.get_body().updated
created: Optional[datetime.datetime] property readonly

The created property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

updated: Optional[datetime.datetime] property readonly

The updated property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is of the same type and has the same UUID.

Source code in zenml/models/v2/base/base.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is of the same type and has the same UUID.
    """
    if isinstance(other, type(self)):
        return self.id == other.id
    else:
        return False
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/models/v2/base/base.py
def __hash__(self) -> int:
    """Implementation of hash magic method.

    Returns:
        Hash of the UUID.
    """
    return hash((type(self),) + tuple([self.id]))
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/base.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for base response models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["entity_id"] = self.id
    return metadata
get_body(self)

Fetch the body of the entity.

Returns:

Type Description
~AnyBody

The body field of the response.

Exceptions:

Type Description
IllegalOperationError

If the user lacks permission to access the entity represented by this response.

RuntimeError

If the body was not included in the response.

Source code in zenml/models/v2/base/base.py
def get_body(self) -> AnyBody:
    """Fetch the body of the entity.

    Returns:
        The body field of the response.

    Raises:
        IllegalOperationError: If the user lacks permission to access the
            entity represented by this response.
        RuntimeError: If the body was not included in the response.
    """
    if self.permission_denied:
        raise IllegalOperationError(
            f"Missing permissions to access {type(self).__name__} with "
            f"ID {self.id}."
        )

    if not self.body:
        raise RuntimeError(
            f"Missing response body for {type(self).__name__} with ID "
            f"{self.id}."
        )

    return self.body
get_hydrated_version(self)

Abstract method to fetch the hydrated version of the model.

Exceptions:

Type Description
NotImplementedError

in case the method is not implemented.

Source code in zenml/models/v2/base/base.py
def get_hydrated_version(self) -> "BaseResponse[AnyBody, AnyMetadata]":
    """Abstract method to fetch the hydrated version of the model.

    Raises:
        NotImplementedError: in case the method is not implemented.
    """
    raise NotImplementedError(
        "Please implement a `get_hydrated_version` method before "
        "using/hydrating the model."
    )
get_metadata(self)

Fetch the metadata of the entity.

Returns:

Type Description
AnyMetadata

The metadata field of the response.

Exceptions:

Type Description
IllegalOperationError

If the user lacks permission to access this entity represented by this response.

Source code in zenml/models/v2/base/base.py
def get_metadata(self) -> "AnyMetadata":
    """Fetch the metadata of the entity.

    Returns:
        The metadata field of the response.

    Raises:
        IllegalOperationError: If the user lacks permission to access this
            entity represented by this response.
    """
    if self.permission_denied:
        raise IllegalOperationError(
            f"Missing permissions to access {type(self).__name__} with "
            f"ID {self.id}."
        )

    if self.metadata is None:
        # If the metadata is not there, check the class first.
        metadata_type = self.__fields__["metadata"].type_

        if len(metadata_type.__fields__):
            # If the metadata class defines any fields, fetch the metadata
            # through the hydrated version.
            hydrated_version = self.get_hydrated_version()
            self._validate_hydrated_version(hydrated_version)
            self.metadata = hydrated_version.metadata
        else:
            # Otherwise, use the metadata class to create an empty metadata
            # object.
            self.metadata = metadata_type()

    assert self.metadata is not None

    return self.metadata
BaseResponseBody (BaseZenModel) pydantic-model

Base body model.

Used as a base class for all body models associated with responses. Features a creation and update timestamp.

Source code in zenml/models/v2/base/base.py
class BaseResponseBody(BaseZenModel):
    """Base body model.

    Used as a base class for all body models associated with responses.
    Features a creation and update timestamp.
    """

    created: Optional[datetime] = Field(
        title="The timestamp when this resource was created.",
        default=None,
    )
    updated: Optional[datetime] = Field(
        title="The timestamp when this resource was last updated.",
        default=None,
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseResponseMetadata (BaseZenModel) pydantic-model

Base metadata model.

Used as a base class for all metadata models associated with responses.

Source code in zenml/models/v2/base/base.py
class BaseResponseMetadata(BaseZenModel):
    """Base metadata model.

    Used as a base class for all metadata models associated with responses.
    """
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseZenModel (YAMLSerializationMixin, AnalyticsTrackedModelMixin) pydantic-model

Base model class for all ZenML models.

This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events and proper encoding of SecretStr values.

Source code in zenml/models/v2/base/base.py
class BaseZenModel(YAMLSerializationMixin, AnalyticsTrackedModelMixin):
    """Base model class for all ZenML models.

    This class is used as a base class for all ZenML models. It provides
    functionality for tracking analytics events and proper encoding of
    SecretStr values.
    """

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST client and server to unpack SecretStr
        # values correctly.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value()
            if v is not None
            else None
        }

        # Allow extras on all models to support forwards and backwards
        # compatibility (e.g. new fields in newer versions of ZenML servers
        # are allowed to be present in older versions of ZenML clients and
        # vice versa).
        extra = "allow"
Config

Pydantic configuration class.

Source code in zenml/models/v2/base/base.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

filter

Base filter model definitions.

BaseFilter (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=20
)
Source code in zenml/models/v2/base/filter.py
class BaseFilter(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=20
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # List of fields that are wrapped with `fastapi.Query(default)` in API.
    API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    _rbac_configuration: Optional[
        Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
    ] = None

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )
        column = v
        split_value = v.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                v = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        elif column in cls.__fields__:
            return v
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.__fields__}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    def configure_rbac(
        self,
        authenticated_user_id: UUID,
        **column_allowed_ids: Optional[Set[UUID]],
    ) -> None:
        """Configure RBAC allowed column values.

        Args:
            authenticated_user_id: ID of the authenticated user. All entities
                owned by this user will be included.
            column_allowed_ids: Set of IDs per column to limit the query to.
                If given, the remaining filters will be applied to entities
                within this set only. If `None`, the remaining filters will
                applied to all entries in the table.
        """
        self._rbac_configuration = (authenticated_user_id, column_allowed_ids)

    def generate_rbac_filter(
        self,
        table: Type["AnySchema"],
    ) -> Optional["BooleanClauseList[Any]"]:
        """Generates an optional RBAC filter.

        Args:
            table: The query table.

        Returns:
            The RBAC filter.
        """
        from sqlmodel import or_

        if not self._rbac_configuration:
            return None

        expressions = []

        for column_name, allowed_ids in self._rbac_configuration[1].items():
            if allowed_ids is not None:
                expression = getattr(table, column_name).in_(allowed_ids)
                expressions.append(expression)

        if expressions and hasattr(table, "user_id"):
            # If `expressions` is not empty, we do not have full access to all
            # rows of the table. In this case, we also include rows which the
            # user owns.

            # Unowned entities are considered server-owned and can be seen
            # by anyone
            expressions.append(getattr(table, "user_id").is_(None))
            # The authenticated user owns this entity
            expressions.append(
                getattr(table, "user_id") == self._rbac_configuration[0]
            )

        if expressions:
            return or_(*expressions)
        else:
            return None

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return (
            issubclass(datetime, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is datetime
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        return (
            issubclass(UUID, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is UUID
        )

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's a int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        return (
            issubclass(int, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is int
        )

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return (
            issubclass(bool, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is bool
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        )

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ == str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        for custom_filter in self.get_custom_filters():
            filters.append(custom_filter)
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def get_custom_filters(
        self,
    ) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
        """Get custom filters.

        This can be overridden by subclasses to define custom filters that are
        not based on the columns of the underlying table.

        Returns:
            A list of custom filters.
        """
        return []

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        rbac_filter = self.generate_rbac_filter(table=table)

        if rbac_filter is not None:
            query = query.where(rbac_filter)

        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query

    class Config:
        """Pydantic configuration class."""

        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.v2.base.filter.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.v2.base.filter.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

Config

Pydantic configuration class.

Source code in zenml/models/v2/base/filter.py
class Config:
    """Pydantic configuration class."""

    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/filter.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    rbac_filter = self.generate_rbac_filter(table=table)

    if rbac_filter is not None:
        query = query.where(rbac_filter)

    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
configure_rbac(self, authenticated_user_id, **column_allowed_ids)

Configure RBAC allowed column values.

Parameters:

Name Type Description Default
authenticated_user_id UUID

ID of the authenticated user. All entities owned by this user will be included.

required
column_allowed_ids Optional[Set[uuid.UUID]]

Set of IDs per column to limit the query to. If given, the remaining filters will be applied to entities within this set only. If None, the remaining filters will applied to all entries in the table.

{}
Source code in zenml/models/v2/base/filter.py
def configure_rbac(
    self,
    authenticated_user_id: UUID,
    **column_allowed_ids: Optional[Set[UUID]],
) -> None:
    """Configure RBAC allowed column values.

    Args:
        authenticated_user_id: ID of the authenticated user. All entities
            owned by this user will be included.
        column_allowed_ids: Set of IDs per column to limit the query to.
            If given, the remaining filters will be applied to entities
            within this set only. If `None`, the remaining filters will
            applied to all entries in the table.
    """
    self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/v2/base/filter.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse incoming filters to ensure all filters are legal.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/v2/base/filter.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    for custom_filter in self.get_custom_filters():
        filters.append(custom_filter)
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)

Generates an optional RBAC filter.

Parameters:

Name Type Description Default
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Optional[BooleanClauseList[Any]]

The RBAC filter.

Source code in zenml/models/v2/base/filter.py
def generate_rbac_filter(
    self,
    table: Type["AnySchema"],
) -> Optional["BooleanClauseList[Any]"]:
    """Generates an optional RBAC filter.

    Args:
        table: The query table.

    Returns:
        The RBAC filter.
    """
    from sqlmodel import or_

    if not self._rbac_configuration:
        return None

    expressions = []

    for column_name, allowed_ids in self._rbac_configuration[1].items():
        if allowed_ids is not None:
            expression = getattr(table, column_name).in_(allowed_ids)
            expressions.append(expression)

    if expressions and hasattr(table, "user_id"):
        # If `expressions` is not empty, we do not have full access to all
        # rows of the table. In this case, we also include rows which the
        # user owns.

        # Unowned entities are considered server-owned and can be seen
        # by anyone
        expressions.append(getattr(table, "user_id").is_(None))
        # The authenticated user owns this entity
        expressions.append(
            getattr(table, "user_id") == self._rbac_configuration[0]
        )

    if expressions:
        return or_(*expressions)
    else:
        return None
get_custom_filters(self)

Get custom filters.

This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.

Returns:

Type Description
List[Union[BinaryExpression[Any], BooleanClauseList[Any]]]

A list of custom filters.

Source code in zenml/models/v2/base/filter.py
def get_custom_filters(
    self,
) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
    """Get custom filters.

    This can be overridden by subclasses to define custom filters that are
    not based on the columns of the underlying table.

    Returns:
        A list of custom filters.
    """
    return []
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return (
        issubclass(bool, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is bool
    )
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return (
        issubclass(datetime, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is datetime
    )
is_int_field(k) classmethod

Checks if it's a int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a int field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's a int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    return (
        issubclass(int, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is int
    )
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ == str
    ) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is str
    )
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/v2/base/filter.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a uuid field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    return (
        issubclass(UUID, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is UUID
    )
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/v2/base/filter.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )
    column = v
    split_value = v.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    elif column in cls.__fields__:
        return v
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
BoolFilter (Filter) pydantic-model

Filter for all Boolean fields.

Source code in zenml/models/v2/base/filter.py
class BoolFilter(Filter):
    """Filter for all Boolean fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a boolean column.

        Args:
            column: The boolean column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a boolean column.

Parameters:

Name Type Description Default
column Any

The boolean column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a boolean column.

    Args:
        column: The boolean column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    return column == self.value
Filter (BaseModel, ABC) pydantic-model

Filter for all fields.

A Filter is a combination of a column, a value that the user uses to filter on this column and an operation to use. The easiest example would be user equals aria with column=user, value=aria and the operation=equals.

All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.

Source code in zenml/models/v2/base/filter.py
class Filter(BaseModel, ABC):
    """Filter for all fields.

    A Filter is a combination of a column, a value that the user uses to
    filter on this column and an operation to use. The easiest example
    would be `user equals aria` with column=`user`, value=`aria` and the
    operation=`equals`.

    All subclasses of this class will support different sets of operations.
    This operation set is defined in the ALLOWED_OPS class variable.
    """

    ALLOWED_OPS: ClassVar[List[str]] = []

    operation: GenericFilterOps
    column: str
    value: Any

    @validator("operation", pre=True)
    def validate_operation(cls, op: str) -> str:
        """Validate that the operation is a valid op for the field type.

        Args:
            op: The operation of this filter.

        Returns:
            The operation if it is valid.

        Raises:
            ValueError: If the operation is not valid for this field type.
        """
        if op not in cls.ALLOWED_OPS:
            raise ValueError(
                f"This datatype can not be filtered using this operation: "
                f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
            )
        else:
            return op

    def generate_query_conditions(
        self,
        table: Type[SQLModel],
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the query conditions for the database.

        This method converts the Filter class into an appropriate SQLModel
        query condition, to be used when filtering on the Database.

        Args:
            table: The SQLModel table to use for the query creation

        Returns:
            A list of conditions that will be combined using the `and` operation
        """
        column = getattr(table, self.column)
        conditions = self.generate_query_conditions_from_column(column)
        return conditions  # type:ignore[no-any-return]

    @abstractmethod
    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions given the corresponding database column.

        This method should be overridden by subclasses to define how each
        supported operation in `self.ALLOWED_OPS` can be used to filter the
        given column by `self.value`.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
generate_query_conditions(self, table)

Generate the query conditions for the database.

This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The SQLModel table to use for the query creation

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

A list of conditions that will be combined using the and operation

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions(
    self,
    table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the query conditions for the database.

    This method converts the Filter class into an appropriate SQLModel
    query condition, to be used when filtering on the Database.

    Args:
        table: The SQLModel table to use for the query creation

    Returns:
        A list of conditions that will be combined using the `and` operation
    """
    column = getattr(table, self.column)
    conditions = self.generate_query_conditions_from_column(column)
    return conditions  # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)

Generate query conditions given the corresponding database column.

This method should be overridden by subclasses to define how each supported operation in self.ALLOWED_OPS can be used to filter the given column by self.value.

Parameters:

Name Type Description Default
column Any

The column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions given the corresponding database column.

    This method should be overridden by subclasses to define how each
    supported operation in `self.ALLOWED_OPS` can be used to filter the
    given column by `self.value`.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
validate_operation(op) classmethod

Validate that the operation is a valid op for the field type.

Parameters:

Name Type Description Default
op str

The operation of this filter.

required

Returns:

Type Description
str

The operation if it is valid.

Exceptions:

Type Description
ValueError

If the operation is not valid for this field type.

Source code in zenml/models/v2/base/filter.py
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
    """Validate that the operation is a valid op for the field type.

    Args:
        op: The operation of this filter.

    Returns:
        The operation if it is valid.

    Raises:
        ValueError: If the operation is not valid for this field type.
    """
    if op not in cls.ALLOWED_OPS:
        raise ValueError(
            f"This datatype can not be filtered using this operation: "
            f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
        )
    else:
        return op
NumericFilter (Filter) pydantic-model

Filter for all numeric fields.

Source code in zenml/models/v2/base/filter.py
class NumericFilter(Filter):
    """Filter for all numeric fields."""

    value: Union[float, datetime]

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.GT,
        GenericFilterOps.GTE,
        GenericFilterOps.LT,
        GenericFilterOps.LTE,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.GTE:
            return column >= self.value
        if self.operation == GenericFilterOps.GT:
            return column > self.value
        if self.operation == GenericFilterOps.LTE:
            return column <= self.value
        if self.operation == GenericFilterOps.LT:
            return column < self.value
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.GTE:
        return column >= self.value
    if self.operation == GenericFilterOps.GT:
        return column > self.value
    if self.operation == GenericFilterOps.LTE:
        return column <= self.value
    if self.operation == GenericFilterOps.LT:
        return column < self.value
    return column == self.value
StrFilter (Filter) pydantic-model

Filter for all string fields.

Source code in zenml/models/v2/base/filter.py
class StrFilter(Filter):
    """Filter for all string fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.STARTSWITH,
        GenericFilterOps.CONTAINS,
        GenericFilterOps.ENDSWITH,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a string column.

        Args:
            column: The string column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.CONTAINS:
            return column.like(f"%{self.value}%")
        if self.operation == GenericFilterOps.STARTSWITH:
            return column.startswith(f"{self.value}")
        if self.operation == GenericFilterOps.ENDSWITH:
            return column.endswith(f"{self.value}")
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a string column.

Parameters:

Name Type Description Default
column Any

The string column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a string column.

    Args:
        column: The string column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.CONTAINS:
        return column.like(f"%{self.value}%")
    if self.operation == GenericFilterOps.STARTSWITH:
        return column.startswith(f"{self.value}")
    if self.operation == GenericFilterOps.ENDSWITH:
        return column.endswith(f"{self.value}")
    return column == self.value
UUIDFilter (StrFilter) pydantic-model

Filter for all uuid fields which are mostly treated like strings.

Source code in zenml/models/v2/base/filter.py
class UUIDFilter(StrFilter):
    """Filter for all uuid fields which are mostly treated like strings."""

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        import sqlalchemy
        from sqlalchemy_utils.functions import cast_if

        # For equality checks, compare the UUID directly
        if self.operation == GenericFilterOps.EQUALS:
            return column == self.value

        # For all other operations, cast and handle the column as string
        return super().generate_query_conditions_from_column(
            column=cast_if(column, sqlalchemy.String)
        )
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/v2/base/filter.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    import sqlalchemy
    from sqlalchemy_utils.functions import cast_if

    # For equality checks, compare the UUID directly
    if self.operation == GenericFilterOps.EQUALS:
        return column == self.value

    # For all other operations, cast and handle the column as string
    return super().generate_query_conditions_from_column(
        column=cast_if(column, sqlalchemy.String)
    )
internal

Utility methods for internal models.

server_owned_request_model(_cls)

Convert a request model to a model which does not require a user ID.

Parameters:

Name Type Description Default
_cls Type[~T]

The class to decorate

required

Returns:

Type Description
Type[~T]

The decorated class.

Source code in zenml/models/v2/base/internal.py
def server_owned_request_model(_cls: Type[T]) -> Type[T]:
    """Convert a request model to a model which does not require a user ID.

    Args:
        _cls: The class to decorate

    Returns:
        The decorated class.
    """
    if user_field := _cls.__fields__.get("user", None):
        user_field.required = False
        user_field.allow_none = True
        user_field.default = None

    return _cls
page

Page model definitions.

Page (GenericModel, Generic) pydantic-model

Return Model for List Models to accommodate pagination.

Source code in zenml/models/v2/base/page.py
class Page(GenericModel, Generic[B]):
    """Return Model for List Models to accommodate pagination."""

    index: PositiveInt
    max_size: PositiveInt
    total_pages: NonNegativeInt
    total: NonNegativeInt
    items: List[B]

    __params_type__ = BaseFilter

    @property
    def size(self) -> int:
        """Return the item count of the page.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __len__(self) -> int:
        """Return the item count of the page.

        This enables `len(page)`.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __getitem__(self, index: int) -> B:
        """Return the item at the given index.

        This enables `page[index]`.

        Args:
            index: The index to get the item from.

        Returns:
            The item at the given index.
        """
        return self.items[index]

    def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
        """Return an iterator over the items in the page.

        This enables `for item in page` loops, but breaks `dict(page)`.

        Yields:
            An iterator over the items in the page.
        """
        for item in self.items.__iter__():
            yield item

    def __contains__(self, item: B) -> bool:
        """Returns whether the page contains a specific item.

        This enables `item in page` checks.

        Args:
            item: The item to check for.

        Returns:
            Whether the item is in the page.
        """
        return item in self.items

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST API server to unpack SecretStr
        # values correctly before sending them to the client.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
size: int property readonly

Return the item count of the page.

Returns:

Type Description
int

The amount of items in the page.

Config

Pydantic configuration class.

Source code in zenml/models/v2/base/page.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST API server to unpack SecretStr
    # values correctly before sending them to the client.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }
__params_type__ (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=20
)
Source code in zenml/models/v2/base/page.py
class BaseFilter(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=20
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # List of fields that are wrapped with `fastapi.Query(default)` in API.
    API_MULTI_INPUT_PARAMS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    _rbac_configuration: Optional[
        Tuple[UUID, Dict[str, Optional[Set[UUID]]]]
    ] = None

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )
        column = v
        split_value = v.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                v = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        elif column in cls.__fields__:
            return v
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.__fields__}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    def configure_rbac(
        self,
        authenticated_user_id: UUID,
        **column_allowed_ids: Optional[Set[UUID]],
    ) -> None:
        """Configure RBAC allowed column values.

        Args:
            authenticated_user_id: ID of the authenticated user. All entities
                owned by this user will be included.
            column_allowed_ids: Set of IDs per column to limit the query to.
                If given, the remaining filters will be applied to entities
                within this set only. If `None`, the remaining filters will
                applied to all entries in the table.
        """
        self._rbac_configuration = (authenticated_user_id, column_allowed_ids)

    def generate_rbac_filter(
        self,
        table: Type["AnySchema"],
    ) -> Optional["BooleanClauseList[Any]"]:
        """Generates an optional RBAC filter.

        Args:
            table: The query table.

        Returns:
            The RBAC filter.
        """
        from sqlmodel import or_

        if not self._rbac_configuration:
            return None

        expressions = []

        for column_name, allowed_ids in self._rbac_configuration[1].items():
            if allowed_ids is not None:
                expression = getattr(table, column_name).in_(allowed_ids)
                expressions.append(expression)

        if expressions and hasattr(table, "user_id"):
            # If `expressions` is not empty, we do not have full access to all
            # rows of the table. In this case, we also include rows which the
            # user owns.

            # Unowned entities are considered server-owned and can be seen
            # by anyone
            expressions.append(getattr(table, "user_id").is_(None))
            # The authenticated user owns this entity
            expressions.append(
                getattr(table, "user_id") == self._rbac_configuration[0]
            )

        if expressions:
            return or_(*expressions)
        else:
            return None

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return (
            issubclass(datetime, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is datetime
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        return (
            issubclass(UUID, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is UUID
        )

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's a int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        return (
            issubclass(int, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is int
        )

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return (
            issubclass(bool, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is bool
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        )

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ == str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        for custom_filter in self.get_custom_filters():
            filters.append(custom_filter)
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def get_custom_filters(
        self,
    ) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
        """Get custom filters.

        This can be overridden by subclasses to define custom filters that are
        not based on the columns of the underlying table.

        Returns:
            A list of custom filters.
        """
        return []

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        rbac_filter = self.generate_rbac_filter(table=table)

        if rbac_filter is not None:
            query = query.where(rbac_filter)

        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query

    class Config:
        """Pydantic configuration class."""

        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.v2.base.filter.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.v2.base.filter.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

Config

Pydantic configuration class.

Source code in zenml/models/v2/base/page.py
class Config:
    """Pydantic configuration class."""

    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/page.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    rbac_filter = self.generate_rbac_filter(table=table)

    if rbac_filter is not None:
        query = query.where(rbac_filter)

    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
configure_rbac(self, authenticated_user_id, **column_allowed_ids)

Configure RBAC allowed column values.

Parameters:

Name Type Description Default
authenticated_user_id UUID

ID of the authenticated user. All entities owned by this user will be included.

required
column_allowed_ids Optional[Set[uuid.UUID]]

Set of IDs per column to limit the query to. If given, the remaining filters will be applied to entities within this set only. If None, the remaining filters will applied to all entries in the table.

{}
Source code in zenml/models/v2/base/page.py
def configure_rbac(
    self,
    authenticated_user_id: UUID,
    **column_allowed_ids: Optional[Set[UUID]],
) -> None:
    """Configure RBAC allowed column values.

    Args:
        authenticated_user_id: ID of the authenticated user. All entities
            owned by this user will be included.
        column_allowed_ids: Set of IDs per column to limit the query to.
            If given, the remaining filters will be applied to entities
            within this set only. If `None`, the remaining filters will
            applied to all entries in the table.
    """
    self._rbac_configuration = (authenticated_user_id, column_allowed_ids)
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/v2/base/page.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse incoming filters to ensure all filters are legal.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/v2/base/page.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    for custom_filter in self.get_custom_filters():
        filters.append(custom_filter)
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
generate_rbac_filter(self, table)

Generates an optional RBAC filter.

Parameters:

Name Type Description Default
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Optional[BooleanClauseList[Any]]

The RBAC filter.

Source code in zenml/models/v2/base/page.py
def generate_rbac_filter(
    self,
    table: Type["AnySchema"],
) -> Optional["BooleanClauseList[Any]"]:
    """Generates an optional RBAC filter.

    Args:
        table: The query table.

    Returns:
        The RBAC filter.
    """
    from sqlmodel import or_

    if not self._rbac_configuration:
        return None

    expressions = []

    for column_name, allowed_ids in self._rbac_configuration[1].items():
        if allowed_ids is not None:
            expression = getattr(table, column_name).in_(allowed_ids)
            expressions.append(expression)

    if expressions and hasattr(table, "user_id"):
        # If `expressions` is not empty, we do not have full access to all
        # rows of the table. In this case, we also include rows which the
        # user owns.

        # Unowned entities are considered server-owned and can be seen
        # by anyone
        expressions.append(getattr(table, "user_id").is_(None))
        # The authenticated user owns this entity
        expressions.append(
            getattr(table, "user_id") == self._rbac_configuration[0]
        )

    if expressions:
        return or_(*expressions)
    else:
        return None
get_custom_filters(self)

Get custom filters.

This can be overridden by subclasses to define custom filters that are not based on the columns of the underlying table.

Returns:

Type Description
List[Union[BinaryExpression[Any], BooleanClauseList[Any]]]

A list of custom filters.

Source code in zenml/models/v2/base/page.py
def get_custom_filters(
    self,
) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
    """Get custom filters.

    This can be overridden by subclasses to define custom filters that are
    not based on the columns of the underlying table.

    Returns:
        A list of custom filters.
    """
    return []
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return (
        issubclass(bool, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is bool
    )
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return (
        issubclass(datetime, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is datetime
    )
is_int_field(k) classmethod

Checks if it's a int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a int field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's a int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    return (
        issubclass(int, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is int
    )
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ == str
    ) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is str
    )
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/v2/base/page.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a uuid field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    return (
        issubclass(UUID, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is UUID
    )
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/v2/base/page.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )
    column = v
    split_value = v.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    elif column in cls.__fields__:
        return v
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
__contains__(self, item) special

Returns whether the page contains a specific item.

This enables item in page checks.

Parameters:

Name Type Description Default
item ~B

The item to check for.

required

Returns:

Type Description
bool

Whether the item is in the page.

Source code in zenml/models/v2/base/page.py
def __contains__(self, item: B) -> bool:
    """Returns whether the page contains a specific item.

    This enables `item in page` checks.

    Args:
        item: The item to check for.

    Returns:
        Whether the item is in the page.
    """
    return item in self.items
__getitem__(self, index) special

Return the item at the given index.

This enables page[index].

Parameters:

Name Type Description Default
index int

The index to get the item from.

required

Returns:

Type Description
~B

The item at the given index.

Source code in zenml/models/v2/base/page.py
def __getitem__(self, index: int) -> B:
    """Return the item at the given index.

    This enables `page[index]`.

    Args:
        index: The index to get the item from.

    Returns:
        The item at the given index.
    """
    return self.items[index]
__iter__(self) special

Return an iterator over the items in the page.

This enables for item in page loops, but breaks dict(page).

Yields:

Type Description
Generator[~B, NoneType, NoneType]

An iterator over the items in the page.

Source code in zenml/models/v2/base/page.py
def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
    """Return an iterator over the items in the page.

    This enables `for item in page` loops, but breaks `dict(page)`.

    Yields:
        An iterator over the items in the page.
    """
    for item in self.items.__iter__():
        yield item
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

__len__(self) special

Return the item count of the page.

This enables len(page).

Returns:

Type Description
int

The amount of items in the page.

Source code in zenml/models/v2/base/page.py
def __len__(self) -> int:
    """Return the item count of the page.

    This enables `len(page)`.

    Returns:
        The amount of items in the page.
    """
    return len(self.items)
scoped

Scoped model definitions.

BaseResponse[UserBody, UserMetadata] (BaseResponse) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/base/scoped.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

UserScopedFilter (BaseFilter) pydantic-model

Model to enable advanced user-based scoping.

Source code in zenml/models/v2/base/scoped.py
class UserScopedFilter(BaseFilter):
    """Model to enable advanced user-based scoping."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "scope_user",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "scope_user",
    ]
    scope_user: Optional[UUID] = Field(
        default=None,
        description="The user to scope this query to.",
    )

    def set_scope_user(self, user_id: UUID) -> None:
        """Set the user that is performing the filtering to scope the response.

        Args:
            user_id: The user ID to scope the response to.
        """
        self.scope_user = user_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        query = super().apply_filter(query=query, table=table)

        if self.scope_user:
            query = query.where(getattr(table, "user_id") == self.scope_user)

        return query
scope_user: UUID pydantic-field

The user to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    query = super().apply_filter(query=query, table=table)

    if self.scope_user:
        query = query.where(getattr(table, "user_id") == self.scope_user)

    return query
set_scope_user(self, user_id)

Set the user that is performing the filtering to scope the response.

Parameters:

Name Type Description Default
user_id UUID

The user ID to scope the response to.

required
Source code in zenml/models/v2/base/scoped.py
def set_scope_user(self, user_id: UUID) -> None:
    """Set the user that is performing the filtering to scope the response.

    Args:
        user_id: The user ID to scope the response to.
    """
    self.scope_user = user_id
UserScopedRequest (BaseRequest) pydantic-model

Base user-owned request model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/v2/base/scoped.py
class UserScopedRequest(BaseRequest):
    """Base user-owned request model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    user: UUID = Field(title="The id of the user that created this resource.")

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["user_id"] = self.user
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["user_id"] = self.user
    return metadata
UserScopedResponse (BaseResponse[UserBody, UserMetadata], Generic) pydantic-model

Base user-owned model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponse(
    BaseResponse[UserBody, UserMetadata], Generic[UserBody, UserMetadata]
):
    """Base user-owned model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    # Analytics
    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        if self.user is not None:
            metadata["user_id"] = self.user.id
        return metadata

    # Body and metadata properties
    @property
    def user(self) -> Optional["UserResponse"]:
        """The `user` property.

        Returns:
            the value of the property.
        """
        return self.get_body().user
user: Optional[UserResponse] property readonly

The user property.

Returns:

Type Description
Optional[UserResponse]

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    if self.user is not None:
        metadata["user_id"] = self.user.id
    return metadata
UserScopedResponseBody (BaseResponseBody) pydantic-model

Base user-owned body.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseBody(BaseResponseBody):
    """Base user-owned body."""

    user: Optional["UserResponse"] = Field(
        title="The user who created this resource."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

UserScopedResponseMetadata (BaseResponseMetadata) pydantic-model

Base user-owned metadata.

Source code in zenml/models/v2/base/scoped.py
class UserScopedResponseMetadata(BaseResponseMetadata):
    """Base user-owned metadata."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

UserScopedResponse[WorkspaceBody, WorkspaceMetadata] (UserScopedResponse, BaseResponse[UserBody, UserMetadata][WorkspaceBody, WorkspaceMetadata]) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/base/scoped.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedFilter (BaseFilter) pydantic-model

Model to enable advanced scoping with workspace.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedFilter(BaseFilter):
    """Model to enable advanced scoping with workspace."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    scope_workspace: Optional[UUID] = Field(
        default=None,
        description="The workspace to scope this query to.",
    )

    def set_scope_workspace(self, workspace_id: UUID) -> None:
        """Set the workspace to scope this response.

        Args:
            workspace_id: The workspace to scope this response to.
        """
        self.scope_workspace = workspace_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        query = super().apply_filter(query=query, table=table)

        if self.scope_workspace:
            scope_filter = or_(
                getattr(table, "workspace_id") == self.scope_workspace,
                getattr(table, "workspace_id").is_(None),
            )
            query = query.where(scope_filter)

        return query
scope_workspace: UUID pydantic-field

The workspace to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    query = super().apply_filter(query=query, table=table)

    if self.scope_workspace:
        scope_filter = or_(
            getattr(table, "workspace_id") == self.scope_workspace,
            getattr(table, "workspace_id").is_(None),
        )
        query = query.where(scope_filter)

    return query
set_scope_workspace(self, workspace_id)

Set the workspace to scope this response.

Parameters:

Name Type Description Default
workspace_id UUID

The workspace to scope this response to.

required
Source code in zenml/models/v2/base/scoped.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
    """Set the workspace to scope this response.

    Args:
        workspace_id: The workspace to scope this response to.
    """
    self.scope_workspace = workspace_id
WorkspaceScopedRequest (UserScopedRequest) pydantic-model

Base workspace-scoped request domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedRequest(UserScopedRequest):
    """Base workspace-scoped request domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    workspace: UUID = Field(
        title="The workspace to which this resource belongs."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["workspace_id"] = self.workspace
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/v2/base/scoped.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["workspace_id"] = self.workspace
    return metadata
WorkspaceScopedResponse (UserScopedResponse[WorkspaceBody, WorkspaceMetadata], Generic) pydantic-model

Base workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponse(
    UserScopedResponse[WorkspaceBody, WorkspaceMetadata],
    Generic[WorkspaceBody, WorkspaceMetadata],
):
    """Base workspace-scoped domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    # Body and metadata properties
    @property
    def workspace(self) -> "WorkspaceResponse":
        """The workspace property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().workspace
workspace: WorkspaceResponse property readonly

The workspace property.

Returns:

Type Description
WorkspaceResponse

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedResponseBody (UserScopedResponseBody) pydantic-model

Base workspace-scoped body.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseBody(UserScopedResponseBody):
    """Base workspace-scoped body."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedResponseMetadata (UserScopedResponseMetadata) pydantic-model

Base workspace-scoped metadata.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedResponseMetadata(UserScopedResponseMetadata):
    """Base workspace-scoped metadata."""

    workspace: "WorkspaceResponse" = Field(
        title="The workspace of this resource."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedTaggableFilter (WorkspaceScopedFilter) pydantic-model

Model to enable advanced scoping with workspace and tagging.

Source code in zenml/models/v2/base/scoped.py
class WorkspaceScopedTaggableFilter(WorkspaceScopedFilter):
    """Model to enable advanced scoping with workspace and tagging."""

    tag: Optional[str] = Field(
        description="Tag to apply to the filter query.", default=None
    )

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilter.FILTER_EXCLUDE_FIELDS,
        "tag",
    ]

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from zenml.zen_stores.schemas import TagResourceSchema

        query = super().apply_filter(query=query, table=table)
        if self.tag:
            query = (
                query.join(getattr(table, "tags"))
                .join(TagResourceSchema.tag)
                .distinct()
            )

        return query

    def get_custom_filters(
        self,
    ) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
        """Get custom tag filters.

        Returns:
            A list of custom filters.
        """
        from zenml.zen_stores.schemas import TagSchema

        custom_filters = super().get_custom_filters()
        if self.tag:
            custom_filters.append(col(TagSchema.name) == self.tag)  # type: ignore[arg-type]

        return custom_filters
tag: str pydantic-field

Tag to apply to the filter query.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/base/scoped.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from zenml.zen_stores.schemas import TagResourceSchema

    query = super().apply_filter(query=query, table=table)
    if self.tag:
        query = (
            query.join(getattr(table, "tags"))
            .join(TagResourceSchema.tag)
            .distinct()
        )

    return query
get_custom_filters(self)

Get custom tag filters.

Returns:

Type Description
List[Union[BinaryExpression[Any], BooleanClauseList[Any]]]

A list of custom filters.

Source code in zenml/models/v2/base/scoped.py
def get_custom_filters(
    self,
) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
    """Get custom tag filters.

    Returns:
        A list of custom filters.
    """
    from zenml.zen_stores.schemas import TagSchema

    custom_filters = super().get_custom_filters()
    if self.tag:
        custom_filters.append(col(TagSchema.name) == self.tag)  # type: ignore[arg-type]

    return custom_filters
update

Utility methods for base models.

update_model(_cls)

Base update model.

This is used as a decorator on top of request models to convert them into update models where the fields are optional and can be set to None.

Parameters:

Name Type Description Default
_cls Type[T]

The class to decorate

required

Returns:

Type Description
Type[T]

The decorated class.

Source code in zenml/models/v2/base/update.py
def update_model(_cls: Type["T"]) -> Type["T"]:
    """Base update model.

    This is used as a decorator on top of request models to convert them
    into update models where the fields are optional and can be set to None.

    Args:
        _cls: The class to decorate

    Returns:
        The decorated class.
    """
    for _, value in _cls.__fields__.items():
        value.required = False
        value.allow_none = True

    _cls.__config__.extra = Extra.ignore

    return _cls

core special

api_key

Models representing API keys.

APIKey (BaseModel) pydantic-model

Encoded model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKey(BaseModel):
    """Encoded model for API keys."""

    id: UUID
    key: str

    @classmethod
    def decode_api_key(cls, encoded_key: str) -> "APIKey":
        """Decodes an API key from a base64 string.

        Args:
            encoded_key: The encoded API key.

        Returns:
            The decoded API key.

        Raises:
            ValueError: If the key is not valid.
        """
        if encoded_key.startswith(ZENML_API_KEY_PREFIX):
            encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
        try:
            json_key = b64_decode(encoded_key)
            return cls.parse_raw(json_key)
        except Exception:
            raise ValueError("Invalid API key.")

    def encode(self) -> str:
        """Encodes the API key in a base64 string that includes the key ID and prefix.

        Returns:
            The encoded API key.
        """
        encoded_key = b64_encode(self.json())
        return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
decode_api_key(encoded_key) classmethod

Decodes an API key from a base64 string.

Parameters:

Name Type Description Default
encoded_key str

The encoded API key.

required

Returns:

Type Description
APIKey

The decoded API key.

Exceptions:

Type Description
ValueError

If the key is not valid.

Source code in zenml/models/v2/core/api_key.py
@classmethod
def decode_api_key(cls, encoded_key: str) -> "APIKey":
    """Decodes an API key from a base64 string.

    Args:
        encoded_key: The encoded API key.

    Returns:
        The decoded API key.

    Raises:
        ValueError: If the key is not valid.
    """
    if encoded_key.startswith(ZENML_API_KEY_PREFIX):
        encoded_key = encoded_key[len(ZENML_API_KEY_PREFIX) :]
    try:
        json_key = b64_decode(encoded_key)
        return cls.parse_raw(json_key)
    except Exception:
        raise ValueError("Invalid API key.")
encode(self)

Encodes the API key in a base64 string that includes the key ID and prefix.

Returns:

Type Description
str

The encoded API key.

Source code in zenml/models/v2/core/api_key.py
def encode(self) -> str:
    """Encodes the API key in a base64 string that includes the key ID and prefix.

    Returns:
        The encoded API key.
    """
    encoded_key = b64_encode(self.json())
    return f"{ZENML_API_KEY_PREFIX}{encoded_key}"
APIKeyFilter (BaseFilter) pydantic-model

Filter model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyFilter(BaseFilter):
    """Filter model for API keys."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.FILTER_EXCLUDE_FIELDS,
        "service_account",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilter.CLI_EXCLUDE_FIELDS,
        "service_account",
    ]

    service_account: Optional[UUID] = Field(
        default=None,
        description="The service account to scope this query to.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the API key",
    )
    description: Optional[str] = Field(
        default=None,
        title="Filter by the API key description.",
    )
    active: Optional[Union[bool, str]] = Field(
        default=None,
        title="Whether the API key is active.",
    )
    last_login: Optional[Union[datetime, str]] = Field(
        default=None, title="Time when the API key was last used to log in."
    )
    last_rotated: Optional[Union[datetime, str]] = Field(
        default=None, title="Time when the API key was last rotated."
    )

    def set_service_account(self, service_account_id: UUID) -> None:
        """Set the service account by which to scope this query.

        Args:
            service_account_id: The service account ID.
        """
        self.service_account = service_account_id

    def apply_filter(
        self,
        query: AnyQuery,
        table: Type["AnySchema"],
    ) -> AnyQuery:
        """Override to apply the service account scope as an additional filter.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        query = super().apply_filter(query=query, table=table)

        if self.service_account:
            scope_filter = (
                getattr(table, "service_account_id") == self.service_account
            )
            query = query.where(scope_filter)

        return query
name: str pydantic-field

Name of the API key

service_account: UUID pydantic-field

The service account to scope this query to.

apply_filter(self, query, table)

Override to apply the service account scope as an additional filter.

Parameters:

Name Type Description Default
query ~AnyQuery

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
~AnyQuery

The query with filter applied.

Source code in zenml/models/v2/core/api_key.py
def apply_filter(
    self,
    query: AnyQuery,
    table: Type["AnySchema"],
) -> AnyQuery:
    """Override to apply the service account scope as an additional filter.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    query = super().apply_filter(query=query, table=table)

    if self.service_account:
        scope_filter = (
            getattr(table, "service_account_id") == self.service_account
        )
        query = query.where(scope_filter)

    return query
set_service_account(self, service_account_id)

Set the service account by which to scope this query.

Parameters:

Name Type Description Default
service_account_id UUID

The service account ID.

required
Source code in zenml/models/v2/core/api_key.py
def set_service_account(self, service_account_id: UUID) -> None:
    """Set the service account by which to scope this query.

    Args:
        service_account_id: The service account ID.
    """
    self.service_account = service_account_id
APIKeyInternalResponse (APIKeyResponse) pydantic-model

Response model for API keys used internally.

Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalResponse(APIKeyResponse):
    """Response model for API keys used internally."""

    previous_key: Optional[str] = Field(
        default=None,
        title="The previous API key. Only set if the key was rotated.",
    )

    def verify_key(
        self,
        key: str,
    ) -> bool:
        """Verifies a given key against the stored (hashed) key(s).

        Args:
            key: Input key to be verified.

        Returns:
            True if the keys match.
        """
        # even when the hashed key is not set, we still want to execute
        # the hash verification to protect against response discrepancy
        # attacks (https://cwe.mitre.org/data/definitions/204.html)
        key_hash: Optional[str] = None
        context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        if self.key is not None and self.active:
            key_hash = self.key
        result = context.verify(key, key_hash)

        # same for the previous key, if set and if it's still valid
        key_hash = None
        if (
            self.previous_key is not None
            and self.last_rotated is not None
            and self.active
            and self.retain_period_minutes > 0
        ):
            # check if the previous key is still valid
            if datetime.utcnow() - self.last_rotated < timedelta(
                minutes=self.retain_period_minutes
            ):
                key_hash = self.previous_key
        previous_result = context.verify(key, key_hash)

        return result or previous_result
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

verify_key(self, key)

Verifies a given key against the stored (hashed) key(s).

Parameters:

Name Type Description Default
key str

Input key to be verified.

required

Returns:

Type Description
bool

True if the keys match.

Source code in zenml/models/v2/core/api_key.py
def verify_key(
    self,
    key: str,
) -> bool:
    """Verifies a given key against the stored (hashed) key(s).

    Args:
        key: Input key to be verified.

    Returns:
        True if the keys match.
    """
    # even when the hashed key is not set, we still want to execute
    # the hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    key_hash: Optional[str] = None
    context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    if self.key is not None and self.active:
        key_hash = self.key
    result = context.verify(key, key_hash)

    # same for the previous key, if set and if it's still valid
    key_hash = None
    if (
        self.previous_key is not None
        and self.last_rotated is not None
        and self.active
        and self.retain_period_minutes > 0
    ):
        # check if the previous key is still valid
        if datetime.utcnow() - self.last_rotated < timedelta(
            minutes=self.retain_period_minutes
        ):
            key_hash = self.previous_key
    previous_result = context.verify(key, key_hash)

    return result or previous_result
APIKeyInternalUpdate (APIKeyUpdate) pydantic-model

Update model for API keys used internally.

Source code in zenml/models/v2/core/api_key.py
class APIKeyInternalUpdate(APIKeyUpdate):
    """Update model for API keys used internally."""

    update_last_login: bool = Field(
        default=False,
        title="Whether to update the last login timestamp.",
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

APIKeyRequest (BaseRequest) pydantic-model

Request model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyRequest(BaseRequest):
    """Request model for API keys."""

    name: str = Field(
        title="The name of the API Key.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    description: Optional[str] = Field(
        default=None,
        title="The description of the API Key.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

APIKeyResponse (BaseResponse[APIKeyResponseBody, APIKeyResponseMetadata]) pydantic-model

Response model for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponse(BaseResponse[APIKeyResponseBody, APIKeyResponseMetadata]):
    """Response model for API keys."""

    name: str = Field(
        title="The name of the API Key.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    _warn_on_response_updates = False

    def get_hydrated_version(self) -> "APIKeyResponse":
        """Get the hydrated version of this API key.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_api_key(
            service_account_id=self.service_account.id,
            api_key_name_or_id=self.id,
        )

    # Helper functions
    def set_key(self, key: str) -> None:
        """Sets the API key and encodes it.

        Args:
            key: The API key value to be set.
        """
        self.get_body().key = APIKey(id=self.id, key=key).encode()

    # Body and metadata properties
    @property
    def key(self) -> Optional[str]:
        """The `key` property.

        Returns:
            the value of the property.
        """
        return self.get_body().key

    @property
    def active(self) -> bool:
        """The `active` property.

        Returns:
            the value of the property.
        """
        return self.get_body().active

    @property
    def service_account(self) -> "ServiceAccountResponse":
        """The `service_account` property.

        Returns:
            the value of the property.
        """
        return self.get_body().service_account

    @property
    def description(self) -> str:
        """The `description` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().description

    @property
    def retain_period_minutes(self) -> int:
        """The `retain_period_minutes` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().retain_period_minutes

    @property
    def last_login(self) -> Optional[datetime]:
        """The `last_login` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().last_login

    @property
    def last_rotated(self) -> Optional[datetime]:
        """The `last_rotated` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().last_rotated
active: bool property readonly

The active property.

Returns:

Type Description
bool

the value of the property.

description: str property readonly

The description property.

Returns:

Type Description
str

the value of the property.

key: Optional[str] property readonly

The key property.

Returns:

Type Description
Optional[str]

the value of the property.

last_login: Optional[datetime.datetime] property readonly

The last_login property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

last_rotated: Optional[datetime.datetime] property readonly

The last_rotated property.

Returns:

Type Description
Optional[datetime.datetime]

the value of the property.

retain_period_minutes: int property readonly

The retain_period_minutes property.

Returns:

Type Description
int

the value of the property.

service_account: ServiceAccountResponse property readonly

The service_account property.

Returns:

Type Description
ServiceAccountResponse

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this API key.

Returns:

Type Description
APIKeyResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/api_key.py
def get_hydrated_version(self) -> "APIKeyResponse":
    """Get the hydrated version of this API key.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_api_key(
        service_account_id=self.service_account.id,
        api_key_name_or_id=self.id,
    )
set_key(self, key)

Sets the API key and encodes it.

Parameters:

Name Type Description Default
key str

The API key value to be set.

required
Source code in zenml/models/v2/core/api_key.py
def set_key(self, key: str) -> None:
    """Sets the API key and encodes it.

    Args:
        key: The API key value to be set.
    """
    self.get_body().key = APIKey(id=self.id, key=key).encode()
APIKeyResponseBody (BaseResponseBody) pydantic-model

Response body for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseBody(BaseResponseBody):
    """Response body for API keys."""

    key: Optional[str] = Field(
        default=None,
        title="The API key. Only set immediately after creation or rotation.",
    )
    active: bool = Field(
        default=True,
        title="Whether the API key is active.",
    )
    service_account: "ServiceAccountResponse" = Field(
        title="The service account associated with this API key."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

APIKeyResponseMetadata (BaseResponseMetadata) pydantic-model

Response metadata for API keys.

Source code in zenml/models/v2/core/api_key.py
class APIKeyResponseMetadata(BaseResponseMetadata):
    """Response metadata for API keys."""

    description: str = Field(
        default="",
        title="The description of the API Key.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    retain_period_minutes: int = Field(
        title="Number of minutes for which the previous key is still valid "
        "after it has been rotated.",
    )
    last_login: Optional[datetime] = Field(
        default=None, title="Time when the API key was last used to log in."
    )
    last_rotated: Optional[datetime] = Field(
        default=None, title="Time when the API key was last rotated."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

APIKeyRotateRequest (BaseModel) pydantic-model

Request model for API key rotation.

Source code in zenml/models/v2/core/api_key.py
class APIKeyRotateRequest(BaseModel):
    """Request model for API key rotation."""

    retain_period_minutes: int = Field(
        default=0,
        title="Number of minutes for which the previous key is still valid "
        "after it has been rotated.",
    )
APIKeyUpdate (APIKeyRequest) pydantic-model

Update model for API keys.

Source code in zenml/models/v2/core/api_key.py
@update_model
class APIKeyUpdate(APIKeyRequest):
    """Update model for API keys."""

    active: Optional[bool] = Field(
        default=True,
        title="Whether the API key is active.",
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseResponse[APIKeyResponseBody, APIKeyResponseMetadata] (BaseResponse) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/api_key.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

artifact

Models representing artifacts.

ArtifactFilter (WorkspaceScopedTaggableFilter) pydantic-model

Model to enable advanced filtering of artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactFilter(WorkspaceScopedTaggableFilter):
    """Model to enable advanced filtering of artifacts."""

    name: Optional[str] = None
    has_custom_name: Optional[bool] = None
ArtifactRequest (BaseRequest) pydantic-model

Artifact request model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactRequest(BaseRequest):
    """Artifact request model."""

    name: str = Field(
        title="Name of the artifact.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
    tags: Optional[List[str]] = Field(
        title="Artifact tags.",
        description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
        default=None,
    )
tags: List[str] pydantic-field

Should be a list of plain strings, e.g., ['tag1', 'tag2']

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactResponse (BaseResponse[ArtifactResponseBody, ArtifactResponseMetadata]) pydantic-model

Artifact response model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponse(
    BaseResponse[ArtifactResponseBody, ArtifactResponseMetadata]
):
    """Artifact response model."""

    def get_hydrated_version(self) -> "ArtifactResponse":
        """Get the hydrated version of this artifact.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_artifact(self.id)

    name: str = Field(
        title="Name of the output in the parent step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    # Body and metadata properties
    @property
    def tags(self) -> List[TagResponse]:
        """The `tags` property.

        Returns:
            the value of the property.
        """
        return self.get_body().tags

    @property
    def latest_version_name(self) -> Optional[str]:
        """The `latest_version_name` property.

        Returns:
            the value of the property.
        """
        return self.get_body().latest_version_name

    @property
    def latest_version_id(self) -> Optional[UUID]:
        """The `latest_version_id` property.

        Returns:
            the value of the property.
        """
        return self.get_body().latest_version_id

    @property
    def has_custom_name(self) -> bool:
        """The `has_custom_name` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().has_custom_name

    # Helper methods
    @property
    def versions(self) -> Dict[str, "ArtifactVersionResponse"]:
        """Get a list of all versions of this artifact.

        Returns:
            A list of all versions of this artifact.
        """
        from zenml.client import Client

        responses = Client().list_artifact_versions(name=self.name)
        return {str(response.version): response for response in responses}
has_custom_name: bool property readonly

The has_custom_name property.

Returns:

Type Description
bool

the value of the property.

latest_version_id: Optional[uuid.UUID] property readonly

The latest_version_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

latest_version_name: Optional[str] property readonly

The latest_version_name property.

Returns:

Type Description
Optional[str]

the value of the property.

tags: List[zenml.models.v2.core.tag.TagResponse] property readonly

The tags property.

Returns:

Type Description
List[zenml.models.v2.core.tag.TagResponse]

the value of the property.

versions: Dict[str, ArtifactVersionResponse] property readonly

Get a list of all versions of this artifact.

Returns:

Type Description
Dict[str, ArtifactVersionResponse]

A list of all versions of this artifact.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this artifact.

Returns:

Type Description
ArtifactResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/artifact.py
def get_hydrated_version(self) -> "ArtifactResponse":
    """Get the hydrated version of this artifact.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_artifact(self.id)
ArtifactResponseBody (BaseResponseBody) pydantic-model

Response body for artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseBody(BaseResponseBody):
    """Response body for artifacts."""

    tags: List[TagResponse] = Field(
        title="Tags associated with the model",
    )
    latest_version_name: Optional[str]
    latest_version_id: Optional[UUID]
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactResponseMetadata (BaseResponseMetadata) pydantic-model

Response metadata for artifacts.

Source code in zenml/models/v2/core/artifact.py
class ArtifactResponseMetadata(BaseResponseMetadata):
    """Response metadata for artifacts."""

    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactUpdate (BaseModel) pydantic-model

Artifact update model.

Source code in zenml/models/v2/core/artifact.py
class ArtifactUpdate(BaseModel):
    """Artifact update model."""

    name: Optional[str] = None
    add_tags: Optional[List[str]] = None
    remove_tags: Optional[List[str]] = None
    has_custom_name: Optional[bool] = None
BaseResponse[ArtifactResponseBody, ArtifactResponseMetadata] (BaseResponse) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/artifact.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

artifact_version

Models representing artifact versions.

ArtifactVersionFilter (WorkspaceScopedTaggableFilter) pydantic-model

Model to enable advanced filtering of artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionFilter(WorkspaceScopedTaggableFilter):
    """Model to enable advanced filtering of artifact versions."""

    # `name` and `only_unused` refer to properties related to other entities
    #  rather than a field in the db, hence they needs to be handled
    #  explicitly
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedTaggableFilter.FILTER_EXCLUDE_FIELDS,
        "name",
        "only_unused",
        "has_custom_name",
    ]
    artifact_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="ID of the artifact to which this version belongs.",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the artifact to which this version belongs.",
    )
    version: Optional[str] = Field(
        default=None,
        description="Version of the artifact",
    )
    version_number: Optional[Union[int, str]] = Field(
        default=None,
        description="Version of the artifact if it is an integer",
    )
    uri: Optional[str] = Field(
        default=None,
        description="Uri of the artifact",
    )
    materializer: Optional[str] = Field(
        default=None,
        description="Materializer used to produce the artifact",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the artifact",
    )
    data_type: Optional[str] = Field(
        default=None,
        description="Datatype of the artifact",
    )
    artifact_store_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Artifact store for this artifact"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace for this artifact"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that produced this artifact"
    )
    only_unused: Optional[bool] = Field(
        default=False, description="Filter only for unused artifacts"
    )
    has_custom_name: Optional[bool] = Field(
        default=None,
        description="Filter only artifacts with/without custom names.",
    )

    def get_custom_filters(
        self,
    ) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
        """Get custom filters.

        Returns:
            A list of custom filters.
        """
        custom_filters = super().get_custom_filters()

        from sqlalchemy import and_
        from sqlmodel import select

        from zenml.zen_stores.schemas.artifact_schemas import (
            ArtifactSchema,
            ArtifactVersionSchema,
        )
        from zenml.zen_stores.schemas.step_run_schemas import (
            StepRunInputArtifactSchema,
            StepRunOutputArtifactSchema,
        )

        if self.name:
            value, filter_operator = self._resolve_operator(self.name)
            filter_ = StrFilter(
                operation=GenericFilterOps(filter_operator),
                column="name",
                value=value,
            )
            artifact_name_filter = and_(  # type: ignore[type-var]
                ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
                filter_.generate_query_conditions(ArtifactSchema),
            )
            custom_filters.append(artifact_name_filter)

        if self.only_unused:
            unused_filter = and_(
                ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                    select(StepRunOutputArtifactSchema.artifact_id)
                ),
                ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                    select(StepRunInputArtifactSchema.artifact_id)
                ),
            )
            custom_filters.append(unused_filter)

        if self.has_custom_name is not None:
            custom_name_filter = and_(  # type: ignore[type-var]
                ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
                ArtifactSchema.has_custom_name == self.has_custom_name,
            )
            custom_filters.append(custom_name_filter)

        return custom_filters
artifact_id: Union[uuid.UUID, str] pydantic-field

ID of the artifact to which this version belongs.

artifact_store_id: Union[uuid.UUID, str] pydantic-field

Artifact store for this artifact

data_type: str pydantic-field

Datatype of the artifact

has_custom_name: bool pydantic-field

Filter only artifacts with/without custom names.

materializer: str pydantic-field

Materializer used to produce the artifact

name: str pydantic-field

Name of the artifact to which this version belongs.

only_unused: bool pydantic-field

Filter only for unused artifacts

type: str pydantic-field

Type of the artifact

uri: str pydantic-field

Uri of the artifact

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this artifact

version: str pydantic-field

Version of the artifact

version_number: Union[int, str] pydantic-field

Version of the artifact if it is an integer

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this artifact

get_custom_filters(self)

Get custom filters.

Returns:

Type Description
List[Union[BinaryExpression[Any], BooleanClauseList[Any]]]

A list of custom filters.

Source code in zenml/models/v2/core/artifact_version.py
def get_custom_filters(
    self,
) -> List[Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]]:
    """Get custom filters.

    Returns:
        A list of custom filters.
    """
    custom_filters = super().get_custom_filters()

    from sqlalchemy import and_
    from sqlmodel import select

    from zenml.zen_stores.schemas.artifact_schemas import (
        ArtifactSchema,
        ArtifactVersionSchema,
    )
    from zenml.zen_stores.schemas.step_run_schemas import (
        StepRunInputArtifactSchema,
        StepRunOutputArtifactSchema,
    )

    if self.name:
        value, filter_operator = self._resolve_operator(self.name)
        filter_ = StrFilter(
            operation=GenericFilterOps(filter_operator),
            column="name",
            value=value,
        )
        artifact_name_filter = and_(  # type: ignore[type-var]
            ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
            filter_.generate_query_conditions(ArtifactSchema),
        )
        custom_filters.append(artifact_name_filter)

    if self.only_unused:
        unused_filter = and_(
            ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                select(StepRunOutputArtifactSchema.artifact_id)
            ),
            ArtifactVersionSchema.id.notin_(  # type: ignore[attr-defined]
                select(StepRunInputArtifactSchema.artifact_id)
            ),
        )
        custom_filters.append(unused_filter)

    if self.has_custom_name is not None:
        custom_name_filter = and_(  # type: ignore[type-var]
            ArtifactVersionSchema.artifact_id == ArtifactSchema.id,
            ArtifactSchema.has_custom_name == self.has_custom_name,
        )
        custom_filters.append(custom_name_filter)

    return custom_filters
ArtifactVersionRequest (WorkspaceScopedRequest) pydantic-model

Request model for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionRequest(WorkspaceScopedRequest):
    """Request model for artifact versions."""

    artifact_id: UUID = Field(
        title="ID of the artifact to which this version belongs.",
    )
    version: Union[str, int] = Field(
        title="Version of the artifact.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    has_custom_name: bool = Field(
        title="Whether the name is custom (True) or auto-generated (False).",
        default=False,
    )
    type: ArtifactType = Field(title="Type of the artifact.")
    artifact_store_id: Optional[UUID] = Field(
        title="ID of the artifact store in which this artifact is stored.",
        default=None,
    )
    uri: str = Field(
        title="URI of the artifact.", max_length=TEXT_FIELD_MAX_LENGTH
    )
    materializer: Source = Field(
        title="Materializer class to use for this artifact.",
    )
    data_type: Source = Field(
        title="Data type of the artifact.",
    )
    tags: Optional[List[str]] = Field(
        title="Tags of the artifact.",
        description="Should be a list of plain strings, e.g., ['tag1', 'tag2']",
        default=None,
    )
    visualizations: Optional[List["ArtifactVisualizationRequest"]] = Field(
        default=None, title="Visualizations of the artifact."
    )

    _convert_source = convert_source_validator("materializer", "data_type")
tags: List[str] pydantic-field

Should be a list of plain strings, e.g., ['tag1', 'tag2']

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactVersionResponse (WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata]) pydantic-model

Response model for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionResponse(
    WorkspaceScopedResponse[
        ArtifactVersionResponseBody, ArtifactVersionResponseMetadata
    ]
):
    """Response model for artifact versions."""

    def get_hydrated_version(self) -> "ArtifactVersionResponse":
        """Get the hydrated version of this artifact version.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_artifact_version(self.id)

    # Body and metadata properties
    @property
    def artifact(self) -> "ArtifactResponse":
        """The `artifact` property.

        Returns:
            the value of the property.
        """
        return self.get_body().artifact

    @property
    def version(self) -> Union[str, int]:
        """The `version` property.

        Returns:
            the value of the property.
        """
        return self.get_body().version

    @property
    def uri(self) -> str:
        """The `uri` property.

        Returns:
            the value of the property.
        """
        return self.get_body().uri

    @property
    def type(self) -> ArtifactType:
        """The `type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().type

    @property
    def tags(self) -> List[TagResponse]:
        """The `tags` property.

        Returns:
            the value of the property.
        """
        return self.get_body().tags

    @property
    def producer_pipeline_run_id(self) -> Optional[UUID]:
        """The `producer_pipeline_run_id` property.

        Returns:
            the value of the property.
        """
        return self.get_body().producer_pipeline_run_id

    @property
    def artifact_store_id(self) -> Optional[UUID]:
        """The `artifact_store_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().artifact_store_id

    @property
    def producer_step_run_id(self) -> Optional[UUID]:
        """The `producer_step_run_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().producer_step_run_id

    @property
    def visualizations(
        self,
    ) -> Optional[List["ArtifactVisualizationResponse"]]:
        """The `visualizations` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().visualizations

    @property
    def run_metadata(self) -> Dict[str, "RunMetadataResponse"]:
        """The `metadata` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().run_metadata

    @property
    def materializer(self) -> Source:
        """The `materializer` property.

        Returns:
            the value of the property.
        """
        return self.get_body().materializer

    @property
    def data_type(self) -> Source:
        """The `data_type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().data_type

    # Helper methods
    @property
    def name(self) -> str:
        """The `name` property.

        Returns:
            the value of the property.
        """
        return self.artifact.name

    @property
    def step(self) -> "StepRunResponse":
        """Get the step that produced this artifact.

        Returns:
            The step that produced this artifact.
        """
        from zenml.artifacts.utils import get_producer_step_of_artifact

        return get_producer_step_of_artifact(self)

    @property
    def run(self) -> "PipelineRunResponse":
        """Get the pipeline run that produced this artifact.

        Returns:
            The pipeline run that produced this artifact.
        """
        from zenml.client import Client

        return Client().get_pipeline_run(self.step.pipeline_run_id)

    def load(self) -> Any:
        """Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        from zenml.artifacts.utils import load_artifact_from_response

        return load_artifact_from_response(self)

    def download_files(self, path: str, overwrite: bool = False) -> None:
        """Downloads data for an artifact with no materializing.

        Any artifacts will be saved as a zip file to the given path.

        Args:
            path: The path to save the binary data to.
            overwrite: Whether to overwrite the file if it already exists.

        Raises:
            ValueError: If the path does not end with '.zip'.
        """
        if not path.endswith(".zip"):
            raise ValueError(
                "The path should end with '.zip' to save the binary data."
            )
        from zenml.artifacts.utils import (
            download_artifact_files_from_response,
        )

        download_artifact_files_from_response(
            self,
            path=path,
            overwrite=overwrite,
        )

    def read(self) -> Any:
        """(Deprecated) Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        logger.warning(
            "`artifact.read()` is deprecated and will be removed in a future "
            "release. Please use `artifact.load()` instead."
        )
        return self.load()

    def visualize(self, title: Optional[str] = None) -> None:
        """Visualize the artifact in notebook environments.

        Args:
            title: Optional title to show before the visualizations.
        """
        from zenml.utils.visualization_utils import visualize_artifact

        visualize_artifact(self, title=title)
artifact: ArtifactResponse property readonly

The artifact property.

Returns:

Type Description
ArtifactResponse

the value of the property.

artifact_store_id: Optional[uuid.UUID] property readonly

The artifact_store_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

data_type: Source property readonly

The data_type property.

Returns:

Type Description
Source

the value of the property.

materializer: Source property readonly

The materializer property.

Returns:

Type Description
Source

the value of the property.

name: str property readonly

The name property.

Returns:

Type Description
str

the value of the property.

producer_pipeline_run_id: Optional[uuid.UUID] property readonly

The producer_pipeline_run_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

producer_step_run_id: Optional[uuid.UUID] property readonly

The producer_step_run_id property.

Returns:

Type Description
Optional[uuid.UUID]

the value of the property.

run: PipelineRunResponse property readonly

Get the pipeline run that produced this artifact.

Returns:

Type Description
PipelineRunResponse

The pipeline run that produced this artifact.

run_metadata: Dict[str, RunMetadataResponse] property readonly

The metadata property.

Returns:

Type Description
Dict[str, RunMetadataResponse]

the value of the property.

step: StepRunResponse property readonly

Get the step that produced this artifact.

Returns:

Type Description
StepRunResponse

The step that produced this artifact.

tags: List[zenml.models.v2.core.tag.TagResponse] property readonly

The tags property.

Returns:

Type Description
List[zenml.models.v2.core.tag.TagResponse]

the value of the property.

type: ArtifactType property readonly

The type property.

Returns:

Type Description
ArtifactType

the value of the property.

uri: str property readonly

The uri property.

Returns:

Type Description
str

the value of the property.

version: Union[str, int] property readonly

The version property.

Returns:

Type Description
Union[str, int]

the value of the property.

visualizations: Optional[List[ArtifactVisualizationResponse]] property readonly

The visualizations property.

Returns:

Type Description
Optional[List[ArtifactVisualizationResponse]]

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

download_files(self, path, overwrite=False)

Downloads data for an artifact with no materializing.

Any artifacts will be saved as a zip file to the given path.

Parameters:

Name Type Description Default
path str

The path to save the binary data to.

required
overwrite bool

Whether to overwrite the file if it already exists.

False

Exceptions:

Type Description
ValueError

If the path does not end with '.zip'.

Source code in zenml/models/v2/core/artifact_version.py
def download_files(self, path: str, overwrite: bool = False) -> None:
    """Downloads data for an artifact with no materializing.

    Any artifacts will be saved as a zip file to the given path.

    Args:
        path: The path to save the binary data to.
        overwrite: Whether to overwrite the file if it already exists.

    Raises:
        ValueError: If the path does not end with '.zip'.
    """
    if not path.endswith(".zip"):
        raise ValueError(
            "The path should end with '.zip' to save the binary data."
        )
    from zenml.artifacts.utils import (
        download_artifact_files_from_response,
    )

    download_artifact_files_from_response(
        self,
        path=path,
        overwrite=overwrite,
    )
get_hydrated_version(self)

Get the hydrated version of this artifact version.

Returns:

Type Description
ArtifactVersionResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/artifact_version.py
def get_hydrated_version(self) -> "ArtifactVersionResponse":
    """Get the hydrated version of this artifact version.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_artifact_version(self.id)
load(self)

Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/v2/core/artifact_version.py
def load(self) -> Any:
    """Materializes (loads) the data stored in this artifact.

    Returns:
        The materialized data.
    """
    from zenml.artifacts.utils import load_artifact_from_response

    return load_artifact_from_response(self)
read(self)

(Deprecated) Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/v2/core/artifact_version.py
def read(self) -> Any:
    """(Deprecated) Materializes (loads) the data stored in this artifact.

    Returns:
        The materialized data.
    """
    logger.warning(
        "`artifact.read()` is deprecated and will be removed in a future "
        "release. Please use `artifact.load()` instead."
    )
    return self.load()
visualize(self, title=None)

Visualize the artifact in notebook environments.

Parameters:

Name Type Description Default
title Optional[str]

Optional title to show before the visualizations.

None
Source code in zenml/models/v2/core/artifact_version.py
def visualize(self, title: Optional[str] = None) -> None:
    """Visualize the artifact in notebook environments.

    Args:
        title: Optional title to show before the visualizations.
    """
    from zenml.utils.visualization_utils import visualize_artifact

    visualize_artifact(self, title=title)
ArtifactVersionResponseBody (WorkspaceScopedResponseBody) pydantic-model

Response body for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionResponseBody(WorkspaceScopedResponseBody):
    """Response body for artifact versions."""

    artifact: ArtifactResponse = Field(
        title="Artifact to which this version belongs."
    )
    version: Union[str, int] = Field(
        title="Version of the artifact.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    uri: str = Field(
        title="URI of the artifact.", max_length=TEXT_FIELD_MAX_LENGTH
    )
    type: ArtifactType = Field(title="Type of the artifact.")
    materializer: Source = Field(
        title="Materializer class to use for this artifact.",
    )
    data_type: Source = Field(
        title="Data type of the artifact.",
    )
    tags: List[TagResponse] = Field(
        title="Tags associated with the model",
    )
    producer_pipeline_run_id: Optional[UUID] = Field(
        title="The ID of the pipeline run that generated this artifact version."
    )

    _convert_source = convert_source_validator("materializer", "data_type")
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactVersionResponseMetadata (WorkspaceScopedResponseMetadata) pydantic-model

Response metadata for artifact versions.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionResponseMetadata(WorkspaceScopedResponseMetadata):
    """Response metadata for artifact versions."""

    artifact_store_id: Optional[UUID] = Field(
        title="ID of the artifact store in which this artifact is stored.",
        default=None,
    )
    producer_step_run_id: Optional[UUID] = Field(
        title="ID of the step run that produced this artifact.",
        default=None,
    )
    visualizations: Optional[List["ArtifactVisualizationResponse"]] = Field(
        default=None, title="Visualizations of the artifact."
    )
    run_metadata: Dict[str, "RunMetadataResponse"] = Field(
        default={}, title="Metadata of the artifact."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactVersionUpdate (BaseModel) pydantic-model

Artifact version update model.

Source code in zenml/models/v2/core/artifact_version.py
class ArtifactVersionUpdate(BaseModel):
    """Artifact version update model."""

    name: Optional[str] = None
    add_tags: Optional[List[str]] = None
    remove_tags: Optional[List[str]] = None
LazyArtifactVersionResponse (ArtifactVersionResponse) pydantic-model

Lazy artifact version response.

Used if the artifact version is accessed from the model in a pipeline context available only during pipeline compilation.

Source code in zenml/models/v2/core/artifact_version.py
class LazyArtifactVersionResponse(ArtifactVersionResponse):
    """Lazy artifact version response.

    Used if the artifact version is accessed from the model in
    a pipeline context available only during pipeline compilation.
    """

    id: Optional[UUID] = None  # type: ignore[assignment]
    _lazy_load_name: Optional[str] = None
    _lazy_load_version: Optional[str] = None
    _lazy_load_model: "Model"

    def get_body(self) -> None:  # type: ignore[override]
        """Protects from misuse of the lazy loader.

        Raises:
            RuntimeError: always
        """
        raise RuntimeError("Cannot access artifact body before pipeline runs.")

    def get_metadata(self) -> None:  # type: ignore[override]
        """Protects from misuse of the lazy loader.

        Raises:
            RuntimeError: always
        """
        raise RuntimeError(
            "Cannot access artifact metadata before pipeline runs."
        )

    @property
    def run_metadata(self) -> Dict[str, "RunMetadataResponse"]:
        """The `metadata` property in lazy loading mode.

        Returns:
            getter of lazy responses for internal use.
        """
        from zenml.metadata.lazy_load import RunMetadataLazyGetter

        return RunMetadataLazyGetter(  # type: ignore[return-value]
            self._lazy_load_model,
            self._lazy_load_name,
            self._lazy_load_version,
        )
run_metadata: Dict[str, RunMetadataResponse] property readonly

The metadata property in lazy loading mode.

Returns:

Type Description
Dict[str, RunMetadataResponse]

getter of lazy responses for internal use.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_body(self)

Protects from misuse of the lazy loader.

Exceptions:

Type Description
RuntimeError

always

Source code in zenml/models/v2/core/artifact_version.py
def get_body(self) -> None:  # type: ignore[override]
    """Protects from misuse of the lazy loader.

    Raises:
        RuntimeError: always
    """
    raise RuntimeError("Cannot access artifact body before pipeline runs.")
get_metadata(self)

Protects from misuse of the lazy loader.

Exceptions:

Type Description
RuntimeError

always

Source code in zenml/models/v2/core/artifact_version.py
def get_metadata(self) -> None:  # type: ignore[override]
    """Protects from misuse of the lazy loader.

    Raises:
        RuntimeError: always
    """
    raise RuntimeError(
        "Cannot access artifact metadata before pipeline runs."
    )
WorkspaceScopedResponse[ArtifactVersionResponseBody, ArtifactVersionResponseMetadata] (WorkspaceScopedResponse, UserScopedResponse[WorkspaceBody, WorkspaceMetadata][ArtifactVersionResponseBody, ArtifactVersionResponseMetadata]) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/artifact_version.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

artifact_visualization

Models representing artifact visualizations.

ArtifactVisualizationRequest (BaseRequest) pydantic-model

Request model for artifact visualization.

Source code in zenml/models/v2/core/artifact_visualization.py
class ArtifactVisualizationRequest(BaseRequest):
    """Request model for artifact visualization."""

    type: VisualizationType
    uri: str
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactVisualizationResponse (BaseResponse[ArtifactVisualizationResponseBody, ArtifactVisualizationResponseMetadata]) pydantic-model

Response model for artifact visualizations.

Source code in zenml/models/v2/core/artifact_visualization.py
class ArtifactVisualizationResponse(
    BaseResponse[
        ArtifactVisualizationResponseBody,
        ArtifactVisualizationResponseMetadata,
    ]
):
    """Response model for artifact visualizations."""

    def get_hydrated_version(self) -> "ArtifactVisualizationResponse":
        """Get the hydrated version of this artifact visualization.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_artifact_visualization(self.id)

    # Body and metadata properties
    @property
    def type(self) -> VisualizationType:
        """The `type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().type

    @property
    def uri(self) -> str:
        """The `uri` property.

        Returns:
            the value of the property.
        """
        return self.get_body().uri

    @property
    def artifact_version_id(self) -> UUID:
        """The `artifact_version_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().artifact_version_id
artifact_version_id: UUID property readonly

The artifact_version_id property.

Returns:

Type Description
UUID

the value of the property.

type: VisualizationType property readonly

The type property.

Returns:

Type Description
VisualizationType

the value of the property.

uri: str property readonly

The uri property.

Returns:

Type Description
str

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this artifact visualization.

Returns:

Type Description
ArtifactVisualizationResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/artifact_visualization.py
def get_hydrated_version(self) -> "ArtifactVisualizationResponse":
    """Get the hydrated version of this artifact visualization.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_artifact_visualization(self.id)
ArtifactVisualizationResponseBody (BaseResponseBody) pydantic-model

Response body for artifact visualizations.

Source code in zenml/models/v2/core/artifact_visualization.py
class ArtifactVisualizationResponseBody(BaseResponseBody):
    """Response body for artifact visualizations."""

    type: VisualizationType
    uri: str
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactVisualizationResponseMetadata (BaseResponseMetadata) pydantic-model

Response metadata model for artifact visualizations.

Source code in zenml/models/v2/core/artifact_visualization.py
class ArtifactVisualizationResponseMetadata(BaseResponseMetadata):
    """Response metadata model for artifact visualizations."""

    artifact_version_id: UUID
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseResponse[ArtifactVisualizationResponseBody, ArtifactVisualizationResponseMetadata] (BaseResponse) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/artifact_visualization.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

code_reference

Models representing code references.

BaseResponse[CodeReferenceResponseBody, CodeReferenceResponseMetadata] (BaseResponse) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/code_reference.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeReferenceRequest (BaseRequest) pydantic-model

Request model for code references.

Source code in zenml/models/v2/core/code_reference.py
class CodeReferenceRequest(BaseRequest):
    """Request model for code references."""

    commit: str = Field(description="The commit of the code reference.")
    subdirectory: str = Field(
        description="The subdirectory of the code reference."
    )
    code_repository: UUID = Field(
        description="The repository of the code reference."
    )
code_repository: UUID pydantic-field required

The repository of the code reference.

commit: str pydantic-field required

The commit of the code reference.

subdirectory: str pydantic-field required

The subdirectory of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeReferenceResponse (BaseResponse[CodeReferenceResponseBody, CodeReferenceResponseMetadata]) pydantic-model

Response model for code references.

Source code in zenml/models/v2/core/code_reference.py
class CodeReferenceResponse(
    BaseResponse[CodeReferenceResponseBody, CodeReferenceResponseMetadata]
):
    """Response model for code references."""

    def get_hydrated_version(self) -> "CodeReferenceResponse":
        """Get the hydrated version of this code reference.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_code_reference(self.id)

    # Body and metadata properties
    @property
    def commit(self) -> str:
        """The `commit` property.

        Returns:
            the value of the property.
        """
        return self.get_body().commit

    @property
    def subdirectory(self) -> str:
        """The `subdirectory` property.

        Returns:
            the value of the property.
        """
        return self.get_body().subdirectory

    @property
    def code_repository(self) -> "CodeRepositoryResponse":
        """The `code_repository` property.

        Returns:
            the value of the property.
        """
        return self.get_body().code_repository
code_repository: CodeRepositoryResponse property readonly

The code_repository property.

Returns:

Type Description
CodeRepositoryResponse

the value of the property.

commit: str property readonly

The commit property.

Returns:

Type Description
str

the value of the property.

subdirectory: str property readonly

The subdirectory property.

Returns:

Type Description
str

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this code reference.

Returns:

Type Description
CodeReferenceResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/code_reference.py
def get_hydrated_version(self) -> "CodeReferenceResponse":
    """Get the hydrated version of this code reference.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_code_reference(self.id)
CodeReferenceResponseBody (BaseResponseBody) pydantic-model

Response body for code references.

Source code in zenml/models/v2/core/code_reference.py
class CodeReferenceResponseBody(BaseResponseBody):
    """Response body for code references."""

    commit: str = Field(description="The commit of the code reference.")
    subdirectory: str = Field(
        description="The subdirectory of the code reference."
    )
    code_repository: "CodeRepositoryResponse" = Field(
        description="The repository of the code reference."
    )
code_repository: CodeRepositoryResponse pydantic-field required

The repository of the code reference.

commit: str pydantic-field required

The commit of the code reference.

subdirectory: str pydantic-field required

The subdirectory of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeReferenceResponseMetadata (BaseResponseMetadata) pydantic-model

Response metadata for code references.

Source code in zenml/models/v2/core/code_reference.py
class CodeReferenceResponseMetadata(BaseResponseMetadata):
    """Response metadata for code references."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

code_repository

Models representing code repositories.

CodeRepositoryFilter (WorkspaceScopedFilter) pydantic-model

Model to enable advanced filtering of all code repositories.

Source code in zenml/models/v2/core/code_repository.py
class CodeRepositoryFilter(WorkspaceScopedFilter):
    """Model to enable advanced filtering of all code repositories."""

    name: Optional[str] = Field(
        description="Name of the code repository.",
    )
    workspace_id: Union[UUID, str, None] = Field(
        description="Workspace of the code repository."
    )
    user_id: Union[UUID, str, None] = Field(
        description="User that created the code repository."
    )
name: str pydantic-field

Name of the code repository.

user_id: Union[uuid.UUID, str] pydantic-field

User that created the code repository.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the code repository.

CodeRepositoryRequest (WorkspaceScopedRequest) pydantic-model

Request model for code repositories.

Source code in zenml/models/v2/core/code_repository.py
class CodeRepositoryRequest(WorkspaceScopedRequest):
    """Request model for code repositories."""

    name: str = Field(
        title="The name of the code repository.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    config: Dict[str, Any] = Field(
        description="Configuration for the code repository."
    )
    source: Source = Field(description="The code repository source.")
    logo_url: Optional[str] = Field(
        description="Optional URL of a logo (png, jpg or svg) for the "
        "code repository."
    )
    description: Optional[str] = Field(
        description="Code repository description.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
config: Dict[str, Any] pydantic-field required

Configuration for the code repository.

description: ConstrainedStrValue pydantic-field

Code repository description.

logo_url: str pydantic-field

Optional URL of a logo (png, jpg or svg) for the code repository.

source: Source pydantic-field required

The code repository source.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryResponse (WorkspaceScopedResponse[CodeRepositoryResponseBody, CodeRepositoryResponseMetadata]) pydantic-model

Response model for code repositories.

Source code in zenml/models/v2/core/code_repository.py
class CodeRepositoryResponse(
    WorkspaceScopedResponse[
        CodeRepositoryResponseBody, CodeRepositoryResponseMetadata
    ]
):
    """Response model for code repositories."""

    name: str = Field(
        title="The name of the code repository.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    def get_hydrated_version(self) -> "CodeRepositoryResponse":
        """Get the hydrated version of this code repository.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_code_repository(self.id)

    # Body and metadata properties
    @property
    def source(self) -> Source:
        """The `source` property.

        Returns:
            the value of the property.
        """
        return self.get_body().source

    @property
    def logo_url(self) -> Optional[str]:
        """The `logo_url` property.

        Returns:
            the value of the property.
        """
        return self.get_body().logo_url

    @property
    def config(self) -> Dict[str, Any]:
        """The `config` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().config

    @property
    def description(self) -> Optional[str]:
        """The `description` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().description
config: Dict[str, Any] property readonly

The config property.

Returns:

Type Description
Dict[str, Any]

the value of the property.

description: Optional[str] property readonly

The description property.

Returns:

Type Description
Optional[str]

the value of the property.

logo_url: Optional[str] property readonly

The logo_url property.

Returns:

Type Description
Optional[str]

the value of the property.

source: Source property readonly

The source property.

Returns:

Type Description
Source

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this code repository.

Returns:

Type Description
CodeRepositoryResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/code_repository.py
def get_hydrated_version(self) -> "CodeRepositoryResponse":
    """Get the hydrated version of this code repository.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_code_repository(self.id)
CodeRepositoryResponseBody (WorkspaceScopedResponseBody) pydantic-model

Response body for code repositories.

Source code in zenml/models/v2/core/code_repository.py
class CodeRepositoryResponseBody(WorkspaceScopedResponseBody):
    """Response body for code repositories."""

    source: Source = Field(description="The code repository source.")
    logo_url: Optional[str] = Field(
        default=None,
        description="Optional URL of a logo (png, jpg or svg) for the "
        "code repository.",
    )
logo_url: str pydantic-field

Optional URL of a logo (png, jpg or svg) for the code repository.

source: Source pydantic-field required

The code repository source.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryResponseMetadata (WorkspaceScopedResponseMetadata) pydantic-model

Response metadata for code repositories.

Source code in zenml/models/v2/core/code_repository.py
class CodeRepositoryResponseMetadata(WorkspaceScopedResponseMetadata):
    """Response metadata for code repositories."""

    config: Dict[str, Any] = Field(
        description="Configuration for the code repository."
    )
    description: Optional[str] = Field(
        default=None,
        description="Code repository description.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
config: Dict[str, Any] pydantic-field required

Configuration for the code repository.

description: ConstrainedStrValue pydantic-field

Code repository description.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryUpdate (CodeRepositoryRequest) pydantic-model

Update model for code repositories.

Source code in zenml/models/v2/core/code_repository.py
@update_model
class CodeRepositoryUpdate(CodeRepositoryRequest):
    """Update model for code repositories."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedResponse[CodeRepositoryResponseBody, CodeRepositoryResponseMetadata] (WorkspaceScopedResponse, UserScopedResponse[WorkspaceBody, WorkspaceMetadata][CodeRepositoryResponseBody, CodeRepositoryResponseMetadata]) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/code_repository.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

component

Models representing components.

ComponentBase (BaseModel) pydantic-model

Base model for components.

Source code in zenml/models/v2/core/component.py
class ComponentBase(BaseModel):
    """Base model for components."""

    name: str = Field(
        title="The name of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    type: StackComponentType = Field(
        title="The type of the stack component.",
    )

    flavor: str = Field(
        title="The flavor of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    configuration: Dict[str, Any] = Field(
        title="The stack component configuration.",
    )

    connector_resource_id: Optional[str] = Field(
        default=None,
        description="The ID of a specific resource instance to "
        "gain access to through the connector",
    )

    labels: Optional[Dict[str, Any]] = Field(
        default=None,
        title="The stack component labels.",
    )

    component_spec_path: Optional[str] = Field(
        default=None,
        title="The path to the component spec used for mlstacks deployments.",
    )
connector_resource_id: str pydantic-field

The ID of a specific resource instance to gain access to through the connector

ComponentFilter (WorkspaceScopedFilter) pydantic-model

Model to enable advanced filtering of all ComponentModels.

The Component Model needs additional scoping. As such the _scope_user field can be set to the user that is doing the filtering. The generate_filter() method of the baseclass is overwritten to include the scoping.

Source code in zenml/models/v2/core/component.py
class ComponentFilter(WorkspaceScopedFilter):
    """Model to enable advanced filtering of all ComponentModels.

    The Component Model needs additional scoping. As such the `_scope_user`
    field can be set to the user that is doing the filtering. The
    `generate_filter()` method of the baseclass is overwritten to include the
    scoping.
    """

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilter.FILTER_EXCLUDE_FIELDS,
        "scope_type",
        "stack_id",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilter.CLI_EXCLUDE_FIELDS,
        "scope_type",
    ]
    scope_type: Optional[str] = Field(
        default=None,
        description="The type to scope this query to.",
    )

    name: Optional[str] = Field(
        default=None,
        description="Name of the stack component",
    )
    flavor: Optional[str] = Field(
        default=None,
        description="Flavor of the stack component",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the stack component",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack component"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack component"
    )
    connector_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Connector linked to the stack component"
    )
    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack of the stack component"
    )

    def set_scope_type(self, component_type: str) -> None:
        """Set the type of component on which to perform the filtering to scope the response.

        Args:
            component_type: The type of component to scope the query to.
        """
        self.scope_type = component_type

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Stack components can be scoped by type to narrow the search.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_, or_

        from zenml.zen_stores.schemas import (
            StackComponentSchema,
            StackCompositionSchema,
        )

        base_filter = super().generate_filter(table)
        if self.scope_type:
            type_filter = getattr(table, "type") == self.scope_type
            return and_(base_filter, type_filter)

        if self.stack_id:
            operator = (
                or_ if self.logical_operator == LogicalOperators.OR else and_
            )

            stack_filter = and_(  # type: ignore[type-var]
                StackCompositionSchema.stack_id == self.stack_id,
                StackCompositionSchema.component_id == StackComponentSchema.id,
            )
            base_filter = operator(base_filter, stack_filter)

        return base_filter
connector_id: Union[uuid.UUID, str] pydantic-field

Connector linked to the stack component

flavor: str pydantic-field

Flavor of the stack component

name: str pydantic-field

Name of the stack component

scope_type: str pydantic-field

The type to scope this query to.

stack_id: Union[uuid.UUID, str] pydantic-field

Stack of the stack component

type: str pydantic-field

Type of the stack component

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack component

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack component

generate_filter(self, table)

Generate the filter for the query.

Stack components can be scoped by type to narrow the search.

Parameters:

Name Type Description Default
table Type[SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Source code in zenml/models/v2/core/component.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Stack components can be scoped by type to narrow the search.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_, or_

    from zenml.zen_stores.schemas import (
        StackComponentSchema,
        StackCompositionSchema,
    )

    base_filter = super().generate_filter(table)
    if self.scope_type:
        type_filter = getattr(table, "type") == self.scope_type
        return and_(base_filter, type_filter)

    if self.stack_id:
        operator = (
            or_ if self.logical_operator == LogicalOperators.OR else and_
        )

        stack_filter = and_(  # type: ignore[type-var]
            StackCompositionSchema.stack_id == self.stack_id,
            StackCompositionSchema.component_id == StackComponentSchema.id,
        )
        base_filter = operator(base_filter, stack_filter)

    return base_filter
set_scope_type(self, component_type)

Set the type of component on which to perform the filtering to scope the response.

Parameters:

Name Type Description Default
component_type str

The type of component to scope the query to.

required
Source code in zenml/models/v2/core/component.py
def set_scope_type(self, component_type: str) -> None:
    """Set the type of component on which to perform the filtering to scope the response.

    Args:
        component_type: The type of component to scope the query to.
    """
    self.scope_type = component_type
ComponentRequest (ComponentBase, WorkspaceScopedRequest) pydantic-model

Request model for components.

Source code in zenml/models/v2/core/component.py
class ComponentRequest(ComponentBase, WorkspaceScopedRequest):
    """Request model for components."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    connector: Optional[UUID] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )

    @validator("name")
    def name_cant_be_a_secret_reference(cls, name: str) -> str:
        """Validator to ensure that the given name is not a secret reference.

        Args:
            name: The name to validate.

        Returns:
            The name if it is not a secret reference.

        Raises:
            ValueError: If the name is a secret reference.
        """
        if secret_utils.is_secret_reference(name):
            raise ValueError(
                "Passing the `name` attribute of a stack component as a "
                "secret reference is not allowed."
            )
        return name
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

name_cant_be_a_secret_reference(name) classmethod

Validator to ensure that the given name is not a secret reference.

Parameters:

Name Type Description Default
name str

The name to validate.

required

Returns:

Type Description
str

The name if it is not a secret reference.

Exceptions:

Type Description
ValueError

If the name is a secret reference.

Source code in zenml/models/v2/core/component.py
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
    """Validator to ensure that the given name is not a secret reference.

    Args:
        name: The name to validate.

    Returns:
        The name if it is not a secret reference.

    Raises:
        ValueError: If the name is a secret reference.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )
    return name
ComponentResponse (WorkspaceScopedResponse[ComponentResponseBody, ComponentResponseMetadata]) pydantic-model

Response model for components.

Source code in zenml/models/v2/core/component.py
class ComponentResponse(
    WorkspaceScopedResponse[ComponentResponseBody, ComponentResponseMetadata]
):
    """Response model for components."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    name: str = Field(
        title="The name of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    def get_hydrated_version(self) -> "ComponentResponse":
        """Get the hydrated version of this component.

        Returns:
            an instance of the same entity with the metadata field attached.
        """
        from zenml.client import Client

        return Client().zen_store.get_stack_component(self.id)

    # Body and metadata properties
    @property
    def type(self) -> StackComponentType:
        """The `type` property.

        Returns:
            the value of the property.
        """
        return self.get_body().type

    @property
    def flavor(self) -> str:
        """The `flavor` property.

        Returns:
            the value of the property.
        """
        return self.get_body().flavor

    @property
    def created(self) -> datetime:
        """The`created` property.

        Returns:
            the value of the property.
        """
        return self.get_body().created

    @property
    def updated(self) -> datetime:
        """The `updated` property.

        Returns:
            the value of the property.
        """
        return self.get_body().updated

    @property
    def configuration(self) -> Dict[str, Any]:
        """The `configuration` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().configuration

    @property
    def labels(self) -> Optional[Dict[str, Any]]:
        """The `labels` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().labels

    @property
    def component_spec_path(self) -> Optional[str]:
        """The `component_spec_path` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().component_spec_path

    @property
    def connector_resource_id(self) -> Optional[str]:
        """The `connector_resource_id` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().connector_resource_id

    @property
    def connector(self) -> Optional["ServiceConnectorResponse"]:
        """The `connector` property.

        Returns:
            the value of the property.
        """
        return self.get_metadata().connector
component_spec_path: Optional[str] property readonly

The component_spec_path property.

Returns:

Type Description
Optional[str]

the value of the property.

configuration: Dict[str, Any] property readonly

The configuration property.

Returns:

Type Description
Dict[str, Any]

the value of the property.

connector: Optional[ServiceConnectorResponse] property readonly

The connector property.

Returns:

Type Description
Optional[ServiceConnectorResponse]

the value of the property.

connector_resource_id: Optional[str] property readonly

The connector_resource_id property.

Returns:

Type Description
Optional[str]

the value of the property.

created: datetime property readonly

Thecreated property.

Returns:

Type Description
datetime

the value of the property.

flavor: str property readonly

The flavor property.

Returns:

Type Description
str

the value of the property.

labels: Optional[Dict[str, Any]] property readonly

The labels property.

Returns:

Type Description
Optional[Dict[str, Any]]

the value of the property.

type: StackComponentType property readonly

The type property.

Returns:

Type Description
StackComponentType

the value of the property.

updated: datetime property readonly

The updated property.

Returns:

Type Description
datetime

the value of the property.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_hydrated_version(self)

Get the hydrated version of this component.

Returns:

Type Description
ComponentResponse

an instance of the same entity with the metadata field attached.

Source code in zenml/models/v2/core/component.py
def get_hydrated_version(self) -> "ComponentResponse":
    """Get the hydrated version of this component.

    Returns:
        an instance of the same entity with the metadata field attached.
    """
    from zenml.client import Client

    return Client().zen_store.get_stack_component(self.id)
ComponentResponseBody (WorkspaceScopedResponseBody) pydantic-model

Response body for components.

Source code in zenml/models/v2/core/component.py
class ComponentResponseBody(WorkspaceScopedResponseBody):
    """Response body for components."""

    type: StackComponentType = Field(
        title="The type of the stack component.",
    )
    flavor: str = Field(
        title="The flavor of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    created: datetime = Field(
        title="The timestamp when this component was created."
    )
    updated: datetime = Field(
        title="The timestamp when this component was last updated.",
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ComponentResponseMetadata (WorkspaceScopedResponseMetadata) pydantic-model

Response metadata for components.

Source code in zenml/models/v2/core/component.py
class ComponentResponseMetadata(WorkspaceScopedResponseMetadata):
    """Response metadata for components."""

    configuration: Dict[str, Any] = Field(
        title="The stack component configuration.",
    )
    labels: Optional[Dict[str, Any]] = Field(
        default=None,
        title="The stack component labels.",
    )
    component_spec_path: Optional[str] = Field(
        default=None,
        title="The path to the component spec used for mlstacks deployments.",
    )
    connector_resource_id: Optional[str] = Field(
        default=None,
        description="The ID of a specific resource instance to "
        "gain access to through the connector",
    )
    connector: Optional["ServiceConnectorResponse"] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )
connector_resource_id: str pydantic-field

The ID of a specific resource instance to gain access to through the connector

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ComponentUpdate (ComponentRequest) pydantic-model

Update model for stack components.

Source code in zenml/models/v2/core/component.py
@update_model
class ComponentUpdate(ComponentRequest):
    """Update model for stack components."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

InternalComponentRequest (ComponentRequest) pydantic-model

Internal component request model.

Source code in zenml/models/v2/core/component.py
@server_owned_request_model
class InternalComponentRequest(ComponentRequest):
    """Internal component request model."""

    pass
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceScopedResponse[ComponentResponseBody, ComponentResponseMetadata] (WorkspaceScopedResponse, UserScopedResponse[WorkspaceBody, WorkspaceMetadata][ComponentResponseBody, ComponentResponseMetadata]) pydantic-model
Config

Pydantic configuration class.

Source code in zenml/models/v2/core/component.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

device

Models representing devices.

OAuthDeviceFilter (UserScopedFilter) pydantic-model

Model to enable advanced filtering of OAuth2 devices.

Source code in zenml/models/v2/core/device.py
class OAuthDeviceFilter(UserScopedFilter):
    """Model to enable advanced filtering of OAuth2 devices."""

    expires: Optional[Union[datetime, str, None]] = Field(
        default=None, description="The expiration date of the OAuth2 device."
    )
    client_id: Union[UUID, str, None] = Field(
        default=None, description="The client ID of the OAuth2 device."
    )
    status: Union[OAuthDeviceStatus, str, None] = Field(
        default=None, description="The status of the OAuth2 device."
    )
    trusted_device: Union[bool, str, None] = Field(
        default=None,
        description="Whether the OAuth2 device was marked as trusted.",
    )
    failed_auth_attempts: Union[int, str, None] = Field(
        default=None,
        description="The number of failed authentication attempts.",
    )
    last_login: Optional[Union[datetime, str, None]] = Field(
        default=None, description="The date of the last successful login."
    )
client_id: Union[uuid.UUID, str] pydantic-field

The client ID of the OAuth2 device.

expires: Union[datetime.datetime, str] pydantic-field

The expiration date of the OAuth2 device.

failed_auth_attempts: Union[int, str] pydantic-field

The number of failed authentication attempts.

last_login: Union[datetime.datetime, str] pydantic-field

The date of the last successful login.

status: Union[zenml.enums.OAuthDeviceStatus, str] pydantic-field

The status of the OAuth2 device.

trusted_device: Union[bool, str] pydantic-field

Whether the OAuth2 device was marked as trusted.

OAuthDeviceInternalRequest (BaseRequest) pydantic-model

Internal request model for OAuth2 devices.

Source code in zenml/models/v2/core/device.py
class OAuthDeviceInternalRequest(BaseRequest):
    """Internal request model for OAuth2 devices."""

    client_id: UUID = Field(description="The client ID of the OAuth2 device.")
    expires_in: int = Field(
        description="The number of seconds after which the OAuth2 device "
        "expires and can no longer be used for authentication."
    )
    os: Optional[str] = Field(
        default=None,
        description="The operating system of the device used for "
        "authentication.",
    )
    ip_address: Optional[str] = Field(
        default=None,
        description="The IP address of the device used for authentication.",
    )
    hostname: Optional[str] = Field(
        default=None,
        description="The hostname of the device used for authentication.",
    )
    python_version: Optional[str] = Field(
        default=None,
        description="The Python version of the device used for authentication.",
    )
    zenml_version: Optional[str] = Field(
        default=None,
        description="The ZenML version of the device used for authentication.",
    )
    city: Optional[str] = Field(
        default=None,
        description="The city where the device is located.",
    )
    region: Optional[str] = Field(
        default=None,
        description="The region where the device is located.",
    )
    country: Optional[str] = Field(
        default=None,
        description="The country where the device is located.",
    )
city: str pydantic-field

The city where the device is located.

client_id: UUID pydantic-field required

The client ID of the OAuth2 device.

country: str pydantic-field

The country where the device is located.

expires_in: int pydantic-field required

The number of seconds after which the OAuth2 device expires and can no longer be used for authentication.

hostname: str pydantic-field

The hostname of the device used for authentication.

ip_address: str pydantic-field

The IP address of the device used for authentication.

os: str pydantic-field

The operating system of the device used for authentication.

python_version: str pydantic-field

The Python version of the device used for authentication.

region: str pydantic-field

The region where the device is located.

zenml_version: str pydantic-field

The ZenML version of the device used for authentication.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

OAuthDeviceInternalResponse (OAuthDeviceResponse) pydantic-model

OAuth2 device response model used internally for authentication.

Source code in zenml/models/v2/core/device.py
class OAuthDeviceInternalResponse(OAuthDeviceResponse):
    """OAuth2 device response model used internally for authentication."""

    user_code: str = Field(
        title="The user code.",
    )
    device_code: str = Field(
        title="The device code.",
    )

    def _verify_code(
        self,
        code: str,
        code_hash: Optional[str],
    ) -> bool:
        """Verifies a given code against the stored (hashed) code.

        Args:
            code: The code to verify.
            code_hash: The hashed code to verify against.

        Returns:
            True if the code is valid, False otherwise.
        """
        context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        result = context.verify(code, code_hash)

        return result

    def verify_user_code(
        self,
        user_code: str,
    ) -> bool:
        """Verifies a given user code against the stored (hashed) user code.

        Args:
            user_code: The user code to verify.

        Returns:
            True if the user code is valid, False otherwise.
        """
        return self._verify_code(user_code, self.user_code)

    def verify_device_code(
        self,
        device_code: str,
    ) -> bool:
        """Verifies a given device code against the stored (hashed) device code.

        Args:
            device_code: The device code to verify.

        Returns:
            True if the device code is valid, False otherwise.
        """
        return self._verify_code(device_code, self.device_code)
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

verify_device_code(self, device_code)

Verifies a given device code against the stored (hashed) device code.

Parameters:

Name Type Description Default
device_code str

The device code to verify.

required

Returns:

Type Description
bool

True if the device code is valid, False otherwise.

Source code in zenml/models/v2/core/device.py
def verify_device_code(
    self,
    device_code: str,
) -> bool:
    """Verifies a given device code against the stored (hashed) device code.

    Args:
        device_code: The device code to verify.

    Returns:
        True if the device code is valid, False otherwise.
    """
    return self._verify_code(device_code, self.device_code)
verify_user_code(self, user_code)

Verifies a given user code against the stored (hashed) user code.

Parameters:

Name Type Description Default
user_code str

The user code to verify.

required

Returns:

Type Description
bool

True if the user code is valid, False otherwise.

Source code in zenml/models/v2/core/device.py
def verify_user_code(
    self,
    user_code: str,
) -> bool:
    """Verifies a given user code against the stored (hashed) user code.

    Args:
        user_code: The user code to verify.

    Returns:
        True if the user code is valid, False otherwise.
    """
    return self._verify_code(user_code, self.user_code)
OAuthDeviceInternalUpdate (OAuthDeviceUpdate) pydantic-model

OAuth2 device update model used internally for authentication.

Source code in zenml/models/v2/core/device.py
class OAuthDeviceInternalUpdate(OAuthDeviceUpdate):
    """OAuth2 device update model used internally for authentication."""

    user_id: Optional[UUID] = Field(
        default=None, description="User that owns the OAuth2 device."
    )
    status: Optional[OAuthDeviceStatus] = Field(
        default=None, description="The new status of the OAuth2 device."
    )
    expires_in: Optional[int] = Field(
        default=None,
        description="Set the device to expire in the given number of seconds. "
        "If the value is 0 or negative, the device is set to never expire.",
    )
    failed_auth_attempts: Optional[int] = Field(
        default=None,
        description="Set the number of failed authentication attempts.",
    )
    trusted_device: Optional[bool] = Field(
        default=None,
        description="Whether to mark the OAuth2 device as trusted. A trusted "
        "device has a much longer validity time.",
    )
    update_last_login: bool = Field(
        default=False, description="Whether to update the last login date."
    )
    generate_new_codes: bool = Field(
        default=False,
        description="Whether to generate new user and device codes.",
    )
    os: Optional[str] = Field(
        default=None,
        description="The operating system of the device used for "
        "authentication.",
    )
    ip_address: Optional[str] = Field(
        default=None,
        description="The IP address of the device used for authentication.",
    )
    hostname: Optional[str] = Field(
        default=None,
        description="The hostname of the device used for authentication.",
    )
    python_version: Optional[str] = Field(
        default=None,
        description="The Python version of the device used for authentication.",
    )
    zenml_version: Optional[str] = Field(
        default=None,
        description="The ZenML version of the device used for authentication.",
    )
    city: Optional[str] = Field(
        default=None,
        description="The city where the device is located.",
    )
    region: Optional[str] = Field(
        default=None,
        description="The