Skip to content

Models

zenml.models special

Pydantic models for the various concepts in ZenML.

artifact_models

Models representing artifacts.

ArtifactBaseModel (BaseModel) pydantic-model

Base model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactBaseModel(BaseModel):
    """Base model for artifacts.

    Declares the fields shared by the artifact request and response models.
    """

    # Name of the output in the parent step; capped at STR_FIELD_MAX_LENGTH.
    name: str = Field(
        title="Name of the output in the parent step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # Optional: stays None when no artifact store ID is supplied.
    artifact_store_id: Optional[UUID] = Field(
        title="ID of the artifact store in which this artifact is stored.",
        default=None,
    )
    type: ArtifactType = Field(title="Type of the artifact.")
    uri: str = Field(
        title="URI of the artifact.", max_length=STR_FIELD_MAX_LENGTH
    )
    materializer: Source = Field(
        title="Materializer class to use for this artifact.",
    )
    data_type: Source = Field(
        title="Data type of the artifact.",
    )
    # Optional: stays None when no visualizations are supplied.
    visualizations: Optional[List[VisualizationModel]] = Field(
        default=None, title="Visualizations of the artifact."
    )

    # Attaches the validator built by `convert_source_validator` to the
    # `materializer` and `data_type` fields.
    # NOTE(review): presumably this converts plain string paths into `Source`
    # objects -- confirm against `convert_source_validator`.
    _convert_source = convert_source_validator("materializer", "data_type")

ArtifactFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Artifacts.

    Every filter field is optional and defaults to `None`, except
    `only_unused`, which defaults to `False`.
    """

    # `only_unused` refers to a property of the artifacts relationship
    # rather than a field in the db, hence it needs to be handled
    # explicitly. It is appended to the exclusion list inherited from
    # `WorkspaceScopedFilterModel`.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "only_unused",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the artifact",
    )
    uri: Optional[str] = Field(
        default=None,
        description="Uri of the artifact",
    )
    materializer: Optional[str] = Field(
        default=None,
        description="Materializer used to produce the artifact",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the artifact",
    )
    data_type: Optional[str] = Field(
        default=None,
        description="Datatype of the artifact",
    )
    # The ID filters below accept either a UUID or a string.
    artifact_store_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Artifact store for this artifact"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace for this artifact"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that produced this artifact"
    )
    # Listed in FILTER_EXCLUDE_FIELDS above.
    only_unused: Optional[bool] = Field(
        default=False, description="Filter only for unused artifacts"
    )
artifact_store_id: Union[uuid.UUID, str] pydantic-field

Artifact store for this artifact

data_type: str pydantic-field

Datatype of the artifact

materializer: str pydantic-field

Materializer used to produce the artifact

name: str pydantic-field

Name of the artifact

only_unused: bool pydantic-field

Filter only for unused artifacts

type: str pydantic-field

Type of the artifact

uri: str pydantic-field

Uri of the artifact

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this artifact

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this artifact

ArtifactRequestModel (ArtifactBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactRequestModel(ArtifactBaseModel, WorkspaceScopedRequestModel):
    """Request model for artifacts.

    Combines the artifact fields of `ArtifactBaseModel` with the scoping
    fields of `WorkspaceScopedRequestModel`; declares no fields of its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

ArtifactResponseModel (ArtifactBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
    """Artifact model as returned in responses.

    Adds the producing step run ID and the artifact's metadata to the shared
    artifact fields, and offers helpers to load and visualize the stored data.
    """

    producer_step_run_id: Optional[UUID] = Field(
        default=None,
        title="ID of the step run that produced this artifact.",
    )
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        title="Metadata of the artifact.", default={}
    )

    @property
    def step(self) -> "StepRunResponseModel":
        """Look up the step that produced this artifact.

        Returns:
            The step that produced this artifact.
        """
        from zenml.utils.artifact_utils import get_producer_step_of_artifact

        producer_step = get_producer_step_of_artifact(self)
        return producer_step

    @property
    def run(self) -> "PipelineRunResponseModel":
        """Look up the pipeline run that produced this artifact.

        The run is taken from the producing step (see `step`).

        Returns:
            The pipeline run that produced this artifact.
        """
        producer_step = self.step
        return producer_step.run

    def load(self) -> Any:
        """Load the data stored in this artifact.

        Returns:
            The materialized data.
        """
        from zenml.utils.artifact_utils import load_artifact

        materialized_data = load_artifact(self)
        return materialized_data

    def read(self) -> Any:
        """(Deprecated) Load the data stored in this artifact.

        Logs a deprecation warning and then delegates to `load()`.

        Returns:
            The materialized data.
        """
        deprecation_notice = (
            "`artifact.read()` is deprecated and will be removed in a future "
            "release. Please use `artifact.load()` instead."
        )
        logger.warning(deprecation_notice)
        return self.load()

    def visualize(self, title: Optional[str] = None) -> None:
        """Show the artifact's visualizations in notebook environments.

        Args:
            title: Optional title to show before the visualizations.
        """
        from zenml.utils.visualization_utils import (
            visualize_artifact as render_artifact,
        )

        render_artifact(self, title=title)
run: PipelineRunResponseModel property readonly

Get the pipeline run that produced this artifact.

Returns:

Type Description
PipelineRunResponseModel

The pipeline run that produced this artifact.

step: StepRunResponseModel property readonly

Get the step that produced this artifact.

Returns:

Type Description
StepRunResponseModel

The step that produced this artifact.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

load(self)

Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/artifact_models.py
def load(self) -> Any:
    """Load the data stored in this artifact.

    Returns:
        The materialized data.
    """
    from zenml.utils.artifact_utils import load_artifact

    materialized_data = load_artifact(self)
    return materialized_data
read(self)

(Deprecated) Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/artifact_models.py
def read(self) -> Any:
    """(Deprecated) Load the data stored in this artifact.

    Logs a deprecation warning and then delegates to `load()`.

    Returns:
        The materialized data.
    """
    deprecation_notice = (
        "`artifact.read()` is deprecated and will be removed in a future "
        "release. Please use `artifact.load()` instead."
    )
    logger.warning(deprecation_notice)
    return self.load()
visualize(self, title=None)

Visualize the artifact in notebook environments.

Parameters:

Name Type Description Default
title Optional[str]

Optional title to show before the visualizations.

None
Source code in zenml/models/artifact_models.py
def visualize(self, title: Optional[str] = None) -> None:
    """Show the artifact's visualizations in notebook environments.

    Args:
        title: Optional title to show before the visualizations.
    """
    from zenml.utils.visualization_utils import (
        visualize_artifact as render_artifact,
    )

    render_artifact(self, title=title)

base_models

Base domain model definitions.

BaseRequestModel (BaseZenModel) pydantic-model

Base request model.

Used as a base class for all request models.

Source code in zenml/models/base_models.py
class BaseRequestModel(BaseZenModel):
    """Base request model.

    Used as a base class for all request models. Declares no fields of its
    own; everything is inherited from `BaseZenModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

BaseResponseModel (BaseZenModel) pydantic-model

Base domain model.

Used as a base class for all domain models that have the following common characteristics:

  • are uniquely identified by a UUID
  • have a creation timestamp and a last modified timestamp
Source code in zenml/models/base_models.py
class BaseResponseModel(BaseZenModel):
    """Base domain model.

    Used as a base class for all domain models that have the following common
    characteristics:

      * are uniquely identified by a UUID
      * have a creation timestamp and a last modified timestamp

    Instances hash and compare by their concrete type together with their
    `id`.
    """

    id: UUID = Field(title="The unique resource id.")

    created: datetime = Field(title="Time when this resource was created.")
    updated: datetime = Field(
        title="Time when this resource was last updated."
    )

    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Returns:
            Hash of the concrete model type combined with the UUID.
        """
        return hash((type(self), self.id))

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is of the same type and has the same UUID.
        """
        # `__hash__` keys on the exact type, so equality must too: otherwise
        # two models of different types sharing an ID would compare equal
        # while hashing differently, which breaks dict/set lookups.
        if type(other) is not type(self):
            return False
        return self.id == other.id

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Extends the metadata collected by the parent class with this model's
        ID under the `entity_id` key.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata
__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is of the same type and has the same UUID.

Source code in zenml/models/base_models.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is of the same type and has the same UUID.
    """
    # `__hash__` keys on the exact type, so equality must too: otherwise two
    # models of different types sharing an ID would compare equal while
    # hashing differently, which breaks dict/set lookups.
    if type(other) is not type(self):
        return False
    return self.id == other.id
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/models/base_models.py
def __hash__(self) -> int:
    """Hash the model by its concrete type and its UUID.

    Returns:
        Hash of the UUID.
    """
    identity = (type(self), self.id)
    return hash(identity)
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a base response model.

    Adds this model's ID to the metadata gathered by the parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    analytics["entity_id"] = self.id
    return analytics

BaseZenModel (AnalyticsTrackedModelMixin) pydantic-model

Base model class for all ZenML models.

This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events and proper encoding of SecretStr values.

Source code in zenml/models/base_models.py
class BaseZenModel(AnalyticsTrackedModelMixin):
    """Base model class for all ZenML models.

    This class is used as a base class for all ZenML models. It provides
    functionality for tracking analytics events and proper encoding of
    SecretStr values.
    """

    class Config:
        """Pydantic configuration class.

        Registers a JSON encoder for `SecretStr` values and allows extra
        fields on all models.
        """

        # This is needed to allow the REST client and server to unpack SecretStr
        # values correctly: the encoder emits the underlying secret value and
        # passes `None` through unchanged.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value()
            if v is not None
            else None
        }

        # Allow extras on all models to support forwards and backwards
        # compatibility (e.g. new fields in newer versions of ZenML servers
        # are allowed to be present in older versions of ZenML clients and
        # vice versa).
        extra = "allow"
Config

Pydantic configuration class.

Source code in zenml/models/base_models.py
class Config:
    """Pydantic configuration class.

    Registers a JSON encoder for `SecretStr` values and allows extra fields
    on all models.
    """

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly: the encoder emits the underlying secret value and
    # passes `None` through unchanged.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

ShareableRequestModel (WorkspaceScopedRequestModel) pydantic-model

Base shareable workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped and are shareable.

Source code in zenml/models/base_models.py
class ShareableRequestModel(WorkspaceScopedRequestModel):
    """Base request model for shareable, workspace-scoped resources.

    Serves as the base class for all request models that are workspace-scoped
    and can be shared. Resources are not shared unless `is_shared` is set.
    """

    is_shared: bool = Field(
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        ),
        default=False,
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a shareable request model.

        Adds the `is_shared` flag to the metadata gathered by the parent
        class.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        analytics["is_shared"] = self.is_shared
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a shareable request model.

    Adds the `is_shared` flag to the metadata gathered by the parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    analytics["is_shared"] = self.is_shared
    return analytics

ShareableResponseModel (WorkspaceScopedResponseModel) pydantic-model

Base shareable workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped and are shareable.

Source code in zenml/models/base_models.py
class ShareableResponseModel(WorkspaceScopedResponseModel):
    """Base response model for shareable, workspace-scoped resources.

    Serves as the base class for all domain models that are workspace-scoped
    and can be shared. The `is_shared` flag has no default here.
    """

    is_shared: bool = Field(
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        )
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a shareable response model.

        Adds the `is_shared` flag to the metadata gathered by the parent
        class.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        analytics["is_shared"] = self.is_shared
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a shareable response model.

    Adds the `is_shared` flag to the metadata gathered by the parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    analytics["is_shared"] = self.is_shared
    return analytics

UserScopedRequestModel (BaseRequestModel) pydantic-model

Base user-owned request model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/base_models.py
class UserScopedRequestModel(BaseRequestModel):
    """Base request model for resources owned by a user.

    Serves as the base class for all request models that are "owned" by a
    user; the owner is referenced by UUID.
    """

    user: UUID = Field(title="The id of the user that created this resource.")

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a user-scoped request model.

        Adds the owning user's ID to the metadata gathered by the parent
        class.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        analytics["user_id"] = self.user
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a user-scoped request model.

    Adds the owning user's ID to the metadata gathered by the parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    analytics["user_id"] = self.user
    return analytics

UserScopedResponseModel (BaseResponseModel) pydantic-model

Base user-owned domain model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/base_models.py
class UserScopedResponseModel(BaseResponseModel):
    """Base response model for resources owned by a user.

    Serves as the base class for all domain models that are "owned" by a
    user. The owner is embedded as a full user model and may be `None`.
    """

    user: Union["UserResponseModel", None] = Field(
        nullable=True, title="The user that created this resource."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a user-scoped response model.

        Adds the owning user's ID to the metadata gathered by the parent
        class; nothing is added when the model has no user.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        owner = self.user
        if owner is None:
            return analytics
        analytics["user_id"] = owner.id
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a user-scoped response model.

    Adds the owning user's ID to the metadata gathered by the parent class;
    nothing is added when the model has no user.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    owner = self.user
    if owner is None:
        return analytics
    analytics["user_id"] = owner.id
    return analytics

WorkspaceScopedRequestModel (UserScopedRequestModel) pydantic-model

Base workspace-scoped request domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/base_models.py
class WorkspaceScopedRequestModel(UserScopedRequestModel):
    """Base request model for resources that belong to a workspace.

    Serves as the base class for all request models that are
    workspace-scoped; the workspace is referenced by UUID.
    """

    workspace: UUID = Field(
        title="The workspace to which this resource belongs."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a workspace-scoped request model.

        Adds the workspace ID to the metadata gathered by the parent class.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        analytics["workspace_id"] = self.workspace
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a workspace-scoped request model.

    Adds the workspace ID to the metadata gathered by the parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    analytics["workspace_id"] = self.workspace
    return analytics

WorkspaceScopedResponseModel (UserScopedResponseModel) pydantic-model

Base workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/base_models.py
class WorkspaceScopedResponseModel(UserScopedResponseModel):
    """Base response model for resources that belong to a workspace.

    Serves as the base class for all domain models that are workspace-scoped;
    the workspace is embedded as a full workspace model.
    """

    workspace: "WorkspaceResponseModel" = Field(
        title="The workspace of this resource."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the analytics metadata of a workspace-scoped response model.

        Adds the ID of the embedded workspace to the metadata gathered by the
        parent class.

        Returns:
            The analytics metadata.
        """
        analytics = super().get_analytics_metadata()
        owning_workspace = self.workspace
        analytics["workspace_id"] = owning_workspace.id
        return analytics
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the analytics metadata of a workspace-scoped response model.

    Adds the ID of the embedded workspace to the metadata gathered by the
    parent class.

    Returns:
        The analytics metadata.
    """
    analytics = super().get_analytics_metadata()
    owning_workspace = self.workspace
    analytics["workspace_id"] = owning_workspace.id
    return analytics

update_model(_cls)

Base update model.

This is used as a decorator on top of request models to convert them into update models where the fields are optional and can be set to None.

Parameters:

Name Type Description Default
_cls Type[~T]

The class to decorate

required

Returns:

Type Description
Type[~T]

The decorated class.

Source code in zenml/models/base_models.py
def update_model(_cls: Type[T]) -> Type[T]:
    """Turn a request model into an update model.

    Meant to be used as a decorator on top of request models: every field of
    the decorated class is marked as not required and is allowed to be None.
    The class is modified in place and returned.

    Args:
        _cls: The class to decorate

    Returns:
        The decorated class.
    """
    for model_field in _cls.__fields__.values():
        model_field.required = False
        model_field.allow_none = True

    return _cls

code_repository_models

Models representing code repositories.

CodeReferenceBaseModel (BaseModel) pydantic-model

Base model for code references.

Source code in zenml/models/code_repository_models.py
class CodeReferenceBaseModel(BaseModel):
    """Base model for code references.

    Declares the `commit` and `subdirectory` fields shared by the code
    reference request and response models. Neither field has a default.
    """

    commit: str = Field(description="The commit of the code reference.")
    subdirectory: str = Field(
        description="The subdirectory of the code reference."
    )
commit: str pydantic-field required

The commit of the code reference.

subdirectory: str pydantic-field required

The subdirectory of the code reference.

CodeReferenceRequestModel (CodeReferenceBaseModel, BaseRequestModel) pydantic-model

Code reference request model.

Source code in zenml/models/code_repository_models.py
class CodeReferenceRequestModel(CodeReferenceBaseModel, BaseRequestModel):
    """Code reference request model.

    Adds the code repository, referenced by its UUID, to the shared code
    reference fields.
    """

    # The repository is referenced by UUID here; the response model embeds
    # the full repository model instead.
    code_repository: UUID = Field(
        description="The repository of the code reference."
    )
code_repository: UUID pydantic-field required

The repository of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

CodeReferenceResponseModel (CodeReferenceBaseModel, BaseResponseModel) pydantic-model

Code reference response model.

Source code in zenml/models/code_repository_models.py
class CodeReferenceResponseModel(CodeReferenceBaseModel, BaseResponseModel):
    """Code reference response model.

    Adds the code repository, embedded as a full
    `CodeRepositoryResponseModel`, to the shared code reference fields.
    """

    # Full repository model, whereas the request model carries only a UUID.
    code_repository: CodeRepositoryResponseModel = Field(
        description="The repository of the code reference."
    )
code_repository: CodeRepositoryResponseModel pydantic-field required

The repository of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryBaseModel (BaseModel) pydantic-model

Base model for code repositories.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryBaseModel(BaseModel):
    """Base model for code repositories.

    `name`, `config` and `source` have no default; `logo_url` and
    `description` are optional and default to `None`.
    """

    name: str = Field(
        title="The name of the code repository.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    config: Dict[str, Any] = Field(
        description="Configuration for the code repository."
    )
    source: Source = Field(description="The code repository source.")
    # The `None` defaults are spelled out, as on the other models' optional
    # fields, instead of relying on the implicit default that an `Optional`
    # annotation gets.
    logo_url: Optional[str] = Field(
        default=None,
        description="Optional URL of a logo (png, jpg or svg) for the code repository.",
    )
    description: Optional[str] = Field(
        default=None,
        description="Code repository description.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
config: Dict[str, Any] pydantic-field required

Configuration for the code repository.

description: ConstrainedStrValue pydantic-field

Code repository description.

logo_url: str pydantic-field

Optional URL of a logo (png, jpg or svg) for the code repository.

source: Source pydantic-field required

The code repository source.

CodeRepositoryFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all code repositories.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all code repositories.

    Every filter field is optional and defaults to `None`.
    """

    # The `None` defaults are spelled out, as in the other filter models,
    # instead of relying on the implicit default that an optional annotation
    # gets.
    name: Optional[str] = Field(
        default=None,
        description="Name of the code repository.",
    )
    # The ID filters accept either a UUID or a string.
    workspace_id: Union[UUID, str, None] = Field(
        default=None, description="Workspace of the code repository."
    )
    user_id: Union[UUID, str, None] = Field(
        default=None, description="User that created the code repository."
    )
name: str pydantic-field

Name of the code repository.

user_id: Union[uuid.UUID, str] pydantic-field

User that created the code repository.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the code repository.

CodeRepositoryRequestModel (CodeRepositoryBaseModel, WorkspaceScopedRequestModel) pydantic-model

Code repository request model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryRequestModel(
    CodeRepositoryBaseModel, WorkspaceScopedRequestModel
):
    """Code repository request model.

    Combines the code repository fields of `CodeRepositoryBaseModel` with the
    scoping fields of `WorkspaceScopedRequestModel`; declares no fields of
    its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryResponseModel (CodeRepositoryBaseModel, WorkspaceScopedResponseModel) pydantic-model

Code repository response model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryResponseModel(
    CodeRepositoryBaseModel, WorkspaceScopedResponseModel
):
    """Code repository response model.

    Combines the code repository fields of `CodeRepositoryBaseModel` with the
    fields of `WorkspaceScopedResponseModel`; declares no fields of its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryUpdateModel (CodeRepositoryRequestModel) pydantic-model

Code repository update model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryUpdateModel(CodeRepositoryRequestModel):
    """Code repository update model.

    Inherits every field from `CodeRepositoryRequestModel` without adding any
    of its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

component_models

Models representing stack components.

ComponentBaseModel (BaseModel) pydantic-model

Base model for stack components.

Source code in zenml/models/component_models.py
class ComponentBaseModel(BaseModel):
    """Base model for stack components.

    `name`, `type`, `flavor` and `configuration` have no default;
    `connector_resource_id` and `labels` are optional and default to `None`.
    """

    # Capped at STR_FIELD_MAX_LENGTH characters.
    name: str = Field(
        title="The name of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    type: StackComponentType = Field(
        title="The type of the stack component.",
    )

    # Capped at STR_FIELD_MAX_LENGTH characters.
    flavor: str = Field(
        title="The flavor of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    configuration: Dict[str, Any] = Field(
        title="The stack component configuration.",
    )

    # Optional: stays None when no resource ID is supplied.
    connector_resource_id: Optional[str] = Field(
        default=None,
        description="The ID of a specific resource instance to "
        "gain access to through the connector",
    )

    # Optional: stays None when no labels are supplied.
    labels: Optional[Dict[str, Any]] = Field(
        default=None,
        title="The stack component labels.",
    )
connector_resource_id: str pydantic-field

The ID of a specific resource instance to gain access to through the connector

ComponentFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all ComponentModels.

The Component Model needs additional scoping. As such the _scope_user field can be set to the user that is doing the filtering. The generate_filter() method of the base class is overridden to include the scoping.

Source code in zenml/models/component_models.py
class ComponentFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all ComponentModels.

    The Component Model needs additional scoping. As such the `_scope_user`
    field can be set to the user that is doing the filtering. The
    `generate_filter()` method of the baseclass is overwritten to include the
    scoping.
    """

    # `scope_type` is applied explicitly in `generate_filter()`, so it is
    # excluded from both the generic filter fields and the CLI options.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_type",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_type",
    ]
    scope_type: Optional[str] = Field(
        default=None,
        description="The type to scope this query to.",
    )

    is_shared: Optional[Union[bool, str]] = Field(
        default=None, description="If the stack is shared or private"
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the stack component",
    )
    flavor: Optional[str] = Field(
        default=None,
        description="Flavor of the stack component",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the stack component",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack component"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
    connector_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Connector linked to the stack component"
    )

    def set_scope_type(self, component_type: str) -> None:
        """Scope the query to a single type of stack component.

        Args:
            component_type: The type of component to scope the query to.
        """
        self.scope_type = component_type

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        The filter of the base class is used as-is unless a scope type has
        been set, in which case it is narrowed to that component type.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_

        inherited_filter = super().generate_filter(table)
        if not self.scope_type:
            return inherited_filter

        # Restrict the inherited filter to rows of the scoped type.
        return and_(
            inherited_filter, getattr(table, "type") == self.scope_type
        )
connector_id: Union[uuid.UUID, str] pydantic-field

Connector linked to the stack component

flavor: str pydantic-field

Flavor of the stack component

is_shared: Union[bool, str] pydantic-field

If the stack is shared or private

name: str pydantic-field

Name of the stack component

scope_type: str pydantic-field

The type to scope this query to.

type: str pydantic-field

Type of the stack component

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack component

generate_filter(self, table)

Generate the filter for the query.

Stack components can be scoped by type to narrow the search.

Parameters:

Name Type Description Default
table Type[SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Source code in zenml/models/component_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    The filter of the base class is used as-is unless a scope type has been
    set, in which case it is narrowed to that component type.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_

    inherited_filter = super().generate_filter(table)
    if not self.scope_type:
        return inherited_filter

    # Restrict the inherited filter to rows of the scoped type.
    return and_(inherited_filter, getattr(table, "type") == self.scope_type)
set_scope_type(self, component_type)

Set the type of component on which to perform the filtering to scope the response.

Parameters:

Name Type Description Default
component_type str

The type of component to scope the query to.

required
Source code in zenml/models/component_models.py
def set_scope_type(self, component_type: str) -> None:
    """Scope the query to a single type of stack component.

    Args:
        component_type: The type of component to scope the query to.
    """
    self.scope_type = component_type

ComponentRequestModel (ComponentBaseModel, ShareableRequestModel) pydantic-model

Request model for stack components.

Source code in zenml/models/component_models.py
class ComponentRequestModel(ComponentBaseModel, ShareableRequestModel):
    """Request model for stack components."""

    # NOTE(review): presumably the fields forwarded to analytics tracking;
    # confirm against the base request model.
    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    connector: Optional[UUID] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )

    @validator("name")
    def name_cant_be_a_secret_reference(cls, name: str) -> str:
        """Reject component names that are secret references.

        Args:
            name: The name to validate.

        Returns:
            The name if it is not a secret reference.

        Raises:
            ValueError: If the name is a secret reference.
        """
        if not secret_utils.is_secret_reference(name):
            return name

        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

name_cant_be_a_secret_reference(name) classmethod

Validator to ensure that the given name is not a secret reference.

Parameters:

Name Type Description Default
name str

The name to validate.

required

Returns:

Type Description
str

The name if it is not a secret reference.

Exceptions:

Type Description
ValueError

If the name is a secret reference.

Source code in zenml/models/component_models.py
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
    """Reject component names that are secret references.

    Args:
        name: The name to validate.

    Returns:
        The name if it is not a secret reference.

    Raises:
        ValueError: If the name is a secret reference.
    """
    if not secret_utils.is_secret_reference(name):
        return name

    raise ValueError(
        "Passing the `name` attribute of a stack component as a "
        "secret reference is not allowed."
    )

ComponentResponseModel (ComponentBaseModel, ShareableResponseModel) pydantic-model

Response model for stack components.

Source code in zenml/models/component_models.py
class ComponentResponseModel(ComponentBaseModel, ShareableResponseModel):
    """Response model for stack components."""

    # NOTE(review): presumably the fields forwarded to analytics tracking;
    # confirm against the base response model.
    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    # Optional, defaults to None. The annotation is a string forward
    # reference to ServiceConnectorResponseModel.
    connector: Optional["ServiceConnectorResponseModel"] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

ComponentUpdateModel (ComponentRequestModel) pydantic-model

Update model for stack components.

Source code in zenml/models/component_models.py
class ComponentUpdateModel(ComponentRequestModel):
    """Update model for stack components."""
    # The body consists of the docstring only; no members are declared here
    # beyond what ComponentRequestModel provides.
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

constants

Constants used by ZenML domain models.

filter_models

Base filter model definitions.

BaseFilterModel (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default",
    count_steps="gte:5",
    sort_by="created",
    page=2,
    size=50
)
Source code in zenml/models/filter_models.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default",
        count_steps="gte:5",
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # Either a bare column name or `<operand>:<column>`.
    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    # Pages are counted from 1 (`ge=1`).
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    # Bounded to the range 1..PAGE_SIZE_MAXIMUM.
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    # The filter fields below also accept plain strings so that a value can
    # carry an `<operator>:<value>` prefix.
    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Check that `sort_by` names a sortable field of this model.

        The value is either a bare column name or `<operand>:<column>`. An
        unsupported operand is dropped with a warning, leaving only the
        column name.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # The validator runs with `pre=True`, so non-string input (e.g. an
        # int) arrives unconverted and would break the string handling below.
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )

        operand, separator, remainder = v.partition(":")
        if separator:
            sort_column = remainder
            if operand not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    sort_column,
                )
                # Keep only the column, dropping the unsupported operand.
                v = sort_column
        else:
            sort_column = v

        if sort_column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        if sort_column not in cls.__fields__:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )
        return v

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Check that every incoming filter can be turned into a Filter.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        # The filters are built purely as a check: the resulting list is
        # discarded and `values` is handed back as received.
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Build the Filter objects for the current field values.

        Returns:
            A list of Filter models.
        """
        current_values = {
            field_name: getattr(self, field_name)
            for field_name in self.__fields__
        }
        return self._generate_filter_list(current_values)

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Split `sort_by` into the sort column and the sorting operand.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        operand, separator, remainder = self.sort_by.partition(":")
        if not separator:
            # No explicit operand: sort ascending on the bare column name.
            return self.sort_by, SorterOps.ASCENDING

        # The user explicitly set an operand in front of the column.
        return remainder, SorterOps(operand)

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Turn a (column, value) dictionary into Filter objects.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        filters: List[Filter] = []

        for column, raw_value in values.items():
            # Excluded fields and unset (None) values produce no filter.
            if column in cls.FILTER_EXCLUDE_FIELDS or raw_value is None:
                continue

            # Separate an optional `<operator>:` prefix from the value.
            filter_value, operator = cls._resolve_operator(raw_value)

            filters.append(
                cls._define_filter(
                    column=column, value=filter_value, operator=operator
                )
            )

        return filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Split a user-provided value into filter value and operator.

        A string of the form "operator:value" whose prefix is a known
        `GenericFilterOps` value yields that operator and the remainder of
        the string. Anything else is returned as-is together with
        `GenericFilterOps.EQUALS`.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        if not isinstance(value, str):
            return value, GenericFilterOps.EQUALS

        prefix, separator, remainder = value.partition(":")
        if separator and prefix in GenericFilterOps.values():
            return remainder, GenericFilterOps(prefix)

        # No separator, or the prefix is not an operator: the whole string
        # is the value.
        return value, GenericFilterOps.EQUALS

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        The field type of `column` is probed in a fixed order (datetime,
        UUID, int, bool, str) and the first match decides which kind of
        filter is built. Fields matching none of these fall back to a string
        filter on `str(value)`.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters. `int(value)` raises if the value is not
        # convertible to an integer.
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters; the value is passed through unchanged.
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Tell whether the field named `k` is typed as a datetime.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        # Unions are matched through their type arguments, a bare
        # `datetime` annotation through identity.
        if issubclass(datetime, get_args(field_type)):
            return True
        return field_type is datetime

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Tell whether the field named `k` is typed as a UUID.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        # Unions are matched through their type arguments, a bare `UUID`
        # annotation through identity.
        if issubclass(UUID, get_args(field_type)):
            return True
        return field_type is UUID

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Tell whether the field named `k` is typed as an int.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        # Unions are matched through their type arguments, a bare `int`
        # annotation through identity.
        if issubclass(int, get_args(field_type)):
            return True
        return field_type is int

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        A field counts as a bool field when it is annotated as `bool` or as
        a union that has `bool` among its members.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        # `bool` is a subclass of `int`, so `issubclass(bool, ...)` would
        # also match unions that only contain `int`. Compare by identity so
        # that only `bool` itself is recognised.
        return field_type is bool or any(
            member is bool for member in get_args(field_type)
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Tell whether the field named `k` is typed as a string.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        # Unions are matched through their type arguments, a bare `str`
        # annotation through identity.
        if issubclass(str, get_args(field_type)):
            return True
        return field_type is str

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Tell whether `k` is the string-typed `sort_by` field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        is_string_typed = (
            issubclass(str, get_args(field_type)) or field_type == str
        )
        # Being string-typed is not enough: the key itself has to be
        # `sort_by`.
        return is_string_typed and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Build a datetime filter for a given column.

        `datetime` values are used directly; anything else is parsed with
        `FILTERING_DATETIME_FORMAT`.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        if isinstance(value, datetime):
            parsed_value = value
        else:
            try:
                parsed_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
            except ValueError as e:
                raise ValueError(
                    "The datetime filter only works with values in the following "
                    f"format: {FILTERING_DATETIME_FORMAT}"
                ) from e

        return NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=parsed_value,
        )

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Build a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # Only equality checks require a well-formed UUID; other operators
        # are not validated here.
        must_be_valid_uuid = (
            operator == GenericFilterOps.EQUALS
            and not isinstance(value, UUID)
        )
        if must_be_valid_uuid:
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # The filter compares on the string form of the value.
        return UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Boolean filters always compare with `equals`; any other operator is
        replaced after logging a warning. String values that spell out a
        false value (e.g. the `false` left over from `"equals:false"`) are
        interpreted as False instead of being treated as a truthy non-empty
        string.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any "
                "operation except for equals. Defaulting "
                "to an `equals` comparison."
            )

        # The same spellings pydantic accepts as False when parsing a bool.
        false_strings = {"false", "f", "no", "n", "off", "0"}
        if isinstance(value, str) and value.strip().lower() in false_strings:
            bool_value = False
        else:
            # Everything else keeps the previous truthiness-based behaviour.
            bool_value = bool(value)

        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool_value,
        )

    @property
    def offset(self) -> int:
        """Number of rows to skip to reach the requested page.

        Returns:
            The offset for the query.
        """
        pages_to_skip = self.page - 1
        return pages_to_skip * self.size

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Combine all column filters into a single query condition.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        conditions = [
            column_filter.generate_query_conditions(table=table)
            for column_filter in self.list_of_filters
        ]

        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *conditions)
        if self.logical_operator == LogicalOperators.AND:
            return and_(True, *conditions)
        raise RuntimeError("No valid logical operator was supplied.")

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Add this model's filter to a query as a WHERE clause.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        condition = self.generate_filter(table=table)
        # A missing condition leaves the query untouched.
        return query if condition is None else query.where(condition)
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.filter_models.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.filter_models.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Splits the `sort_by` value into the column to sort by and the sorting operand.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Add this model's filter to a query as a WHERE clause.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    condition = self.generate_filter(table=table)
    # A missing condition leaves the query untouched.
    return query if condition is None else query.where(condition)
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/filter_models.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Check that every incoming filter can be turned into a Filter.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    # The filters are built purely as a check: the resulting list is
    # discarded and `values` is handed back as received.
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/filter_models.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Combine all column filters into a single query condition.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    conditions = [
        column_filter.generate_query_conditions(table=table)
        for column_filter in self.list_of_filters
    ]

    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *conditions)
    if self.logical_operator == LogicalOperators.AND:
        return and_(True, *conditions)
    raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    A field counts as a bool field when it is annotated as `bool` or as a
    union that has `bool` among its members.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # `bool` is a subclass of `int`, so `issubclass(bool, ...)` would also
    # match unions that only contain `int`. Compare by identity so that only
    # `bool` itself is recognised.
    return field_type is bool or any(
        member is bool for member in get_args(field_type)
    )
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Tell whether the field named `k` is typed as a datetime.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Unions are matched through their type arguments, a bare `datetime`
    # annotation through identity.
    if issubclass(datetime, get_args(field_type)):
        return True
    return field_type is datetime
is_int_field(k) classmethod

Checks if it's a int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a int field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Tell whether the field named `k` is typed as an int.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Unions are matched through their type arguments, a bare `int`
    # annotation through identity.
    if issubclass(int, get_args(field_type)):
        return True
    return field_type is int
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Tell whether `k` is the string-typed `sort_by` field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    is_string_typed = (
        issubclass(str, get_args(field_type)) or field_type == str
    )
    # Being string-typed is not enough: the key itself has to be `sort_by`.
    return is_string_typed and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Tell whether the field named `k` is typed as a string.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Unions are matched through their type arguments, a bare `str`
    # annotation through identity.
    if issubclass(str, get_args(field_type)):
        return True
    return field_type is str
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Tell whether the field named `k` is typed as a UUID.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    # Unions are matched through their type arguments, a bare `UUID`
    # annotation through identity.
    if issubclass(UUID, get_args(field_type)):
        return True
    return field_type is UUID
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/filter_models.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Check that `sort_by` names a sortable column with a valid operand.

    The value is either a bare column name or `<operand>:<column>`. An
    operand that is not one of `SorterOps.values()` is dropped with a
    warning and only the column name is kept.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Pydantic accepts int values for this field and would treat them as
    # strings, but inside this validator they are still integers, which
    # have no string methods.
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )

    # Only the first ":" separates the operand from the column name.
    operand, separator, remainder = v.partition(":")
    column = remainder if separator else v

    if separator and operand not in SorterOps.values():
        logger.warning(
            "Invalid operand used for column sorting. "
            "Only the following operands are supported `%s`. "
            "Defaulting to 'asc' on column `%s`.",
            SorterOps.values(),
            column,
        )
        v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    if column not in cls.__fields__:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
    return v

BoolFilter (Filter) pydantic-model

Filter for all Boolean fields.

Source code in zenml/models/filter_models.py
class BoolFilter(Filter):
    """Filter for all Boolean fields.

    Equality is the only operation listed in `ALLOWED_OPS`.
    """

    ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Build the equality condition for a boolean column.

        Args:
            column: The boolean column of an SQLModel table on which to filter.

        Returns:
            The result of comparing `column` with the filter value via `==`.
        """
        equality_condition = column == self.value
        return equality_condition
generate_query_conditions_from_column(self, column)

Generate query conditions for a boolean column.

Parameters:

Name Type Description Default
column Any

The boolean column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Build the equality condition for a boolean column.

    Args:
        column: The boolean column of an SQLModel table on which to filter.

    Returns:
        The result of comparing `column` with the filter value via `==`.
    """
    equality_condition = column == self.value
    return equality_condition

Filter (BaseModel, ABC) pydantic-model

Filter for all fields.

A Filter is a combination of a column, a value that the user uses to filter on this column and an operation to use. The easiest example would be user equals aria with column=user, value=aria and the operation=equals.

All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.

Source code in zenml/models/filter_models.py
class Filter(BaseModel, ABC):
    """Filter for all fields.

    A Filter bundles three things: the column to filter on, the value the
    user supplied for that column, and the operation that relates the two.
    For instance `user equals aria` is expressed as column=`user`,
    value=`aria` and operation=`equals`.

    Each subclass declares which operations it supports through the
    ALLOWED_OPS class variable.
    """

    ALLOWED_OPS: ClassVar[List[str]] = []

    operation: GenericFilterOps
    column: str
    value: Any

    @validator("operation", pre=True)
    def validate_operation(cls, op: str) -> str:
        """Validate that the operation is a valid op for the field type.

        Args:
            op: The operation of this filter.

        Returns:
            The operation if it is valid.

        Raises:
            ValueError: If the operation is not valid for this field type.
        """
        if op in cls.ALLOWED_OPS:
            return op
        raise ValueError(
            f"This datatype can not be filtered using this operation: "
            f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
        )

    def generate_query_conditions(
        self,
        table: Type[SQLModel],
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the query conditions for the database.

        Looks up the attribute named `self.column` on the table and hands
        it to `generate_query_conditions_from_column`.

        Args:
            table: The SQLModel table to use for the query creation

        Returns:
            Whatever `generate_query_conditions_from_column` returns for
            the looked-up column.
        """
        col = getattr(table, self.column)
        return self.generate_query_conditions_from_column(col)  # type:ignore[no-any-return]

    @abstractmethod
    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions given the corresponding database column.

        Subclasses override this to define how each operation listed in
        `self.ALLOWED_OPS` filters the given column by `self.value`.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            The query condition(s) for the column.
        """
generate_query_conditions(self, table)

Generate the query conditions for the database.

This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The SQLModel table to use for the query creation

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

A list of conditions that will be combined using the and operation

Source code in zenml/models/filter_models.py
def generate_query_conditions(
    self,
    table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the query conditions for the database.

    Looks up the attribute named `self.column` on the table and hands it
    to `generate_query_conditions_from_column`.

    Args:
        table: The SQLModel table to use for the query creation

    Returns:
        Whatever `generate_query_conditions_from_column` returns for the
        looked-up column.
    """
    col = getattr(table, self.column)
    return self.generate_query_conditions_from_column(col)  # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)

Generate query conditions given the corresponding database column.

This method should be overridden by subclasses to define how each supported operation in self.ALLOWED_OPS can be used to filter the given column by self.value.

Parameters:

Name Type Description Default
column Any

The column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions given the corresponding database column.

    This method is abstract and has no body here. Subclasses override it
    to define how each supported operation in `self.ALLOWED_OPS` can be
    used to filter the given column by `self.value`.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        The query condition(s) for the column.
    """
validate_operation(op) classmethod

Validate that the operation is a valid op for the field type.

Parameters:

Name Type Description Default
op str

The operation of this filter.

required

Returns:

Type Description
str

The operation if it is valid.

Exceptions:

Type Description
ValueError

If the operation is not valid for this field type.

Source code in zenml/models/filter_models.py
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
    """Validate that the operation is a valid op for the field type.

    Args:
        op: The operation of this filter.

    Returns:
        The operation if it is valid.

    Raises:
        ValueError: If the operation is not valid for this field type.
    """
    if op in cls.ALLOWED_OPS:
        return op
    raise ValueError(
        f"This datatype can not be filtered using this operation: "
        f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
    )

NumericFilter (Filter) pydantic-model

Filter for all numeric fields.

Source code in zenml/models/filter_models.py
class NumericFilter(Filter):
    """Filter for all numeric fields.

    The filter value is typed as either a float or a datetime.
    """

    value: Union[float, datetime]

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.GT,
        GenericFilterOps.GTE,
        GenericFilterOps.LT,
        GenericFilterOps.LTE,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate the query condition for a numeric column.

        Args:
            column: The numeric column of an SQLModel table on which to
                filter.

        Returns:
            The comparison of `column` with the filter value that matches
            the operation; any operation other than the four ordering
            comparisons yields an equality comparison.
        """
        op = self.operation
        if op == GenericFilterOps.LT:
            return column < self.value
        elif op == GenericFilterOps.LTE:
            return column <= self.value
        elif op == GenericFilterOps.GT:
            return column > self.value
        elif op == GenericFilterOps.GTE:
            return column >= self.value
        else:
            return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a numeric column.

Parameters:

Name Type Description Default
column Any

The numeric column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate the query condition for a numeric column.

    Args:
        column: The numeric column of an SQLModel table on which to filter.

    Returns:
        The comparison of `column` with the filter value that matches the
        operation; any operation other than the four ordering comparisons
        yields an equality comparison.
    """
    op = self.operation
    if op == GenericFilterOps.LT:
        return column < self.value
    elif op == GenericFilterOps.LTE:
        return column <= self.value
    elif op == GenericFilterOps.GT:
        return column > self.value
    elif op == GenericFilterOps.GTE:
        return column >= self.value
    else:
        return column == self.value

ShareableWorkspaceScopedFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced scoping with workspace and user scoped shareable things.

Source code in zenml/models/filter_models.py
class ShareableWorkspaceScopedFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced scoping with workspace and user scoped shareable things."""

    # `scope_user` is added to both exclusion lists inherited from the
    # workspace-scoped parent; it is applied separately in `apply_filter`.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_user",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_user",
    ]
    scope_user: Optional[UUID] = Field(
        default=None,
        description="The user to scope this query to.",
    )

    def set_scope_user(self, user_id: UUID) -> None:
        """Record the user whose scope limits the filtered response.

        Args:
            user_id: The user ID to scope the response to.
        """
        self.scope_user = user_id

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        The parent filter is applied first. When a scope user is set, the
        result is further restricted to rows whose `user_id` equals that
        user or whose `is_shared` flag is true.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        filtered = super().apply_filter(query=query, table=table)
        if not self.scope_user:
            return filtered

        owned_by_user = getattr(table, "user_id") == self.scope_user
        shared_with_all = getattr(table, "is_shared").is_(True)
        return filtered.where(or_(owned_by_user, shared_with_all))
scope_user: UUID pydantic-field

The user to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    The parent filter is applied first. When a scope user is set, the
    result is further restricted to rows whose `user_id` equals that user
    or whose `is_shared` flag is true.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    filtered = super().apply_filter(query=query, table=table)
    if not self.scope_user:
        return filtered

    owned_by_user = getattr(table, "user_id") == self.scope_user
    shared_with_all = getattr(table, "is_shared").is_(True)
    return filtered.where(or_(owned_by_user, shared_with_all))
set_scope_user(self, user_id)

Set the user that is performing the filtering to scope the response.

Parameters:

Name Type Description Default
user_id UUID

The user ID to scope the response to.

required
Source code in zenml/models/filter_models.py
def set_scope_user(self, user_id: UUID) -> None:
    """Record the user whose scope limits the filtered response.

    Stores `user_id` on the `scope_user` attribute.

    Args:
        user_id: The user ID to scope the response to.
    """
    self.scope_user = user_id

StrFilter (Filter) pydantic-model

Filter for all string fields.

Source code in zenml/models/filter_models.py
class StrFilter(Filter):
    """Filter for all string fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.STARTSWITH,
        GenericFilterOps.CONTAINS,
        GenericFilterOps.ENDSWITH,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate the query condition for a string column.

        Args:
            column: The string column of an SQLModel table on which to filter.

        Returns:
            A `like`, `startswith` or `endswith` condition on the column
            for the matching operation; any other operation yields an
            equality comparison with the filter value.
        """
        op = self.operation
        if op == GenericFilterOps.CONTAINS:
            # The filter value is wrapped in `%` on both sides.
            pattern = f"%{self.value}%"
            return column.like(pattern)
        elif op == GenericFilterOps.STARTSWITH:
            prefix = f"{self.value}"
            return column.startswith(prefix)
        elif op == GenericFilterOps.ENDSWITH:
            suffix = f"{self.value}"
            return column.endswith(suffix)
        else:
            return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a string column.

Parameters:

Name Type Description Default
column Any

The string column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate the query condition for a string column.

    Args:
        column: The string column of an SQLModel table on which to filter.

    Returns:
        A `like`, `startswith` or `endswith` condition on the column for
        the matching operation; any other operation yields an equality
        comparison with the filter value.
    """
    op = self.operation
    if op == GenericFilterOps.CONTAINS:
        # The filter value is wrapped in `%` on both sides.
        pattern = f"%{self.value}%"
        return column.like(pattern)
    elif op == GenericFilterOps.STARTSWITH:
        prefix = f"{self.value}"
        return column.startswith(prefix)
    elif op == GenericFilterOps.ENDSWITH:
        suffix = f"{self.value}"
        return column.endswith(suffix)
    else:
        return column == self.value

UUIDFilter (StrFilter) pydantic-model

Filter for all uuid fields which are mostly treated like strings.

Source code in zenml/models/filter_models.py
class UUIDFilter(StrFilter):
    """Filter for all uuid fields which are mostly treated like strings."""

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate the query condition for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            An equality comparison on the column itself for the equals
            operation; otherwise the parent's string condition on the
            column passed through `cast_if(..., sqlalchemy.String)`.
        """
        import sqlalchemy
        from sqlalchemy_utils.functions import cast_if

        # Equality compares against the UUID column directly.
        is_equality = self.operation == GenericFilterOps.EQUALS
        if is_equality:
            return column == self.value

        # Every other operation is delegated to the string filter, which
        # receives the column cast to a string.
        string_column = cast_if(column, sqlalchemy.String)
        return super().generate_query_conditions_from_column(
            column=string_column
        )
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate the query condition for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        An equality comparison on the column itself for the equals
        operation; otherwise the parent's string condition on the column
        passed through `cast_if(..., sqlalchemy.String)`.
    """
    import sqlalchemy
    from sqlalchemy_utils.functions import cast_if

    # Equality compares against the UUID column directly.
    is_equality = self.operation == GenericFilterOps.EQUALS
    if is_equality:
        return column == self.value

    # Every other operation is delegated to the string filter, which
    # receives the column cast to a string.
    string_column = cast_if(column, sqlalchemy.String)
    return super().generate_query_conditions_from_column(
        column=string_column
    )

WorkspaceScopedFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced scoping with workspace.

Source code in zenml/models/filter_models.py
class WorkspaceScopedFilterModel(BaseFilterModel):
    """Model to enable advanced scoping with workspace."""

    # `scope_workspace` is added to both exclusion lists inherited from
    # the base filter model; it is applied separately in `apply_filter`.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    scope_workspace: Optional[UUID] = Field(
        default=None,
        description="The workspace to scope this query to.",
    )

    def set_scope_workspace(self, workspace_id: UUID) -> None:
        """Record the workspace that limits the filtered response.

        Args:
            workspace_id: The workspace to scope this response to.
        """
        self.scope_workspace = workspace_id

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        The parent filter is applied first. When a scope workspace is set,
        the result is further restricted to rows whose `workspace_id`
        equals that workspace or is NULL.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        filtered = super().apply_filter(query=query, table=table)
        if not self.scope_workspace:
            return filtered

        in_workspace = getattr(table, "workspace_id") == self.scope_workspace
        without_workspace = getattr(table, "workspace_id").is_(None)
        return filtered.where(or_(in_workspace, without_workspace))
scope_workspace: UUID pydantic-field

The workspace to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    The parent filter is applied first. When a scope workspace is set, the
    result is further restricted to rows whose `workspace_id` equals that
    workspace or is NULL.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    filtered = super().apply_filter(query=query, table=table)
    if not self.scope_workspace:
        return filtered

    in_workspace = getattr(table, "workspace_id") == self.scope_workspace
    without_workspace = getattr(table, "workspace_id").is_(None)
    return filtered.where(or_(in_workspace, without_workspace))
set_scope_workspace(self, workspace_id)

Set the workspace to scope this response.

Parameters:

Name Type Description Default
workspace_id UUID

The workspace to scope this response to.

required
Source code in zenml/models/filter_models.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
    """Record the workspace that limits the filtered response.

    Stores `workspace_id` on the `scope_workspace` attribute.

    Args:
        workspace_id: The workspace to scope this response to.
    """
    self.scope_workspace = workspace_id

flavor_models

Models representing stack component flavors.

FlavorBaseModel (BaseModel) pydantic-model

Base model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorBaseModel(BaseModel):
    """Base model for stack component flavors.

    Besides the descriptive fields, the model exposes the derived
    `connector_requirements` property, which bundles the three
    `connector_*` fields into a `ServiceConnectorRequirements` object.
    """

    name: str = Field(
        title="The name of the Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    type: StackComponentType = Field(title="The type of the Flavor.")
    config_schema: Dict[str, Any] = Field(
        title="The JSON schema of this flavor's corresponding configuration.",
    )
    connector_type: Optional[str] = Field(
        default=None,
        title="The type of the connector that this flavor uses.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    connector_resource_type: Optional[str] = Field(
        default=None,
        title="The resource type of the connector that this flavor uses.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    connector_resource_id_attr: Optional[str] = Field(
        default=None,
        title="The name of an attribute in the stack component configuration "
        "that plays the role of resource ID when linked to a service connector.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    source: str = Field(
        title="The path to the module which contains this Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    integration: Optional[str] = Field(
        title="The name of the integration that the Flavor belongs to.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # Adjacent string literals are joined with no separator, so the space
    # after the comma has to be part of the first literal.
    logo_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to a png, "
        "svg or jpg can be attached.",
    )
    docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to docs, within docs.zenml.io.",
    )
    sdk_docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to SDK docs, "
        "within sdkdocs.zenml.io.",
    )
    is_custom: bool = Field(
        title="Whether or not this flavor is a custom, user created flavor.",
        default=True,
    )

    @property
    def connector_requirements(self) -> Optional[ServiceConnectorRequirements]:
        """Returns the connector requirements for the flavor.

        Returns:
            The connector requirements for the flavor, or None when no
            connector resource type is set.
        """
        if not self.connector_resource_type:
            return None

        return ServiceConnectorRequirements(
            connector_type=self.connector_type,
            resource_type=self.connector_resource_type,
            resource_id_attr=self.connector_resource_id_attr,
        )
connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Returns the connector requirements for the flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

The connector requirements for the flavor.

FlavorFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Flavors.

Source code in zenml/models/flavor_models.py
class FlavorFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Flavors.

    Every filter field declared here is optional and defaults to `None`.
    """

    name: Optional[str] = Field(
        default=None,
        description="Name of the flavor",
    )
    type: Optional[str] = Field(
        default=None,
        description="Stack Component Type of the stack flavor",
    )
    integration: Optional[str] = Field(
        default=None,
        description="Integration associated with the flavor",
    )
    # NOTE(review): the two descriptions below say "stack" although this
    # model filters flavors -- presumably copied from the stack filter;
    # confirm before changing the user-facing text.
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
integration: str pydantic-field

Integration associated with the flavor

name: str pydantic-field

Name of the flavor

type: str pydantic-field

Stack Component Type of the stack flavor

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack

FlavorRequestModel (FlavorBaseModel, BaseRequestModel) pydantic-model

Request model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorRequestModel(FlavorBaseModel, BaseRequestModel):
    """Request model for stack component flavors.

    Adds optional `user` and `workspace` IDs to the base flavor fields;
    both default to `None`.
    """

    # Field names listed for analytics; how they are consumed is defined
    # outside this class.
    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "type",
        "integration",
    ]

    user: Optional[UUID] = Field(
        default=None, title="The id of the user that created this resource."
    )

    workspace: Optional[UUID] = Field(
        default=None, title="The workspace to which this resource belongs."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

FlavorResponseModel (FlavorBaseModel, BaseResponseModel) pydantic-model

Response model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorResponseModel(FlavorBaseModel, BaseResponseModel):
    """Response model for stack component flavors.

    Unlike the request model, `user` and `workspace` are typed as response
    models rather than as IDs.
    """

    # Field names listed for analytics; how they are consumed is defined
    # outside this class.
    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "id",
        "type",
        "integration",
    ]

    # `Union[X, None]` is equivalent to the `Optional[X]` spelling used
    # for `workspace` below.
    user: Union["UserResponseModel", None] = Field(
        title="The user that created this resource.", nullable=True
    )

    # NOTE(review): the title says "project" while the field is named
    # `workspace` -- presumably a leftover from a rename; confirm.
    workspace: Optional["WorkspaceResponseModel"] = Field(
        title="The project of this resource."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

FlavorUpdateModel (FlavorRequestModel) pydantic-model

Update model for flavors.

Source code in zenml/models/flavor_models.py
class FlavorUpdateModel(FlavorRequestModel):
    """Update model for flavors.

    Inherits from `FlavorRequestModel` and declares no fields of its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

hub_plugin_models

Models representing ZenML Hub plugins.

HubPluginBaseModel (BaseModel) pydantic-model

Base model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginBaseModel(BaseModel):
    """Base model for a ZenML Hub plugin.

    Only `name` and `repository_url` are annotated as plain `str`; every
    other field is annotated `Optional`.
    """

    name: str
    description: Optional[str]
    version: Optional[str]
    release_notes: Optional[str]
    # Presumably these locate the plugin source inside a git repository --
    # confirm against the Hub API.
    repository_url: str
    repository_subdirectory: Optional[str]
    repository_branch: Optional[str]
    repository_commit: Optional[str]
    tags: Optional[List[str]]
    logo_url: Optional[str]

HubPluginRequestModel (HubPluginBaseModel) pydantic-model

Request model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginRequestModel(HubPluginBaseModel):
    """Request model for a ZenML Hub plugin.

    Inherits from `HubPluginBaseModel` and declares no fields of its own.
    """

HubPluginResponseModel (HubPluginBaseModel) pydantic-model

Response model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginResponseModel(HubPluginBaseModel):
    """Response model for a ZenML Hub plugin.

    Extends the base plugin fields with an ID, status, author, optional
    packaging and build details, and creation/update timestamps.
    """

    id: UUID
    status: PluginStatus
    author: str
    # Redeclared as plain `str`; the base model annotates `version` as
    # `Optional[str]`.
    version: str
    index_url: Optional[str]
    package_name: Optional[str]
    requirements: Optional[List[str]]
    build_logs: Optional[str]
    created: datetime
    updated: datetime

HubUserResponseModel (BaseModel) pydantic-model

Model for a ZenML Hub user.

Source code in zenml/models/hub_plugin_models.py
class HubUserResponseModel(BaseModel):
    """Model for a ZenML Hub user.

    Carries the user's ID and email, plus an optional username.
    """

    id: UUID
    email: str
    username: Optional[str]

PluginStatus (StrEnum)

Enum that represents the status of a plugin.

  • PENDING: Plugin is being built
  • FAILED: Plugin build failed
  • AVAILABLE: Plugin is available for installation
  • YANKED: Plugin was yanked and is no longer available
Source code in zenml/models/hub_plugin_models.py
class PluginStatus(StrEnum):
    """Enum that represents the status of a plugin.

    - PENDING: Plugin is being built
    - FAILED: Plugin build failed
    - AVAILABLE: Plugin is available for installation
    - YANKED: Plugin was yanked and is no longer available
    """

    # Each member's value is the lowercase form of its name.
    PENDING = "pending"
    FAILED = "failed"
    AVAILABLE = "available"
    YANKED = "yanked"

logs_models

Models representing logs files.

LogsBaseModel (BaseModel) pydantic-model

Base model for logs.

Source code in zenml/models/logs_models.py
class LogsBaseModel(BaseModel):
    """Base model for logs.

    Holds the URI of a logs file and the ID of the artifact store the
    logs are associated with. Both fields are required.
    """

    uri: str = Field(
        title="The uri of the logs file",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # NOTE(review): `max_length` is set on a field that may also hold a
    # UUID; presumably it only constrains the string form -- confirm.
    artifact_store_id: Union[str, UUID] = Field(
        title="The artifact store ID to associate the logs with.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

LogsRequestModel (LogsBaseModel, BaseRequestModel) pydantic-model

Request model for logs.

Source code in zenml/models/logs_models.py
class LogsRequestModel(LogsBaseModel, BaseRequestModel):
    """Request model for logs.

    Combines `LogsBaseModel` and `BaseRequestModel` and declares no fields
    of its own.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

LogsResponseModel (LogsBaseModel, BaseResponseModel) pydantic-model

Response model for logs.

Source code in zenml/models/logs_models.py
class LogsResponseModel(LogsBaseModel, BaseResponseModel):
    """Response model for logs.

    Attributes:
        step_run_id: Step ID the logs are associated with. When this is set,
            `pipeline_run_id` should be set to None.
        pipeline_run_id: Pipeline run ID the logs are associated with. When
            this is set, `step_run_id` should be set to None.
    """

    step_run_id: Optional[Union[str, UUID]] = Field(
        default=None,
        title="Step ID to associate the logs with.",
        description="When this is set, pipeline_run_id should be set to None.",
    )
    pipeline_run_id: Optional[Union[str, UUID]] = Field(
        default=None,
        title="Pipeline run ID to associate the logs with.",
        description="When this is set, step_run_id should be set to None.",
    )
pipeline_run_id: Union[uuid.UUID, str] pydantic-field

When this is set, step_run_id should be set to None.

step_run_id: Union[uuid.UUID, str] pydantic-field

When this is set, pipeline_run_id should be set to None.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

page_model

Model implementation for easy pagination for Lists of ZenML Domain Models.

The code contained within this file has been inspired by the fastapi-pagination library: https://github.com/uriyyo/fastapi-pagination

Page (GenericModel, Generic) pydantic-model

Return Model for List Models to accommodate pagination.

Source code in zenml/models/page_model.py
class Page(GenericModel, Generic[B]):
    """Return Model for List Models to accommodate pagination.

    A page wraps a list of `items` and forwards the usual container
    operations to it: `len(page)`, `page[index]`, `item in page` and
    `for item in page` all operate on `items`.

    Attributes:
        index: Positive integer index of this page.
        max_size: Positive integer maximum size of a page.
        total_pages: Non-negative number of pages.
        total: Non-negative total count (presumably of items across all
            pages; confirm against the code that builds pages).
        items: The items contained in this page.
    """

    index: PositiveInt
    max_size: PositiveInt
    total_pages: NonNegativeInt
    total: NonNegativeInt
    items: List[B]

    __params_type__ = BaseFilterModel

    @property
    def size(self) -> int:
        """Return the item count of the page.

        Returns:
            The number of items in the page.
        """
        return len(self.items)

    def __len__(self) -> int:
        """Return the item count of the page.

        This enables `len(page)`.

        Returns:
            The number of items in the page.
        """
        return len(self.items)

    def __getitem__(self, index: int) -> B:
        """Return the item at the given index.

        This enables `page[index]`.

        Args:
            index: The index to get the item from.

        Returns:
            The item at the given index.
        """
        return self.items[index]

    def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
        """Return an iterator over the items in the page.

        This enables `for item in page` loops, but breaks `dict(page)`.

        Yields:
            The items in the page, in list order.
        """
        # Delegate straight to the list instead of re-yielding by hand.
        yield from self.items

    def __contains__(self, item: B) -> bool:
        """Returns whether the page contains a specific item.

        This enables `item in page` checks.

        Args:
            item: The item to check for.

        Returns:
            Whether the item is in the page.
        """
        return item in self.items

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST API server to unpack SecretStr
        # values correctly before sending them to the client.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
size: int property readonly

Return the item count of the page.

Returns:

Type Description
int

The amount of items in the page.

Config

Pydantic configuration class.

Source code in zenml/models/page_model.py
class Config:
    """Pydantic configuration class.

    Registers a JSON encoder that serialises `SecretStr` values as their
    plain secret value, or as `None` when the value is falsy.
    """

    # This is needed to allow the REST API server to unpack SecretStr
    # values correctly before sending them to the client.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }
__params_type__ (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default",
    count_steps="gte:5",
    sort_by="created",
    page=2,
    size=50
)
Source code in zenml/models/page_model.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Filter values may be given as plain values (compared for equality) or as
    strings of the form `"<operator>:<value>"`. `sort_by` accepts either a
    column name or `"<operand>:<column>"`.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default",
        count_steps="gte:5",
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters. These are also rejected
    # as sort columns by `validate_sort_by`.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    # Sorting and pagination parameters.
    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    # Filters available on every resource. Each also accepts a string so
    # that an "<operator>:<value>" expression can be passed.
    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Check that `sort_by` names a sortable column of this resource.

        The value is either a bare column name or `"<operand>:<column>"`.
        An unknown operand is dropped with a warning, so that only the column
        name is returned.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have string methods
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )

        operand, separator, remainder = v.partition(":")
        if separator:
            column = remainder
            if operand not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                # Strip the unsupported operand from the returned value.
                v = column
        else:
            column = v

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        if column not in cls.__fields__:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )
        return v

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        The filter list is built from the raw input and then discarded: the
        call is made only so that any error raised while building a filter
        surfaces during validation.

        Args:
            values: The values of the class.

        Returns:
            The values of the class, unchanged.
        """
        # The generated list is intentionally not used here.
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Build the filters described by the current field values.

        Returns:
            A list of Filter models.
        """
        field_values = {name: getattr(self, name) for name in self.__fields__}
        return self._generate_filter_list(field_values)

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Split `sort_by` into the sort column and the sorting operand.

        `sort_by` is either a bare column name, in which case the operand
        defaults to ascending, or `"<operand>:<column>"`.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        operand, separator, remainder = self.sort_by.partition(":")
        if not separator:
            # No explicit operand was given: the default is asc.
            return self.sort_by, SorterOps.ASCENDING
        return remainder, SorterOps(operand)

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Columns listed in `FILTER_EXCLUDE_FIELDS` and columns whose value is
        None do not produce a filter.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        filters: List[Filter] = []

        for column, raw_value in values.items():
            if column in cls.FILTER_EXCLUDE_FIELDS or raw_value is None:
                continue

            # Split an optional "<operator>:" prefix off the value.
            filter_value, operator = cls._resolve_operator(raw_value)
            filters.append(
                cls._define_filter(
                    column=column, value=filter_value, operator=operator
                )
            )

        return filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        A string of the form "operator:value" whose prefix is a known
        `GenericFilterOps` value is split into the two parts. Anything else,
        including non-string values and strings with an unknown prefix, is
        returned as-is together with `GenericFilterOps.EQUALS`.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        if not isinstance(value, str):
            return value, GenericFilterOps.EQUALS

        prefix, separator, remainder = value.partition(":")
        if separator and prefix in GenericFilterOps.values():
            return remainder, GenericFilterOps(prefix)

        return value, GenericFilterOps.EQUALS

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Build the filter object matching the type of a column.

        The field type checks run in a fixed order (datetime, UUID, int,
        bool, str) and the first match decides which filter is created. A
        column matching none of them falls back to a string filter on
        `str(value)` after a warning is logged.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        column_filter: Filter
        if cls.is_datetime_field(column):
            column_filter = cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )
        elif cls.is_uuid_field(column):
            column_filter = cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )
        elif cls.is_int_field(column):
            column_filter = NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )
        elif cls.is_bool_field(column):
            column_filter = cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )
        elif cls.is_str_field(column):
            column_filter = StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )
        else:
            # Unsupported datatype: warn and fall back to a string filter.
            logger.warning(
                f"The Datatype {cls.__fields__[column].type_} might not be "
                "supported for filtering. Defaulting to a string filter."
            )
            column_filter = StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=str(value),
            )
        return column_filter

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        A field counts as a datetime field when its declared type is
        `datetime` itself, or when `datetime` is a subclass of one of the
        type arguments of the declared type (e.g. a `Union` with `datetime`).

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        return field_type is datetime or issubclass(
            datetime, get_args(field_type)
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        A field counts as a uuid field when its declared type is `UUID`
        itself, or when `UUID` is a subclass of one of the type arguments of
        the declared type (e.g. a `Union` with `UUID`).

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        return field_type is UUID or issubclass(UUID, get_args(field_type))

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's an int field.

        A field counts as an int field when its declared type is `int`
        itself, or when `int` is a subclass of one of the type arguments of
        the declared type (e.g. a `Union` with `int`).

        Args:
            k: The key to check.

        Returns:
            True if the field is an int field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        return field_type is int or issubclass(int, get_args(field_type))

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        A field counts as a bool field when its declared type is `bool`
        itself, or when `bool` is a subclass of one of the type arguments of
        the declared type. Because `bool` subclasses `int`, a `Union` that
        includes `int` also matches.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        return field_type is bool or issubclass(bool, get_args(field_type))

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        A field counts as a string field when its declared type is `str`
        itself, or when `str` is a subclass of one of the type arguments of
        the declared type (e.g. a `Union` with `str`).

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        field_type = cls.__fields__[k].type_
        return field_type is str or issubclass(str, get_args(field_type))

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        The key must be exactly `"sort_by"` and the field must be a string
        field: its declared type is `str`, or `str` is a subclass of one of
        the type arguments of the declared type.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        # Identity comparison, consistent with the other `is_*_field` checks.
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        A `datetime` value is used directly; any other value is parsed with
        `FILTERING_DATETIME_FORMAT`.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        if isinstance(value, datetime):
            parsed = value
        else:
            try:
                parsed = datetime.strptime(value, FILTERING_DATETIME_FORMAT)
            except ValueError as e:
                raise ValueError(
                    "The datetime filter only works with values in the following "
                    f"format: {FILTERING_DATETIME_FORMAT}"
                ) from e

        return NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=parsed,
        )

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Only equality comparisons require the value to be a valid UUID. For
        every other operator the value is passed on unvalidated. In all cases
        the filter receives the value as a string.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        must_be_valid_uuid = (
            operator == GenericFilterOps.EQUALS and not isinstance(value, UUID)
        )
        if must_be_valid_uuid:
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # The filter compares on the string form of the value.
        text_value = str(value)
        return UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=text_value,
        )

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Boolean filters always compare for equality. If any other operator
        is requested, a warning is logged and `equals` is used instead.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            # The fragments are implicitly concatenated, so each one needs
            # its own trailing space to keep the words apart.
            logger.warning(
                "Boolean filters do not support any "
                "operation except for equals. Defaulting "
                "to an `equals` comparison."
            )
        # NOTE(review): `bool(value)` is plain truthiness, so any non-empty
        # string (including "false") becomes True. Confirm that callers only
        # pass real booleans here.
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        The offset is the number of entries on all pages before the current
        one, i.e. `size` entries for each of the `page - 1` preceding pages.

        Returns:
            The offset for the query.
        """
        pages_to_skip = self.page - 1
        return pages_to_skip * self.size

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Every entry of `list_of_filters` is turned into a query condition
        for the given table. The conditions are then joined with the
        configured logical operator.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        conditions = [
            column_filter.generate_query_conditions(table=table)
            for column_filter in self.list_of_filters
        ]

        if self.logical_operator == LogicalOperators.AND:
            return and_(True, *conditions)
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *conditions)
        raise RuntimeError("No valid logical operator was supplied.")

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        The filter expression produced by `generate_filter` is added to the
        query as a `where` clause.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        condition = self.generate_filter(table=table)
        if condition is None:
            return query
        return query.where(condition)
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.filter_models.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.filter_models.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the sort_by value into the column to sort by and the sorting operand.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/page_model.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    The filter expression produced by `generate_filter` is added to the
    query as a `where` clause.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    condition = self.generate_filter(table=table)
    if condition is None:
        return query
    return query.where(condition)
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/page_model.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse incoming filters to ensure all filters are legal.

    The filter list is built from the raw input and then discarded: the
    call is made only so that any error raised while building a filter
    surfaces during validation.

    Args:
        values: The values of the class.

    Returns:
        The values of the class, unchanged.
    """
    # The generated list is intentionally not used here.
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/page_model.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Every entry of `list_of_filters` is turned into a query condition for
    the given table. The conditions are then joined with the configured
    logical operator.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    conditions = [
        column_filter.generate_query_conditions(table=table)
        for column_filter in self.list_of_filters
    ]

    if self.logical_operator == LogicalOperators.AND:
        return and_(True, *conditions)
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *conditions)
    raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    A field counts as a bool field when its declared type is `bool` itself,
    or when `bool` is a subclass of one of the type arguments of the
    declared type. Because `bool` subclasses `int`, a `Union` that includes
    `int` also matches.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    return field_type is bool or issubclass(bool, get_args(field_type))
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    A field counts as a datetime field when its declared type is `datetime`
    itself, or when `datetime` is a subclass of one of the type arguments of
    the declared type (e.g. a `Union` with `datetime`).

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    return field_type is datetime or issubclass(
        datetime, get_args(field_type)
    )
is_int_field(k) classmethod

Checks if it's an int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is an int field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's an int field.

    A field counts as an int field when its declared type is `int` itself,
    or when `int` is a subclass of one of the type arguments of the declared
    type (e.g. a `Union` with `int`).

    Args:
        k: The key to check.

    Returns:
        True if the field is an int field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    return field_type is int or issubclass(int, get_args(field_type))
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    The key must be exactly `"sort_by"` and the field must be a string
    field: its declared type is `str`, or `str` is a subclass of one of the
    type arguments of the declared type.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    # Identity comparison, consistent with the other `is_*_field` checks.
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is str
    ) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    A field counts as a string field when its declared type is `str` itself,
    or when `str` is a subclass of one of the type arguments of the declared
    type (e.g. a `Union` with `str`).

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    return field_type is str or issubclass(str, get_args(field_type))
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a uuid field.

    A field counts as a uuid field when its declared type is `UUID` itself,
    or when `UUID` is a subclass of one of the type arguments of the
    declared type (e.g. a `Union` with `UUID`).

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    field_type = cls.__fields__[k].type_
    return field_type is UUID or issubclass(UUID, get_args(field_type))
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/page_model.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Check that `sort_by` names a sortable column of this resource.

    The value is either a bare column name or `"<operand>:<column>"`. An
    unknown operand is dropped with a warning, so that only the column name
    is returned.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have string methods
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )

    operand, separator, remainder = v.partition(":")
    if separator:
        column = remainder
        if operand not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            # Strip the unsupported operand from the returned value.
            v = column
    else:
        column = v

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    if column not in cls.__fields__:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
    return v
__contains__(self, item) special

Returns whether the page contains a specific item.

This enables item in page checks.

Parameters:

Name Type Description Default
item ~B

The item to check for.

required

Returns:

Type Description
bool

Whether the item is in the page.

Source code in zenml/models/page_model.py
def __contains__(self, item: B) -> bool:
    """Tell whether a specific item is part of the page.

    This enables `item in page` checks.

    Args:
        item: The item to check for.

    Returns:
        Whether the item is in the page.
    """
    page_items = self.items
    return item in page_items
__getitem__(self, index) special

Return the item at the given index.

This enables page[index].

Parameters:

Name Type Description Default
index int

The index to get the item from.

required

Returns:

Type Description
~B

The item at the given index.

Source code in zenml/models/page_model.py
def __getitem__(self, index: int) -> B:
    """Look up the item stored at the given index.

    This enables `page[index]`.

    Args:
        index: The index to get the item from.

    Returns:
        The item at the given index.
    """
    page_items = self.items
    return page_items[index]
__iter__(self) special

Return an iterator over the items in the page.

This enables for item in page loops, but breaks dict(page).

Yields:

Type Description
Generator[~B, NoneType, NoneType]

An iterator over the items in the page.

Source code in zenml/models/page_model.py
def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
    """Return an iterator over the items in the page.

    This enables `for item in page` loops, but breaks `dict(page)`.

    Yields:
        The items in the page, in list order.
    """
    # Delegate straight to the list instead of re-yielding by hand.
    yield from self.items
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__len__(self) special

Return the item count of the page.

This enables len(page).

Returns:

Type Description
int

The amount of items in the page.

Source code in zenml/models/page_model.py
def __len__(self) -> int:
    """Count the items held by the page.

    This enables `len(page)`.

    Returns:
        The number of items in the page.
    """
    page_items = self.items
    return len(page_items)

pipeline_build_models

Models representing pipeline builds.

BuildItem (BaseModel) pydantic-model

Pipeline build item.

Attributes:

Name Type Description
image str

The image name or digest.

dockerfile Optional[str]

The contents of the Dockerfile used to build the image.

requirements Optional[str]

The pip requirements installed in the image. This is a string consisting of multiple concatenated requirements.txt files.

settings_checksum Optional[str]

Checksum of the settings used for the build.

contains_code bool

Whether the image contains user files.

requires_code_download bool

Whether the image needs to download files.

Source code in zenml/models/pipeline_build_models.py
class BuildItem(BaseModel):
    """Pipeline build item.

    Describes a single image that is part of a pipeline build.

    Attributes:
        image: The image name or digest.
        dockerfile: The contents of the Dockerfile used to build the image.
        requirements: The pip requirements installed in the image. This is a
            string consisting of multiple concatenated requirements.txt files.
        settings_checksum: Checksum of the settings used for the build.
        contains_code: Whether the image contains user files.
        requires_code_download: Whether the image needs to download files.
    """

    # The only attribute declared without `Optional` and without a default.
    image: str = Field(title="The image name or digest.")
    # The three `Optional[str]` attributes below pass no `default` to `Field`.
    # NOTE(review): presumably they fall back to `None` when omitted
    # (pydantic v1 handling of Optional fields) -- confirm.
    dockerfile: Optional[str] = Field(
        title="The dockerfile used to build the image."
    )
    requirements: Optional[str] = Field(
        title="The pip requirements installed in the image."
    )
    settings_checksum: Optional[str] = Field(
        title="The checksum of the build settings."
    )
    # Both flags carry explicit defaults: an item is treated as containing
    # user files and as not needing a code download unless stated otherwise.
    contains_code: bool = Field(
        default=True, title="Whether the image contains user files."
    )
    requires_code_download: bool = Field(
        default=False, title="Whether the image needs to download files."
    )

PipelineBuildBaseModel (YAMLSerializationMixin) pydantic-model

Base model for pipeline builds.

Attributes:

Name Type Description
images Dict[str, zenml.models.pipeline_build_models.BuildItem]

Docker images of this build.

is_local bool

Whether the images are stored locally or in a container registry.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildBaseModel(pydantic_utils.YAMLSerializationMixin):
    """Base model for pipeline builds.

    Attributes:
        images: Docker images of this build, stored as build items under
            the keys produced by `get_image_key`.
        is_local: Whether the images are stored locally or in a container
            registry.
        contains_code: Whether any image of the build contains user code.
        zenml_version: The version of ZenML used for this build.
        python_version: The Python version used for this build.
        checksum: The build checksum.
    """

    images: Dict[str, BuildItem] = Field(
        default={}, title="The images of this build."
    )
    is_local: bool = Field(
        title="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: bool = Field(
        title="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        title="The version of ZenML used for this build."
    )
    python_version: Optional[str] = Field(
        title="The Python version used for this build."
    )
    checksum: Optional[str] = Field(title="The build checksum.")

    @property
    def requires_code_download(self) -> bool:
        """Whether the build requires code download.

        Returns:
            `True` as soon as one build item has `requires_code_download`
            set, `False` otherwise (including when there are no images).
        """
        for build_item in self.images.values():
            if build_item.requires_code_download:
                return True
        return False

    @staticmethod
    def get_image_key(component_key: str, step: Optional[str] = None) -> str:
        """Build the key under which an image is stored.

        Args:
            component_key: The component key.
            step: The pipeline step for which the image was built.

        Returns:
            `"<step>.<component_key>"` if a non-empty step is given, the
            plain component key otherwise.
        """
        return f"{step}.{component_key}" if step else component_key

    def get_image(self, component_key: str, step: Optional[str] = None) -> str:
        """Look up the image that was built for a specific key.

        Args:
            component_key: The key for which to get the image.
            step: The pipeline step for which to get the image. If no image
                exists for this step, will fallback to the pipeline image for
                the same key.

        Returns:
            The image name or digest.
        """
        build_item = self._get_item(component_key=component_key, step=step)
        return build_item.image

    def get_settings_checksum(
        self, component_key: str, step: Optional[str] = None
    ) -> Optional[str]:
        """Look up the settings checksum for a specific key.

        Args:
            component_key: The key for which to get the checksum.
            step: The pipeline step for which to get the checksum. If no
                image exists for this step, will fallback to the pipeline image
                for the same key.

        Returns:
            The settings checksum.
        """
        build_item = self._get_item(component_key=component_key, step=step)
        return build_item.settings_checksum

    def _get_item(
        self, component_key: str, step: Optional[str] = None
    ) -> BuildItem:
        """Resolve the build item for a specific key.

        The step-specific key is tried first (when a step is given); the
        plain component key is the fallback.

        Args:
            component_key: The key for which to get the item.
            step: The pipeline step for which to get the item. If no item
                exists for this step, will fallback to the item for
                the same key.

        Raises:
            KeyError: If no item exists for the given key.

        Returns:
            The build item.
        """
        if step:
            step_key = self.get_image_key(
                component_key=component_key, step=step
            )
            # A missing step-specific entry is not an error: fall through to
            # the lookup by plain component key below.
            if step_key in self.images:
                return self.images[step_key]

        try:
            return self.images[component_key]
        except KeyError:
            raise KeyError(
                f"Unable to find image for key {component_key}. Available keys: "
                f"{set(self.images)}."
            )
requires_code_download: bool property readonly

Whether the build requires code download.

Returns:

Type Description
bool

Whether the build requires code download.

get_image(self, component_key, step=None)

Get the image built for a specific key.

Parameters:

Name Type Description Default
component_key str

The key for which to get the image.

required
step Optional[str]

The pipeline step for which to get the image. If no image exists for this step, will fallback to the pipeline image for the same key.

None

Returns:

Type Description
str

The image name or digest.

Source code in zenml/models/pipeline_build_models.py
def get_image(self, component_key: str, step: Optional[str] = None) -> str:
    """Look up the image that was built for a specific key.

    Delegates the lookup to `self._get_item` and returns the `image`
    attribute of the resulting build item.

    Args:
        component_key: The key for which to get the image.
        step: The pipeline step for which to get the image. If no image
            exists for this step, will fallback to the pipeline image for
            the same key.

    Returns:
        The image name or digest.
    """
    build_item = self._get_item(component_key=component_key, step=step)
    return build_item.image
get_image_key(component_key, step=None) staticmethod

Get the image key.

Parameters:

Name Type Description Default
component_key str

The component key.

required
step Optional[str]

The pipeline step for which the image was built.

None

Returns:

Type Description
str

The image key.

Source code in zenml/models/pipeline_build_models.py
@staticmethod
def get_image_key(component_key: str, step: Optional[str] = None) -> str:
    """Get the image key.

    Args:
        component_key: The component key.
        step: The pipeline step for which the image was built.

    Returns:
        The image key.
    """
    if step:
        return f"{step}.{component_key}"
    else:
        return component_key
get_settings_checksum(self, component_key, step=None)

Get the settings checksum for a specific key.

Parameters:

Name Type Description Default
component_key str

The key for which to get the checksum.

required
step Optional[str]

The pipeline step for which to get the checksum. If no image exists for this step, will fallback to the pipeline image for the same key.

None

Returns:

Type Description
Optional[str]

The settings checksum.

Source code in zenml/models/pipeline_build_models.py
def get_settings_checksum(
    self, component_key: str, step: Optional[str] = None
) -> Optional[str]:
    """Look up the settings checksum for a specific key.

    Delegates the lookup to `self._get_item` and returns the
    `settings_checksum` attribute of the resulting build item.

    Args:
        component_key: The key for which to get the checksum.
        step: The pipeline step for which to get the checksum. If no
            image exists for this step, will fallback to the pipeline image
            for the same key.

    Returns:
        The settings checksum.
    """
    build_item = self._get_item(component_key=component_key, step=step)
    return build_item.settings_checksum

PipelineBuildFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipeline builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline builds.

    Every filter attribute is optional and explicitly defaults to `None`,
    matching how the other filter models in this package declare their
    fields instead of relying on an implicit default for optional fields.
    """

    workspace_id: Union[UUID, str, None] = Field(
        default=None, description="Workspace for this pipeline build."
    )
    user_id: Union[UUID, str, None] = Field(
        default=None, description="User that produced this pipeline build."
    )
    pipeline_id: Union[UUID, str, None] = Field(
        default=None,
        description="Pipeline associated with the pipeline build.",
    )
    # The description previously read "Stack used for the Pipeline Run",
    # which was copied from the pipeline run filter model.
    stack_id: Union[UUID, str, None] = Field(
        default=None,
        description="Stack that was used for this pipeline build.",
    )
    is_local: Optional[bool] = Field(
        default=None,
        description="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: Optional[bool] = Field(
        default=None,
        description="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        default=None, description="The version of ZenML used for this build."
    )
    python_version: Optional[str] = Field(
        default=None, description="The Python version used for this build."
    )
    checksum: Optional[str] = Field(
        default=None, description="The build checksum."
    )
checksum: str pydantic-field

The build checksum.

contains_code: bool pydantic-field

Whether any image of the build contains user code.

is_local: bool pydantic-field

Whether the build images are stored in a container registry or locally.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the pipeline build.

python_version: str pydantic-field

The Python version used for this build.

stack_id: Union[uuid.UUID, str] pydantic-field

Stack that was used for this pipeline build.

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this pipeline build.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this pipeline build.

zenml_version: str pydantic-field

The version of ZenML used for this build.

PipelineBuildRequestModel (PipelineBuildBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for pipeline builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildRequestModel(
    PipelineBuildBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipeline builds.

    On top of the inherited build attributes, a request references the
    stack and the pipeline by their UUIDs.
    """

    # Both references are `Optional` and pass no `default` to `Field`.
    # NOTE(review): presumably they fall back to `None` when omitted
    # (pydantic v1 handling of Optional fields) -- confirm.
    stack: Optional[UUID] = Field(
        title="The stack that was used for this build."
    )
    pipeline: Optional[UUID] = Field(
        title="The pipeline that was used for this build."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

PipelineBuildResponseModel (PipelineBuildBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for pipeline builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildResponseModel(
    PipelineBuildBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline builds.

    On top of the inherited build attributes, a response carries the
    pipeline and the stack as nested response models rather than as UUIDs.
    """

    # The nested model types are given as string forward references.
    # NOTE(review): presumably to avoid circular imports between the model
    # modules -- confirm where the references get resolved.
    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline that was used for this build."
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack that was used for this build."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

pipeline_deployment_models

Models representing pipeline deployments.

PipelineDeploymentBaseModel (BaseModel) pydantic-model

Base model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentBaseModel(BaseModel):
    """Base model for pipeline deployments.

    Holds the run name template, the pipeline configuration, the per-step
    configurations and the client environment of a deployment.
    """

    run_name_template: str = Field(
        title="The run name template for runs created using this deployment.",
    )
    pipeline_configuration: PipelineConfiguration = Field(
        title="The pipeline configuration for this deployment."
    )
    step_configurations: Dict[str, Step] = Field(
        default={}, title="The step configurations for this deployment."
    )
    client_environment: Dict[str, str] = Field(
        default={}, title="The client environment for this deployment."
    )

    @property
    def requires_included_files(self) -> bool:
        """Whether the deployment requires included files.

        Returns:
            `True` if the docker settings of at least one step use
            `SourceFileMode.INCLUDE` for their source files, else `False`.
        """
        for step_config in self.step_configurations.values():
            source_mode = step_config.config.docker_settings.source_files
            if source_mode == SourceFileMode.INCLUDE:
                return True
        return False

    @property
    def requires_code_download(self) -> bool:
        """Whether the deployment requires downloading some code files.

        Returns:
            `True` if the docker settings of at least one step use
            `SourceFileMode.DOWNLOAD` for their source files, else `False`.
        """
        for step_config in self.step_configurations.values():
            source_mode = step_config.config.docker_settings.source_files
            if source_mode == SourceFileMode.DOWNLOAD:
                return True
        return False
requires_code_download: bool property readonly

Whether the deployment requires downloading some code files.

Returns:

Type Description
bool

Whether the deployment requires downloading some code files.

requires_included_files: bool property readonly

Whether the deployment requires included files.

Returns:

Type Description
bool

Whether the deployment requires included files.

PipelineDeploymentFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline deployments.

    Every filter attribute accepts either a UUID or a string and defaults
    to `None`.
    """

    # Scoping filters: the workspace and the user of the deployment.
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace for this deployment."
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created this deployment."
    )
    # Filters on the entities a deployment is associated with.
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline associated with the deployment."
    )
    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack associated with the deployment."
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Build associated with the deployment."
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Schedule associated with the deployment."
    )
build_id: Union[uuid.UUID, str] pydantic-field

Build associated with the deployment.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the deployment.

schedule_id: Union[uuid.UUID, str] pydantic-field

Schedule associated with the deployment.

stack_id: Union[uuid.UUID, str] pydantic-field

Stack associated with the deployment.

user_id: Union[uuid.UUID, str] pydantic-field

User that created this deployment.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this deployment.

PipelineDeploymentRequestModel (PipelineDeploymentBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentRequestModel(
    PipelineDeploymentBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipeline deployments.

    On top of the inherited deployment attributes, a request references the
    associated entities by UUID and may carry a code reference request.
    """

    # `stack` is the only reference typed as a plain `UUID`; all other
    # references below are declared `Optional`.
    stack: UUID = Field(title="The stack associated with the deployment.")
    pipeline: Optional[UUID] = Field(
        title="The pipeline associated with the deployment."
    )
    build: Optional[UUID] = Field(
        title="The build associated with the deployment."
    )
    schedule: Optional[UUID] = Field(
        title="The schedule associated with the deployment."
    )
    # Unlike the UUID references above, the code reference is a nested
    # request model, given as a string forward reference.
    code_reference: Optional["CodeReferenceRequestModel"] = Field(
        title="The code reference associated with the deployment."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

PipelineDeploymentResponseModel (PipelineDeploymentBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentResponseModel(
    PipelineDeploymentBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline deployments.

    On top of the inherited deployment attributes, a response carries the
    associated entities as nested response models. All of them are declared
    `Optional`.
    """

    # All nested model types are given as string forward references.
    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline associated with the deployment."
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack associated with the deployment."
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        title="The pipeline build associated with the deployment."
    )
    schedule: Optional["ScheduleResponseModel"] = Field(
        title="The schedule associated with the deployment."
    )
    code_reference: Optional["CodeReferenceResponseModel"] = Field(
        title="The code reference associated with the deployment."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

pipeline_models

Models representing pipelines.

PipelineBaseModel (BaseModel) pydantic-model

Base model for pipelines.

Source code in zenml/models/pipeline_models.py
class PipelineBaseModel(BaseModel):
    """Base model for pipelines.

    Declares the name, version, version hash, docstring and spec of a
    pipeline.
    """

    # The three required string attributes are capped at
    # `STR_FIELD_MAX_LENGTH` characters.
    name: str = Field(
        title="The name of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    version: str = Field(
        title="The version of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    version_hash: str = Field(
        title="The version hash of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # The docstring is optional and uses the separate
    # `TEXT_FIELD_MAX_LENGTH` limit instead.
    docstring: Optional[str] = Field(
        title="The docstring of the pipeline.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    spec: PipelineSpec = Field(title="The spec of the pipeline.")

PipelineFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipelines.

Source code in zenml/models/pipeline_models.py
class PipelineFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipelines.

    Every filter attribute is optional and defaults to `None`.
    """

    # Filters on the string attributes of a pipeline.
    name: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline",
    )
    version: Optional[str] = Field(
        default=None,
        description="Version of the Pipeline",
    )
    version_hash: Optional[str] = Field(
        default=None,
        description="Version hash of the Pipeline",
    )
    docstring: Optional[str] = Field(
        default=None,
        description="Docstring of the Pipeline",
    )
    # Scoping filters; each accepts either a UUID or a string.
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Pipeline"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the Pipeline"
    )
docstring: str pydantic-field

Docstring of the Pipeline

name: str pydantic-field

Name of the Pipeline

user_id: Union[uuid.UUID, str] pydantic-field

User of the Pipeline

version: str pydantic-field

Version of the Pipeline

version_hash: str pydantic-field

Version hash of the Pipeline

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Pipeline

PipelineRequestModel (PipelineBaseModel, WorkspaceScopedRequestModel) pydantic-model

Pipeline request model.

Source code in zenml/models/pipeline_models.py
class PipelineRequestModel(PipelineBaseModel, WorkspaceScopedRequestModel):
    """Pipeline request model.

    Declares no attributes of its own; everything comes from
    `PipelineBaseModel` and `WorkspaceScopedRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

PipelineResponseModel (PipelineBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline response model with user, workspace, runs, and status hydrated.

Source code in zenml/models/pipeline_models.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
    """Response model for pipelines.

    Adds the `status` attribute to the inherited pipeline attributes and
    offers convenience accessors for the runs of the pipeline, which are
    fetched through the ZenML client.
    """

    status: Optional[List[ExecutionStatus]] = Field(
        default=None, title="The status of the last 3 Pipeline Runs."
    )

    def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
        """Fetch runs of this pipeline through the client.

        Unlike the `runs` property, this accepts arguments for fine-grained
        filtering and pagination.

        Args:
            **kwargs: Further arguments for filtering or pagination that are
                passed to `client.list_pipeline_runs()`.

        Returns:
            The items of the page of runs returned by the client.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from zenml.client import Client

        run_page = Client().list_pipeline_runs(pipeline_id=self.id, **kwargs)
        return run_page.items

    @property
    def runs(self) -> List["PipelineRunResponseModel"]:
        """Runs of this pipeline, fetched with the default arguments.

        NOTE(review): the page size and ordering are whatever
        `client.list_pipeline_runs()` defaults to; the previous docstring
        stated the 50 most recent runs in descending order -- confirm.

        Returns:
            The result of `get_runs()` without any extra arguments.
        """
        return self.get_runs()

    @property
    def num_runs(self) -> int:
        """Total number of runs of this pipeline.

        Returns:
            The `total` reported by the client for the runs of this
            pipeline.
        """
        from zenml.client import Client

        # Only the total is needed, so a page of size 1 is requested.
        run_page = Client().list_pipeline_runs(pipeline_id=self.id, size=1)
        return run_page.total

    @property
    def last_run(self) -> "PipelineRunResponseModel":
        """The last run of this pipeline.

        Returns:
            The first run of a page of size 1.

        Raises:
            RuntimeError: If no runs were found for this pipeline.
        """
        latest = self.get_runs(size=1)
        if latest:
            return latest[0]
        raise RuntimeError(
            f"No runs found for pipeline '{self.name}' with id {self.id}."
        )

    @property
    def last_successful_run(self) -> "PipelineRunResponseModel":
        """The last successful run of this pipeline.

        Returns:
            The first run of a page of size 1 that is filtered to runs with
            status `ExecutionStatus.COMPLETED`.

        Raises:
            RuntimeError: If no successful runs were found for this pipeline.
        """
        latest = self.get_runs(status=ExecutionStatus.COMPLETED, size=1)
        if latest:
            return latest[0]
        raise RuntimeError(
            f"No successful runs found for pipeline '{self.name}' with id "
            f"{self.id}."
        )
last_run: PipelineRunResponseModel property readonly

Returns the last run of this pipeline.

Returns:

Type Description
PipelineRunResponseModel

The last run of this pipeline.

Exceptions:

Type Description
RuntimeError

If no runs were found for this pipeline.

last_successful_run: PipelineRunResponseModel property readonly

Returns the last successful run of this pipeline.

Returns:

Type Description
PipelineRunResponseModel

The last successful run of this pipeline.

Exceptions:

Type Description
RuntimeError

If no successful runs were found for this pipeline.

num_runs: int property readonly

Returns the number of runs of this pipeline.

Returns:

Type Description
int

The number of runs of this pipeline.

runs: List[PipelineRunResponseModel] property readonly

Returns the 50 most recent runs of this pipeline in descending order.

Returns:

Type Description
List[PipelineRunResponseModel]

The 50 most recent runs of this pipeline in descending order.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_runs(self, **kwargs)

Get runs of this pipeline.

Can be used to fetch runs other than self.runs and supports fine-grained filtering and pagination.

Parameters:

Name Type Description Default
**kwargs Any

Further arguments for filtering or pagination that are passed to client.list_pipeline_runs().

{}

Returns:

Type Description
List[PipelineRunResponseModel]

List of runs of this pipeline.

Source code in zenml/models/pipeline_models.py
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
    """Fetch runs of this pipeline through the client.

    Unlike `self.runs`, this accepts arguments for fine-grained filtering
    and pagination.

    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.

    Returns:
        The items of the page of runs returned by the client.
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from zenml.client import Client

    run_page = Client().list_pipeline_runs(pipeline_id=self.id, **kwargs)
    return run_page.items

PipelineUpdateModel (PipelineRequestModel) pydantic-model

Pipeline update model.

Source code in zenml/models/pipeline_models.py
class PipelineUpdateModel(PipelineRequestModel):
    """Pipeline update model.

    Declares no attributes of its own; everything comes from
    `PipelineRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

pipeline_run_models

Models representing pipeline runs.

PipelineRunBaseModel (BaseModel) pydantic-model

Base model for pipeline runs.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunBaseModel(BaseModel):
    """Base model for pipeline runs.

    Only `name`, `status` and `config` are declared without `Optional` and
    without a default; the other attributes are optional or have a default.
    """

    name: str = Field(
        title="The name of the pipeline run.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    orchestrator_run_id: Optional[str] = Field(
        title="The orchestrator run ID.",
        max_length=STR_FIELD_MAX_LENGTH,
        default=None,
    )
    schedule_id: Optional[UUID] = Field(
        title="The ID of the schedule that triggered this pipeline run.",
        default=None,
    )
    enable_cache: Optional[bool] = Field(
        title="Whether to enable caching for this pipeline run.",
        default=None,
    )
    start_time: Optional[datetime] = Field(
        title="The start time of the pipeline run.",
        default=None,
    )
    end_time: Optional[datetime] = Field(
        title="The end time of the pipeline run.",
        default=None,
    )
    status: ExecutionStatus = Field(
        title="The status of the pipeline run.",
    )
    config: PipelineConfiguration = Field(
        title="The pipeline configuration used for this pipeline run.",
    )
    num_steps: Optional[int] = Field(
        title="The number of steps in this pipeline run.",
        default=None,
    )
    # The client version defaults to `current_zenml_version`.
    client_version: Optional[str] = Field(
        title="Client version.",
        default=current_zenml_version,
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # Unlike `client_version`, no `default` is passed here.
    # NOTE(review): presumably falls back to `None` when omitted
    # (pydantic v1 handling of Optional fields) -- confirm.
    server_version: Optional[str] = Field(
        title="Server version.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    client_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the client that initiated this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )
    orchestrator_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the orchestrator that executed this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )

PipelineRunFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipeline runs.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline runs.

    Every filter attribute is optional and defaults to `None`.
    """

    # `unlisted` and `code_repository_id` are excluded from the generic
    # field filters and handled explicitly in `generate_filter` below.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "unlisted",
        "code_repository_id",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run",
    )
    orchestrator_run_id: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run within the orchestrator",
    )

    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline associated with the Pipeline Run"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Pipeline Run"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the Pipeline Run"
    )

    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack used for the Pipeline Run"
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Schedule that triggered the Pipeline Run"
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Build used for the Pipeline Run"
    )
    deployment_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Deployment used for the Pipeline Run"
    )
    code_repository_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Code repository used for the Pipeline Run"
    )

    # The description previously read "Name of the Pipeline Run", which was
    # copied from the `name` field above.
    status: Optional[str] = Field(
        default=None,
        description="Status of the Pipeline Run",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time for this run"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time for this run"
    )

    num_steps: Optional[int] = Field(
        default=None,
        description="Amount of steps in the Pipeline Run",
    )

    # Tri-state: `None` applies no filter, `True` keeps only runs without a
    # pipeline, `False` keeps only runs with a pipeline.
    unlisted: Optional[bool] = None

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Extends the filter produced by the parent class with the two
        filters that are excluded from the generic handling: `unlisted` and
        `code_repository_id`. Each of them is combined with the existing
        filter using the model's logical operator.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        base_filter = super().generate_filter(table)

        # `or_` only if the logical operator is OR; `and_` in all other
        # cases.
        operator = (
            or_ if self.logical_operator == LogicalOperators.OR else and_
        )

        if self.unlisted is not None:
            if self.unlisted is True:
                unlisted_filter = getattr(table, "pipeline_id").is_(None)
            else:
                unlisted_filter = getattr(table, "pipeline_id").is_not(None)

            base_filter = operator(base_filter, unlisted_filter)

        if self.code_repository_id:
            from zenml.zen_stores.schemas import (
                CodeReferenceSchema,
                PipelineDeploymentSchema,
                PipelineRunSchema,
            )

            # The code repository is reached through the run's deployment
            # and that deployment's code reference; the three conditions
            # are always AND-ed with each other.
            code_repo_filter = and_(  # type: ignore[type-var]
                PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
                PipelineDeploymentSchema.code_reference_id
                == CodeReferenceSchema.id,
                CodeReferenceSchema.code_repository_id
                == self.code_repository_id,
            )

            base_filter = operator(base_filter, code_repo_filter)

        return base_filter
build_id: Union[uuid.UUID, str] pydantic-field

Build used for the Pipeline Run

code_repository_id: Union[uuid.UUID, str] pydantic-field

Code repository used for the Pipeline Run

deployment_id: Union[uuid.UUID, str] pydantic-field

Deployment used for the Pipeline Run

end_time: Union[datetime.datetime, str] pydantic-field

End time for this run

name: str pydantic-field

Name of the Pipeline Run

num_steps: int pydantic-field

Amount of steps in the Pipeline Run

orchestrator_run_id: str pydantic-field

Name of the Pipeline Run within the orchestrator

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the Pipeline Run

schedule_id: Union[uuid.UUID, str] pydantic-field

Schedule that triggered the Pipeline Run

stack_id: Union[uuid.UUID, str] pydantic-field

Stack used for the Pipeline Run

start_time: Union[datetime.datetime, str] pydantic-field

Start time for this run

status: str pydantic-field

Status of the Pipeline Run

user_id: Union[uuid.UUID, str] pydantic-field

User that created the Pipeline Run

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Pipeline Run

generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Source code in zenml/models/pipeline_run_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Build the query filter, including the criteria handled explicitly here.

    Starts from the filter generated by the parent class and combines it,
    using the configured logical operator, with the `unlisted` and
    `code_repository_id` criteria when they are set.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    query_filter = super().generate_filter(table)

    # Extra criteria are attached with the same operator the filter model
    # is configured with.
    combine = and_
    if self.logical_operator == LogicalOperators.OR:
        combine = or_

    if self.unlisted is not None:
        pipeline_column = getattr(table, "pipeline_id")
        # Unlisted runs are the ones without an associated pipeline.
        unlisted_filter = (
            pipeline_column.is_(None)
            if self.unlisted is True
            else pipeline_column.is_not(None)
        )
        query_filter = combine(query_filter, unlisted_filter)

    if self.code_repository_id:
        from zenml.zen_stores.schemas import (
            CodeReferenceSchema,
            PipelineDeploymentSchema,
            PipelineRunSchema,
        )

        # Walk run -> deployment -> code reference -> code repository.
        run_has_deployment = (
            PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id
        )
        deployment_has_reference = (
            PipelineDeploymentSchema.code_reference_id
            == CodeReferenceSchema.id
        )
        reference_in_repository = (
            CodeReferenceSchema.code_repository_id == self.code_repository_id
        )
        code_repo_filter = and_(  # type: ignore[type-var]
            run_has_deployment,
            deployment_has_reference,
            reference_in_repository,
        )
        query_filter = combine(query_filter, code_repo_filter)

    return query_filter

PipelineRunRequestModel (PipelineRunBaseModel, WorkspaceScopedRequestModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack as UUIDs.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunRequestModel(
    PipelineRunBaseModel, WorkspaceScopedRequestModel
):
    """Pipeline run model with user, workspace, pipeline, and stack as UUIDs.

    Related entities (stack, pipeline, build, deployment) are referenced by
    their IDs instead of being embedded as full models.
    """

    # ID of the pipeline run itself.
    id: UUID
    stack: Optional[UUID]  # Might become None if the stack is deleted.
    pipeline: Optional[UUID]  # Unlisted runs have this as None.
    # Optional IDs of the build and deployment associated with the run.
    build: Optional[UUID]
    deployment: Optional[UUID]
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

PipelineRunResponseModel (PipelineRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack hydrated.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunResponseModel(
    PipelineRunBaseModel, WorkspaceScopedResponseModel
):
    """Pipeline run model with user, workspace, pipeline, and stack hydrated."""

    pipeline: Optional["PipelineResponseModel"] = Field(
        default=None, title="The pipeline this run belongs to."
    )
    stack: Optional["StackResponseModel"] = Field(
        default=None, title="The stack that was used for this run."
    )
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this pipeline run.",
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        default=None, title="The pipeline build that was used for this run."
    )
    deployment: Optional["PipelineDeploymentResponseModel"] = Field(
        default=None, title="The deployment that was used for this run."
    )
    steps: Dict[str, "StepRunResponseModel"] = Field(
        default={}, title="The steps of this run."
    )

    @property
    def artifacts(self) -> List["ArtifactResponseModel"]:
        """Collect the output artifacts of the steps of this pipeline run.

        Returns:
            All output artifacts of this pipeline run (including cached ones).
        """
        # Imported lazily inside the property, as in the rest of this class.
        from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run

        run_artifacts = get_artifacts_of_pipeline_run(self)
        return run_artifacts

    @property
    def produced_artifacts(self) -> List["ArtifactResponseModel"]:
        """Collect only the artifacts that this pipeline run produced.

        Returns:
            A list of all artifacts produced during this pipeline run.
        """
        from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run

        produced = get_artifacts_of_pipeline_run(self, only_produced=True)
        return produced

    def get_step(self, step: str) -> "StepRunResponseModel":
        """(Deprecated) Look up a step of this run by its name.

        Logs a deprecation warning on every call; `run.steps[<step_name>]`
        is the replacement.

        Args:
            step: Name of the step to get.

        Returns:
            The step with the given name.
        """
        from zenml.logger import get_logger

        deprecation_notice = (
            "`run.get_step(<step_name>)` is deprecated and will be removed in "
            "a future release. Please use `run.steps[<step_name>]` instead."
        )
        get_logger(__name__).warning(deprecation_notice)
        return self.steps[step]
artifacts: List[ArtifactResponseModel] property readonly

Get all artifacts that are outputs of steps of this pipeline run.

Returns:

Type Description
List[ArtifactResponseModel]

All output artifacts of this pipeline run (including cached ones).

produced_artifacts: List[ArtifactResponseModel] property readonly

Get all artifacts produced during this pipeline run.

Returns:

Type Description
List[ArtifactResponseModel]

A list of all artifacts produced during this pipeline run.

__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_step(self, step)

(Deprecated) Get a step by name.

Parameters:

Name Type Description Default
step str

Name of the step to get.

required

Returns:

Type Description
StepRunResponseModel

The step with the given name.

Source code in zenml/models/pipeline_run_models.py
def get_step(self, step: str) -> "StepRunResponseModel":
    """(Deprecated) Look up a step of this run by its name.

    Logs a deprecation warning on every call; `run.steps[<step_name>]` is
    the replacement.

    Args:
        step: Name of the step to get.

    Returns:
        The step with the given name.
    """
    from zenml.logger import get_logger

    deprecation_notice = (
        "`run.get_step(<step_name>)` is deprecated and will be removed in "
        "a future release. Please use `run.steps[<step_name>]` instead."
    )
    get_logger(__name__).warning(deprecation_notice)
    return self.steps[step]

PipelineRunUpdateModel (BaseModel) pydantic-model

Pipeline run update model.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunUpdateModel(BaseModel):
    """Pipeline run update model.

    Both fields are optional and default to None.
    """

    # New execution status of the run, if provided.
    status: Optional[ExecutionStatus] = None
    # New end time of the run, if provided.
    end_time: Optional[datetime] = None

role_models

Models representing roles that can be assigned to users or teams.

RoleBaseModel (BaseModel) pydantic-model

Base model for roles.

Source code in zenml/models/role_models.py
class RoleBaseModel(BaseModel):
    """Base model for roles.

    A role consists of a name and a set of permissions.
    """

    name: str = Field(
        title="The unique name of the role.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    # Required field: the set of permission types attached to the role.
    permissions: Set[PermissionType]

RoleFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Roles.

Source code in zenml/models/role_models.py
class RoleFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Roles."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the role",
    )
name: str pydantic-field

Name of the role

RoleRequestModel (RoleBaseModel, BaseRequestModel) pydantic-model

Request model for roles.

Source code in zenml/models/role_models.py
class RoleRequestModel(RoleBaseModel, BaseRequestModel):
    """Request model for roles.

    Declares no fields of its own; everything is inherited from
    `RoleBaseModel` and `BaseRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

RoleResponseModel (RoleBaseModel, BaseResponseModel) pydantic-model

Response model for roles.

Source code in zenml/models/role_models.py
class RoleResponseModel(RoleBaseModel, BaseResponseModel):
    """Response model for roles.

    Declares no fields of its own; everything is inherited from
    `RoleBaseModel` and `BaseResponseModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

RoleUpdateModel (RoleRequestModel) pydantic-model

Update model for roles.

Source code in zenml/models/role_models.py
class RoleUpdateModel(RoleRequestModel):
    """Update model for roles.

    Declares no fields of its own; everything is inherited from
    `RoleRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

run_metadata_models

Models representing run metadata.

RunMetadataBaseModel (BaseModel) pydantic-model

Base model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataBaseModel(BaseModel):
    """Base model for run metadata.

    A metadata entry is a typed key/value pair that can carry the IDs of a
    pipeline run, a step run, an artifact and a stack component.
    """

    pipeline_run_id: Optional[UUID] = Field(
        title="The ID of the pipeline run that this metadata belongs to.",
    )
    # Optional IDs of a step run, an artifact and a stack component.
    # NOTE(review): presumably these identify what the metadata is attached
    # to -- confirm against the code that creates run metadata.
    step_run_id: Optional[UUID]
    artifact_id: Optional[UUID]
    stack_component_id: Optional[UUID]
    key: str = Field(
        title="The key of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    value: MetadataType = Field(
        title="The value of the metadata.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    type: MetadataTypeEnum = Field(
        title="The type of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

RunMetadataFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of run metadata.

    Every criterion is optional and defaults to None.
    """

    # ID criteria accept either a UUID or a string.
    pipeline_run_id: Optional[Union[str, UUID]] = None
    step_run_id: Optional[Union[str, UUID]] = None
    artifact_id: Optional[Union[str, UUID]] = None
    stack_component_id: Optional[Union[str, UUID]] = None
    # Criteria on the metadata key and on the metadata type.
    key: Optional[str] = None
    type: Optional[Union[str, MetadataTypeEnum]] = None

RunMetadataRequestModel (RunMetadataBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataRequestModel(
    RunMetadataBaseModel, WorkspaceScopedRequestModel
):
    """Request model for run metadata.

    Declares no fields of its own; everything is inherited from
    `RunMetadataBaseModel` and `WorkspaceScopedRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

RunMetadataResponseModel (RunMetadataBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataResponseModel(
    RunMetadataBaseModel, WorkspaceScopedResponseModel
):
    """Response model for run metadata.

    Declares no fields of its own; everything is inherited from
    `RunMetadataBaseModel` and `WorkspaceScopedResponseModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

schedule_model

Model definition for pipeline run schedules.

ScheduleBaseModel (Schedule, BaseModel) pydantic-model

Domain model for schedules.

Source code in zenml/models/schedule_model.py
class ScheduleBaseModel(Schedule, BaseModel):
    """Domain model for schedules.

    Extends `Schedule` with a name, an active flag and optional references
    to an orchestrator and a pipeline.
    """

    # NOTE(review): presumably the fields reported to analytics -- confirm
    # against the analytics tracking code.
    ANALYTICS_FIELDS: ClassVar[List[str]] = ["id"]

    name: str
    active: bool
    # Optional IDs of the orchestrator and pipeline tied to the schedule.
    orchestrator_id: Optional[UUID]
    pipeline_id: Optional[UUID]

ScheduleFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Schedules.

Source code in zenml/models/schedule_model.py
class ScheduleFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Schedules.

    Every criterion is optional and defaults to None.
    """

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace scope of the schedule."
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the schedule"
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline that the schedule is attached to."
    )
    orchestrator_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Orchestrator that the schedule is attached to.",
    )
    active: Optional[bool] = Field(
        default=None,
        description="If the schedule is active",
    )
    cron_expression: Optional[str] = Field(
        default=None,
        description="The cron expression, describing the schedule",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time"
    )
    # `Optional[Optional[float]]` flattens to `Optional[float]`, so the
    # redundant nesting is dropped without changing the field's type.
    interval_second: Optional[float] = Field(
        default=None,
        description="The repetition interval in seconds",
    )
    catchup: Optional[bool] = Field(
        default=None,
        description="Whether or not the schedule is set to catchup past missed "
        "events",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the schedule",
    )
active: bool pydantic-field

If the schedule is active

catchup: bool pydantic-field

Whether or not the schedule is set to catch up on past missed events

cron_expression: str pydantic-field

The cron expression, describing the schedule

end_time: Union[datetime.datetime, str] pydantic-field

End time

interval_second: float pydantic-field

The repetition interval in seconds

name: str pydantic-field

Name of the schedule

orchestrator_id: Union[uuid.UUID, str] pydantic-field

Orchestrator that the schedule is attached to.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline that the schedule is attached to.

start_time: Union[datetime.datetime, str] pydantic-field

Start time

user_id: Union[uuid.UUID, str] pydantic-field

User that created the schedule

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace scope of the schedule.

ScheduleRequestModel (ScheduleBaseModel, WorkspaceScopedRequestModel) pydantic-model

Schedule request model.

Source code in zenml/models/schedule_model.py
class ScheduleRequestModel(ScheduleBaseModel, WorkspaceScopedRequestModel):
    """Schedule request model.

    Declares no fields of its own; everything is inherited from
    `ScheduleBaseModel` and `WorkspaceScopedRequestModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

ScheduleResponseModel (ScheduleBaseModel, WorkspaceScopedResponseModel) pydantic-model

Schedule response model with workspace and user hydrated.

Source code in zenml/models/schedule_model.py
class ScheduleResponseModel(ScheduleBaseModel, WorkspaceScopedResponseModel):
    """Schedule response model with workspace and user hydrated.

    Declares no fields of its own; everything is inherited from
    `ScheduleBaseModel` and `WorkspaceScopedResponseModel`.
    """
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

ScheduleUpdateModel (BaseModel) pydantic-model

Schedule update model.

Source code in zenml/models/schedule_model.py
class ScheduleUpdateModel(BaseModel):
    """Schedule update model.

    Every field is optional and defaults to None.
    """

    name: Optional[str] = None
    active: Optional[bool] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    # Typed as a timedelta here.
    interval_second: Optional[timedelta] = None
    catchup: Optional[bool] = None

secret_models

Models representing secrets.

SecretBaseModel (BaseModel) pydantic-model

Base model for secrets.

Source code in zenml/models/secret_models.py
class SecretBaseModel(BaseModel):
    """Base model for secrets."""

    name: str = Field(
        title="The name of the secret.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    scope: SecretScope = Field(
        SecretScope.WORKSPACE, title="The scope of the secret."
    )

    values: Dict[str, Optional[SecretStr]] = Field(
        default_factory=dict, title="The values stored in this secret."
    )

    @property
    def secret_values(self) -> Dict[str, str]:
        """Return the un-obfuscated values stored in this secret.

        Values come back as plain strings rather than SecretStr. Entries
        whose value is None are left out, which allows None to be used in
        the update model to mark a secret value for deletion.

        Returns:
            A dictionary containing the secret's values.
        """
        plain_values: Dict[str, str] = {}
        for key, secret in self.values.items():
            if secret is None:
                continue
            plain_values[key] = secret.get_secret_value()
        return plain_values

    @property
    def has_missing_values(self) -> bool:
        """Tell whether any value of the secret is missing (i.e. None).

        Values can be missing from a secret for example if the user
        retrieves a secret but does not have the permission to view the
        secret values.

        Returns:
            True if the secret has any values set to None.
        """
        for value in self.values.values():
            if value is None:
                return True
        return False

    def add_secret(self, key: str, value: str) -> None:
        """Store a value in the secret under the given key.

        Args:
            key: The key of the secret value.
            value: The secret value.
        """
        wrapped_value = SecretStr(value)
        self.values[key] = wrapped_value

    def remove_secret(self, key: str) -> None:
        """Delete the value stored under the given key.

        Args:
            key: The key of the secret value.
        """
        self.values.pop(key)

    def remove_secrets(self) -> None:
        """Blank out all secret values while keeping their keys."""
        self.values = dict.fromkeys(self.values)
has_missing_values: bool property readonly

Returns True if the secret has missing values (i.e. None).

Values can be missing from a secret for example if the user retrieves a secret but does not have the permission to view the secret values.

Returns:

Type Description
bool

True if the secret has any values set to None.

secret_values: Dict[str, str] property readonly

A dictionary with all un-obfuscated values stored in this secret.

The values are returned as strings, not SecretStr. If a value is None, it is not included in the returned dictionary. This is to enable the use of None values in the update model to indicate that a secret value should be deleted.

Returns:

Type Description
Dict[str, str]

A dictionary containing the secret's values.

add_secret(self, key, value)

Adds a secret value to the secret.

Parameters:

Name Type Description Default
key str

The key of the secret value.

required
value str

The secret value.

required
Source code in zenml/models/secret_models.py
def add_secret(self, key: str, value: str) -> None:
    """Store a value in the secret under the given key.

    The value is wrapped in a SecretStr before it is stored.

    Args:
        key: The key of the secret value.
        value: The secret value.
    """
    wrapped_value = SecretStr(value)
    self.values[key] = wrapped_value
remove_secret(self, key)

Removes a secret value from the secret.

Parameters:

Name Type Description Default
key str

The key of the secret value.

required
Source code in zenml/models/secret_models.py
def remove_secret(self, key: str) -> None:
    """Delete the value stored under the given key.

    Args:
        key: The key of the secret value.
    """
    self.values.pop(key)
remove_secrets(self)

Removes all secret values from the secret but keep the keys.

Source code in zenml/models/secret_models.py
def remove_secrets(self) -> None:
    """Blank out all secret values while keeping their keys."""
    self.values = dict.fromkeys(self.values)

SecretFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Secrets.

Source code in zenml/models/secret_models.py
class SecretFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Secrets.

    In addition to the filter fields, this model can evaluate its filters
    against secret response models (`secret_matches`) and sort them
    (`sort_secrets`).
    """

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "values",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the secret",
    )

    scope: Optional[Union[SecretScope, str]] = Field(
        default=None,
        description="Scope in which to filter secrets",
    )

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Secret"
    )

    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the Secret"
    )

    @staticmethod
    def _get_filtering_value(value: Optional[Any]) -> str:
        """Convert a value to a string usable for filtering and sorting.

        None becomes the empty string, datetimes are formatted as
        `%Y-%m-%d %H:%M:%S` and everything else goes through `str()`.

        Args:
            value: The value to convert.

        Returns:
            The value converted to string format that can be used for
            lexicographical sorting and filtering.
        """
        if value is None:
            return ""
        if isinstance(value, datetime):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        return str(value)

    @staticmethod
    def _get_column_value(
        secret: SecretResponseModel, column: str
    ) -> Optional[Any]:
        """Read the value of a filter/sort column from a secret.

        `workspace_id` and `user_id` are resolved through the hydrated
        `workspace` and `user` attributes of the secret; any other column
        is read as an attribute of the same name.

        Args:
            secret: The secret to read the value from.
            column: The name of the filter or sort column.

        Returns:
            The value of the column for the given secret.
        """
        if column == "workspace_id":
            return secret.workspace.id
        if column == "user_id":
            return secret.user.id if secret.user else None
        return getattr(secret, column)

    def secret_matches(self, secret: SecretResponseModel) -> bool:
        """Checks if a secret matches the filter criteria.

        Args:
            secret: The secret to check.

        Returns:
            True if the secret matches the filter criteria, False otherwise.

        Raises:
            ValueError: If a filter uses an operation that is not handled
                here.
        """
        for column_filter in self.list_of_filters:
            column_value = self._get_column_value(secret, column_filter.column)

            # Convert the values to strings for lexicographical comparison.
            str_column_value = self._get_filtering_value(column_value)
            str_filter_value = self._get_filtering_value(column_filter.value)

            # Compare the lexicographical values according to the operation.
            operation = column_filter.operation
            if operation == GenericFilterOps.EQUALS:
                result = str_column_value == str_filter_value
            elif operation == GenericFilterOps.CONTAINS:
                result = str_filter_value in str_column_value
            elif operation == GenericFilterOps.STARTSWITH:
                result = str_column_value.startswith(str_filter_value)
            elif operation == GenericFilterOps.ENDSWITH:
                result = str_column_value.endswith(str_filter_value)
            elif operation == GenericFilterOps.GT:
                result = str_column_value > str_filter_value
            elif operation == GenericFilterOps.GTE:
                result = str_column_value >= str_filter_value
            elif operation == GenericFilterOps.LT:
                result = str_column_value < str_filter_value
            elif operation == GenericFilterOps.LTE:
                result = str_column_value <= str_filter_value
            else:
                # Without this branch `result` would be unbound on the first
                # filter, or silently reuse the previous filter's outcome.
                raise ValueError(
                    f"Unsupported filter operation: {operation}"
                )

            # Exit early if the result is False for AND and True for OR
            if self.logical_operator == LogicalOperators.AND:
                if not result:
                    return False
            elif result:
                return True

        # If we get here, all filters have been checked and the result is
        # True for AND and False for OR
        return self.logical_operator == LogicalOperators.AND

    def sort_secrets(
        self, secrets: List[SecretResponseModel]
    ) -> List[SecretResponseModel]:
        """Sorts a list of secrets according to the filter criteria.

        The sort column is resolved the same way as in `secret_matches`, so
        `workspace_id` and `user_id` are read from the hydrated `workspace`
        and `user` attributes of each secret.

        Args:
            secrets: The list of secrets to sort.

        Returns:
            The sorted list of secrets.
        """
        column, sort_op = self.sorting_params
        return sorted(
            secrets,
            key=lambda secret: self._get_filtering_value(
                self._get_column_value(secret, column)
            ),
            reverse=sort_op == SorterOps.DESCENDING,
        )
name: str pydantic-field

Name of the secret

scope: Union[zenml.enums.SecretScope, str] pydantic-field

Scope in which to filter secrets

user_id: Union[uuid.UUID, str] pydantic-field

User that created the Secret

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Secret

secret_matches(self, secret)

Checks if a secret matches the filter criteria.

Parameters:

Name Type Description Default
secret SecretResponseModel

The secret to check.

required

Returns:

Type Description
bool

True if the secret matches the filter criteria, False otherwise.

Source code in zenml/models/secret_models.py
def secret_matches(self, secret: SecretResponseModel) -> bool:
    """Checks if a secret matches the filter criteria.

    Each filter is evaluated on string representations of the secret's
    column value and the filter value. With the AND operator the first
    failing filter decides the outcome; with OR the first passing one does.

    Args:
        secret: The secret to check.

    Returns:
        True if the secret matches the filter criteria, False otherwise.

    Raises:
        ValueError: If a filter uses an operation that is not handled here.
    """
    for column_filter in self.list_of_filters:
        column_value: Optional[Any] = None
        # The IDs are read from the hydrated `workspace` / `user` attributes.
        if column_filter.column == "workspace_id":
            column_value = secret.workspace.id
        elif column_filter.column == "user_id":
            column_value = secret.user.id if secret.user else None
        else:
            column_value = getattr(secret, column_filter.column)

        # Convert the values to strings for lexicographical comparison.
        str_column_value = self._get_filtering_value(column_value)
        str_filter_value = self._get_filtering_value(column_filter.value)

        # Compare the lexicographical values according to the operation.
        operation = column_filter.operation
        if operation == GenericFilterOps.EQUALS:
            result = str_column_value == str_filter_value
        elif operation == GenericFilterOps.CONTAINS:
            result = str_filter_value in str_column_value
        elif operation == GenericFilterOps.STARTSWITH:
            result = str_column_value.startswith(str_filter_value)
        elif operation == GenericFilterOps.ENDSWITH:
            result = str_column_value.endswith(str_filter_value)
        elif operation == GenericFilterOps.GT:
            result = str_column_value > str_filter_value
        elif operation == GenericFilterOps.GTE:
            result = str_column_value >= str_filter_value
        elif operation == GenericFilterOps.LT:
            result = str_column_value < str_filter_value
        elif operation == GenericFilterOps.LTE:
            result = str_column_value <= str_filter_value
        else:
            # Without this branch `result` would be unbound on the first
            # filter, or silently reuse the previous filter's outcome.
            raise ValueError(f"Unsupported filter operation: {operation}")

        # Exit early if the result is False for AND and True for OR
        if self.logical_operator == LogicalOperators.AND:
            if not result:
                return False
        elif result:
            return True

    # If we get here, all filters have been checked and the result is
    # True for AND and False for OR
    return self.logical_operator == LogicalOperators.AND
sort_secrets(self, secrets)

Sorts a list of secrets according to the filter criteria.

Parameters:

Name Type Description Default
secrets List[zenml.models.secret_models.SecretResponseModel]

The list of secrets to sort.

required

Returns:

Type Description
List[zenml.models.secret_models.SecretResponseModel]

The sorted list of secrets.

Source code in zenml/models/secret_models.py
def sort_secrets(
    self, secrets: List[SecretResponseModel]
) -> List[SecretResponseModel]:
    """Sorts a list of secrets according to the filter criteria.

    The sort column and direction come from `self.sorting_params`. The
    `workspace_id` and `user_id` columns are read from the hydrated
    `workspace` and `user` attributes of each secret, the same way
    `secret_matches` resolves them; any other column is read as an
    attribute of the same name.

    Args:
        secrets: The list of secrets to sort.

    Returns:
        The sorted list of secrets.
    """
    column, sort_op = self.sorting_params

    def sort_key(secret: "SecretResponseModel") -> str:
        """Return the string the given secret is ordered by."""
        if column == "workspace_id":
            raw_value = secret.workspace.id
        elif column == "user_id":
            raw_value = secret.user.id if secret.user else None
        else:
            raw_value = getattr(secret, column)
        return self._get_filtering_value(raw_value)

    return sorted(
        secrets,
        key=sort_key,
        reverse=sort_op == SorterOps.DESCENDING,
    )

SecretRequestModel (SecretBaseModel, WorkspaceScopedRequestModel) pydantic-model

Secret request model.

Source code in zenml/models/secret_models.py
class SecretRequestModel(SecretBaseModel, WorkspaceScopedRequestModel):
    """Secret request model.

    Fields are inherited from `SecretBaseModel` and
    `WorkspaceScopedRequestModel`.
    """

    # NOTE(review): presumably the fields reported to analytics -- confirm
    # against the analytics tracking code.
    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

SecretResponseModel (SecretBaseModel, WorkspaceScopedResponseModel) pydantic-model

Secret response model with user and workspace hydrated.

Source code in zenml/models/secret_models.py
class SecretResponseModel(SecretBaseModel, WorkspaceScopedResponseModel):
    """Secret response model with user and workspace hydrated.

    Fields are inherited from `SecretBaseModel` and
    `WorkspaceScopedResponseModel`.
    """

    # NOTE(review): presumably the fields reported to analytics -- confirm
    # against the analytics tracking code.
    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

SecretUpdateModel (SecretRequestModel) pydantic-model

Secret update model.

Source code in zenml/models/secret_models.py
class SecretUpdateModel(SecretRequestModel):
    """Secret update model.

    Redeclares `scope` as optional with a default of None.
    """

    # The override changes the type of the inherited field, hence the
    # type-ignore on the assignment.
    scope: Optional[SecretScope] = Field(  # type: ignore[assignment]
        default=None, title="The scope of the secret."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

server_models

Model definitions for ZenML servers.

ServerDatabaseType (StrEnum)

Enum for server database types.

Source code in zenml/models/server_models.py
class ServerDatabaseType(StrEnum):
    """Enum for server database types."""

    SQLITE = "sqlite"
    MYSQL = "mysql"
    # `ServerModel.database_type` defaults to this member.
    OTHER = "other"

ServerDeploymentType (StrEnum)

Enum for server deployment types.

Source code in zenml/models/server_models.py
class ServerDeploymentType(StrEnum):
    """Enum for server deployment types."""

    LOCAL = "local"
    DOCKER = "docker"
    KUBERNETES = "kubernetes"
    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    ALPHA = "alpha"
    # `ServerModel.deployment_type` defaults to this member.
    OTHER = "other"
    HF_SPACES = "hf_spaces"
    SANDBOX = "sandbox"
    CLOUD = "cloud"

ServerModel (BaseModel) pydantic-model

Domain model for ZenML servers.

Source code in zenml/models/server_models.py
class ServerModel(BaseModel):
    """Domain model for ZenML servers."""

    id: UUID = Field(default_factory=uuid4, title="The unique server id.")

    version: str = Field(
        title="The ZenML version that the server is running.",
    )

    debug: bool = Field(
        False, title="Flag to indicate whether ZenML is running on debug mode."
    )

    deployment_type: ServerDeploymentType = Field(
        ServerDeploymentType.OTHER,
        title="The ZenML server deployment type.",
    )
    database_type: ServerDatabaseType = Field(
        ServerDatabaseType.OTHER,
        title="The database type that the server is using.",
    )
    secrets_store_type: SecretsStoreType = Field(
        SecretsStoreType.NONE,
        title="The type of secrets store that the server is using.",
    )

    def is_local(self) -> bool:
        """Tell whether this server is running locally.

        Returns:
            True if the server is running locally, False otherwise.
        """
        from zenml.config.global_config import GlobalConfiguration

        # A local ZenML server shares its ID with the local client (user),
        # so comparing the two IDs is enough to recognize it.
        local_client_id = GlobalConfiguration().user_id
        return self.id == local_client_id
is_local(self)

Return whether the server is running locally.

Returns:

Type Description
bool

True if the server is running locally, False otherwise.

Source code in zenml/models/server_models.py
def is_local(self) -> bool:
    """Tell whether this server is running locally.

    Returns:
        True if the server is running locally, False otherwise.
    """
    from zenml.config.global_config import GlobalConfiguration

    # A local ZenML server shares its ID with the local client (user), so
    # comparing the two IDs is enough to recognize it.
    local_client_id = GlobalConfiguration().user_id
    return self.id == local_client_id

service_connector_models

Model definitions for ZenML service connectors.

AuthenticationMethodModel (BaseModel) pydantic-model

Authentication method specification.

Describes the schema for the configuration and secrets that need to be provided to configure an authentication method.

Source code in zenml/models/service_connector_models.py
class AuthenticationMethodModel(BaseModel):
    """Authentication method specification.

    Describes the schema for the configuration and secrets that need to be
    provided to configure an authentication method.
    """

    name: str = Field(
        title="User readable name for the authentication method.",
    )
    auth_method: str = Field(
        title="The name of the authentication method.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: str = Field(
        default="",
        title="A description of the authentication method.",
    )
    config_schema: Dict[str, Any] = Field(
        default_factory=dict,
        title="The JSON schema of the configuration for this authentication "
        "method.",
    )
    min_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The minimum number of seconds that the authentication "
        "session can be configured to be valid for. Set to None for "
        "authentication sessions and long-lived credentials that don't expire.",
    )
    max_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The maximum number of seconds that the authentication "
        "session can be configured to be valid for. Set to None for "
        "authentication sessions and long-lived credentials that don't expire.",
    )
    default_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The default number of seconds that the authentication "
        "session is valid for. Set to None for authentication sessions and "
        "long-lived credentials that don't expire.",
    )
    _config_class: Optional[Type[BaseModel]] = None

    def __init__(
        self, config_class: Optional[Type[BaseModel]] = None, **values: Any
    ):
        """Build the authentication method specification.

        When a configuration class is supplied, its JSON schema is written
        into `values["config_schema"]` before the model is initialized,
        replacing any `config_schema` entry the caller passed in.

        Args:
            config_class: The configuration class for the authentication
                method.
            **values: The data to initialize the authentication method with.
        """
        if config_class:
            # Round-trip through the serialized schema so the stored value
            # is a plain, JSON-decoded dictionary.
            serialized_schema = config_class.schema_json()
            values["config_schema"] = json.loads(serialized_schema)

        super().__init__(**values)

        # Remember the class itself (possibly None) alongside its schema.
        self._config_class = config_class

    @property
    def config_class(self) -> Optional[Type[BaseModel]]:
        """Get the configuration class for the authentication method.

        Returns:
            The configuration class for the authentication method.
        """
        return