Skip to content

Models

zenml.models special

Pydantic models for the various concepts in ZenML.

artifact_models

Models representing artifacts.

ArtifactBaseModel (BaseModel) pydantic-model

Base model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactBaseModel(BaseModel):
    """Base model for artifacts."""

    name: str = Field(
        title="Name of the output in the parent step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    artifact_store_id: Optional[UUID] = Field(
        title="ID of the artifact store in which this artifact is stored.",
        default=None,
    )
    type: ArtifactType = Field(title="Type of the artifact.")
    uri: str = Field(
        title="URI of the artifact.", max_length=STR_FIELD_MAX_LENGTH
    )
    materializer: Source = Field(
        title="Materializer class to use for this artifact.",
    )
    data_type: Source = Field(
        title="Data type of the artifact.",
    )
    visualizations: Optional[List[VisualizationModel]] = Field(
        default=None, title="Visualizations of the artifact."
    )

    _convert_source = convert_source_validator("materializer", "data_type")

ArtifactFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Artifacts."""

    # `only_unused` refers to a property of the artifacts relationship
    #  rather than a field in the db, hence it needs to be handled
    #  explicitly
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "only_unused",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the artifact",
    )
    uri: Optional[str] = Field(
        default=None,
        description="Uri of the artifact",
    )
    materializer: Optional[str] = Field(
        default=None,
        description="Materializer used to produce the artifact",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the artifact",
    )
    data_type: Optional[str] = Field(
        default=None,
        description="Datatype of the artifact",
    )
    artifact_store_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Artifact store for this artifact"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace for this artifact"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that produced this artifact"
    )
    only_unused: Optional[bool] = Field(
        default=False, description="Filter only for unused artifacts"
    )
artifact_store_id: Union[uuid.UUID, str] pydantic-field

Artifact store for this artifact

data_type: str pydantic-field

Datatype of the artifact

materializer: str pydantic-field

Materializer used to produce the artifact

name: str pydantic-field

Name of the artifact

only_unused: bool pydantic-field

Filter only for unused artifacts

type: str pydantic-field

Type of the artifact

uri: str pydantic-field

Uri of the artifact

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this artifact

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this artifact

ArtifactRequestModel (ArtifactBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactRequestModel(ArtifactBaseModel, WorkspaceScopedRequestModel):
    """Request model for artifacts."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ArtifactResponseModel (ArtifactBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for artifacts.

Source code in zenml/models/artifact_models.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
    """Response model for artifacts."""

    producer_step_run_id: Optional[UUID] = Field(
        title="ID of the step run that produced this artifact.",
        default=None,
    )
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={}, title="Metadata of the artifact."
    )

    @property
    def step(self) -> "StepRunResponseModel":
        """Get the step that produced this artifact.

        Returns:
            The step that produced this artifact.
        """
        from zenml.utils.artifact_utils import get_producer_step_of_artifact

        return get_producer_step_of_artifact(self)

    @property
    def run(self) -> "PipelineRunResponseModel":
        """Get the pipeline run that produced this artifact.

        Returns:
            The pipeline run that produced this artifact.
        """
        return self.step.run

    def load(self) -> Any:
        """Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        from zenml.utils.artifact_utils import load_artifact

        return load_artifact(self)

    def read(self) -> Any:
        """(Deprecated) Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        logger.warning(
            "`artifact.read()` is deprecated and will be removed in a future "
            "release. Please use `artifact.load()` instead."
        )
        return self.load()

    def visualize(self, title: Optional[str] = None) -> None:
        """Visualize the artifact in notebook environments.

        Args:
            title: Optional title to show before the visualizations.
        """
        from zenml.utils.visualization_utils import visualize_artifact

        visualize_artifact(self, title=title)
run: PipelineRunResponseModel property readonly

Get the pipeline run that produced this artifact.

Returns:

Type Description
PipelineRunResponseModel

The pipeline run that produced this artifact.

step: StepRunResponseModel property readonly

Get the step that produced this artifact.

Returns:

Type Description
StepRunResponseModel

The step that produced this artifact.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

load(self)

Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/artifact_models.py
def load(self) -> Any:
    """Materializes (loads) the data stored in this artifact.

    Returns:
        The materialized data.
    """
    from zenml.utils.artifact_utils import load_artifact

    return load_artifact(self)
read(self)

(Deprecated) Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/models/artifact_models.py
def read(self) -> Any:
    """(Deprecated) Materializes (loads) the data stored in this artifact.

    Returns:
        The materialized data.
    """
    logger.warning(
        "`artifact.read()` is deprecated and will be removed in a future "
        "release. Please use `artifact.load()` instead."
    )
    return self.load()
visualize(self, title=None)

Visualize the artifact in notebook environments.

Parameters:

Name Type Description Default
title Optional[str]

Optional title to show before the visualizations.

None
Source code in zenml/models/artifact_models.py
def visualize(self, title: Optional[str] = None) -> None:
    """Visualize the artifact in notebook environments.

    Args:
        title: Optional title to show before the visualizations.
    """
    from zenml.utils.visualization_utils import visualize_artifact

    visualize_artifact(self, title=title)

base_models

Base domain model definitions.

BaseRequestModel (BaseZenModel) pydantic-model

Base request model.

Used as a base class for all request models.

Source code in zenml/models/base_models.py
class BaseRequestModel(BaseZenModel):
    """Base request model.

    Used as a base class for all request models.
    """
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

BaseResponseModel (BaseZenModel) pydantic-model

Base domain model.

Used as a base class for all domain models that have the following common characteristics:

  • are uniquely identified by a UUID
  • have a creation timestamp and a last modified timestamp
Source code in zenml/models/base_models.py
class BaseResponseModel(BaseZenModel):
    """Base domain model.

    Used as a base class for all domain models that have the following common
    characteristics:

      * are uniquely identified by a UUID
      * have a creation timestamp and a last modified timestamp
    """

    id: UUID = Field(title="The unique resource id.")

    created: datetime = Field(title="Time when this resource was created.")
    updated: datetime = Field(
        title="Time when this resource was last updated."
    )

    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Returns:
            Hash of the UUID.
        """
        return hash((type(self),) + tuple([self.id]))

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is of the same type and has the same UUID.
        """
        if isinstance(other, BaseResponseModel):
            return self.id == other.id
        else:
            return False

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata
__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is of the same type and has the same UUID.

Source code in zenml/models/base_models.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is of the same type and has the same UUID.
    """
    if isinstance(other, BaseResponseModel):
        return self.id == other.id
    else:
        return False
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/models/base_models.py
def __hash__(self) -> int:
    """Implementation of hash magic method.

    Returns:
        Hash of the UUID.
    """
    return hash((type(self),) + tuple([self.id]))
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for base response models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["entity_id"] = self.id
    return metadata

BaseZenModel (AnalyticsTrackedModelMixin) pydantic-model

Base model class for all ZenML models.

This class is used as a base class for all ZenML models. It provides functionality for tracking analytics events and proper encoding of SecretStr values.

Source code in zenml/models/base_models.py
class BaseZenModel(AnalyticsTrackedModelMixin):
    """Base model class for all ZenML models.

    This class is used as a base class for all ZenML models. It provides
    functionality for tracking analytics events and proper encoding of
    SecretStr values.
    """

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST client and server to unpack SecretStr
        # values correctly.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value()
            if v is not None
            else None
        }

        # Allow extras on all models to support forwards and backwards
        # compatibility (e.g. new fields in newer versions of ZenML servers
        # are allowed to be present in older versions of ZenML clients and
        # vice versa).
        extra = "allow"
Config

Pydantic configuration class.

Source code in zenml/models/base_models.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST client and server to unpack SecretStr
    # values correctly.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value()
        if v is not None
        else None
    }

    # Allow extras on all models to support forwards and backwards
    # compatibility (e.g. new fields in newer versions of ZenML servers
    # are allowed to be present in older versions of ZenML clients and
    # vice versa).
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ShareableRequestModel (WorkspaceScopedRequestModel) pydantic-model

Base shareable workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped and are shareable.

Source code in zenml/models/base_models.py
class ShareableRequestModel(WorkspaceScopedRequestModel):
    """Base shareable workspace-scoped domain model.

    Used as a base class for all domain models that are workspace-scoped and are
    shareable.
    """

    is_shared: bool = Field(
        default=False,
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        ),
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["is_shared"] = self.is_shared
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["is_shared"] = self.is_shared
    return metadata

ShareableResponseModel (WorkspaceScopedResponseModel) pydantic-model

Base shareable workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped and are shareable.

Source code in zenml/models/base_models.py
class ShareableResponseModel(WorkspaceScopedResponseModel):
    """Base shareable workspace-scoped domain model.

    Used as a base class for all domain models that are workspace-scoped and are
    shareable.
    """

    is_shared: bool = Field(
        title=(
            "Flag describing if this resource is shared with other users in "
            "the same workspace."
        ),
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["is_shared"] = self.is_shared
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["is_shared"] = self.is_shared
    return metadata

UserScopedRequestModel (BaseRequestModel) pydantic-model

Base user-owned request model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/base_models.py
class UserScopedRequestModel(BaseRequestModel):
    """Base user-owned request model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    user: UUID = Field(title="The id of the user that created this resource.")

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["user_id"] = self.user
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["user_id"] = self.user
    return metadata

UserScopedResponseModel (BaseResponseModel) pydantic-model

Base user-owned domain model.

Used as a base class for all domain models that are "owned" by a user.

Source code in zenml/models/base_models.py
class UserScopedResponseModel(BaseResponseModel):
    """Base user-owned domain model.

    Used as a base class for all domain models that are "owned" by a user.
    """

    user: Union["UserResponseModel", None] = Field(
        title="The user that created this resource.", nullable=True
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for user scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        if self.user is not None:
            metadata["user_id"] = self.user.id
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for user scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for user scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    if self.user is not None:
        metadata["user_id"] = self.user.id
    return metadata

WorkspaceScopedRequestModel (UserScopedRequestModel) pydantic-model

Base workspace-scoped request domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/base_models.py
class WorkspaceScopedRequestModel(UserScopedRequestModel):
    """Base workspace-scoped request domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    workspace: UUID = Field(
        title="The workspace to which this resource belongs."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["workspace_id"] = self.workspace
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["workspace_id"] = self.workspace
    return metadata

WorkspaceScopedResponseModel (UserScopedResponseModel) pydantic-model

Base workspace-scoped domain model.

Used as a base class for all domain models that are workspace-scoped.

Source code in zenml/models/base_models.py
class WorkspaceScopedResponseModel(UserScopedResponseModel):
    """Base workspace-scoped domain model.

    Used as a base class for all domain models that are workspace-scoped.
    """

    workspace: "WorkspaceResponseModel" = Field(
        title="The workspace of this resource."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for workspace scoped models.

        Returns:
            The analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata["workspace_id"] = self.workspace.id
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for workspace scoped models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/models/base_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for workspace scoped models.

    Returns:
        The analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata["workspace_id"] = self.workspace.id
    return metadata

update_model(_cls)

Base update model.

This is used as a decorator on top of request models to convert them into update models where the fields are optional and can be set to None.

Parameters:

Name Type Description Default
_cls Type[~T]

The class to decorate

required

Returns:

Type Description
Type[~T]

The decorated class.

Source code in zenml/models/base_models.py
def update_model(_cls: Type[T]) -> Type[T]:
    """Base update model.

    This is used as a decorator on top of request models to convert them
    into update models where the fields are optional and can be set to None.

    Args:
        _cls: The class to decorate

    Returns:
        The decorated class.
    """
    for _, value in _cls.__fields__.items():
        value.required = False
        value.allow_none = True

    return _cls

code_repository_models

Models representing code repositories.

CodeReferenceBaseModel (BaseModel) pydantic-model

Base model for code references.

Source code in zenml/models/code_repository_models.py
class CodeReferenceBaseModel(BaseModel):
    """Base model for code references."""

    commit: str = Field(description="The commit of the code reference.")
    subdirectory: str = Field(
        description="The subdirectory of the code reference."
    )
commit: str pydantic-field required

The commit of the code reference.

subdirectory: str pydantic-field required

The subdirectory of the code reference.

CodeReferenceRequestModel (CodeReferenceBaseModel, BaseRequestModel) pydantic-model

Code reference request model.

Source code in zenml/models/code_repository_models.py
class CodeReferenceRequestModel(CodeReferenceBaseModel, BaseRequestModel):
    """Code reference request model."""

    code_repository: UUID = Field(
        description="The repository of the code reference."
    )
code_repository: UUID pydantic-field required

The repository of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeReferenceResponseModel (CodeReferenceBaseModel, BaseResponseModel) pydantic-model

Code reference response model.

Source code in zenml/models/code_repository_models.py
class CodeReferenceResponseModel(CodeReferenceBaseModel, BaseResponseModel):
    """Code reference response model."""

    code_repository: CodeRepositoryResponseModel = Field(
        description="The repository of the code reference."
    )
code_repository: CodeRepositoryResponseModel pydantic-field required

The repository of the code reference.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryBaseModel (BaseModel) pydantic-model

Base model for code repositories.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryBaseModel(BaseModel):
    """Base model for code repositories."""

    name: str = Field(
        title="The name of the code repository.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    config: Dict[str, Any] = Field(
        description="Configuration for the code repository."
    )
    source: Source = Field(description="The code repository source.")
    logo_url: Optional[str] = Field(
        description="Optional URL of a logo (png, jpg or svg) for the code repository."
    )
    description: Optional[str] = Field(
        description="Code repository description.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
config: Dict[str, Any] pydantic-field required

Configuration for the code repository.

description: ConstrainedStrValue pydantic-field

Code repository description.

logo_url: str pydantic-field

Optional URL of a logo (png, jpg or svg) for the code repository.

source: Source pydantic-field required

The code repository source.

CodeRepositoryFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all code repositories.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all code repositories."""

    name: Optional[str] = Field(
        description="Name of the code repository.",
    )
    workspace_id: Union[UUID, str, None] = Field(
        description="Workspace of the code repository."
    )
    user_id: Union[UUID, str, None] = Field(
        description="User that created the code repository."
    )
name: str pydantic-field

Name of the code repository.

user_id: Union[uuid.UUID, str] pydantic-field

User that created the code repository.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the code repository.

CodeRepositoryRequestModel (CodeRepositoryBaseModel, WorkspaceScopedRequestModel) pydantic-model

Code repository request model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryRequestModel(
    CodeRepositoryBaseModel, WorkspaceScopedRequestModel
):
    """Code repository request model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryResponseModel (CodeRepositoryBaseModel, WorkspaceScopedResponseModel) pydantic-model

Code repository response model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryResponseModel(
    CodeRepositoryBaseModel, WorkspaceScopedResponseModel
):
    """Code repository response model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

CodeRepositoryUpdateModel (CodeRepositoryRequestModel) pydantic-model

Code repository update model.

Source code in zenml/models/code_repository_models.py
class CodeRepositoryUpdateModel(CodeRepositoryRequestModel):
    """Code repository update model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

component_models

Models representing stack components.

ComponentBaseModel (BaseModel) pydantic-model

Base model for stack components.

Source code in zenml/models/component_models.py
class ComponentBaseModel(BaseModel):
    """Base model for stack components."""

    name: str = Field(
        title="The name of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    type: StackComponentType = Field(
        title="The type of the stack component.",
    )

    flavor: str = Field(
        title="The flavor of the stack component.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    configuration: Dict[str, Any] = Field(
        title="The stack component configuration.",
    )

    connector_resource_id: Optional[str] = Field(
        default=None,
        description="The ID of a specific resource instance to "
        "gain access to through the connector",
    )

    labels: Optional[Dict[str, Any]] = Field(
        default=None,
        title="The stack component labels.",
    )
connector_resource_id: str pydantic-field

The ID of a specific resource instance to gain access to through the connector

ComponentFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all ComponentModels.

The Component Model needs additional scoping. As such the _scope_user field can be set to the user that is doing the filtering. The generate_filter() method of the baseclass is overwritten to include the scoping.

Source code in zenml/models/component_models.py
class ComponentFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all ComponentModels.

    The Component Model needs additional scoping. As such the `_scope_user`
    field can be set to the user that is doing the filtering. The
    `generate_filter()` method of the baseclass is overwritten to include the
    scoping.
    """

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_type",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_type",
    ]
    scope_type: Optional[str] = Field(
        default=None,
        description="The type to scope this query to.",
    )

    is_shared: Optional[Union[bool, str]] = Field(
        default=None, description="If the stack is shared or private"
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the stack component",
    )
    flavor: Optional[str] = Field(
        default=None,
        description="Flavor of the stack component",
    )
    type: Optional[str] = Field(
        default=None,
        description="Type of the stack component",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack component"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
    connector_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Connector linked to the stack component"
    )

    def set_scope_type(self, component_type: str) -> None:
        """Set the type of component on which to perform the filtering to scope the response.

        Args:
            component_type: The type of component to scope the query to.
        """
        self.scope_type = component_type

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Stack components can be scoped by type to narrow the search.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_

        base_filter = super().generate_filter(table)
        if self.scope_type:
            type_filter = getattr(table, "type") == self.scope_type
            return and_(base_filter, type_filter)
        return base_filter
connector_id: Union[uuid.UUID, str] pydantic-field

Connector linked to the stack component

flavor: str pydantic-field

Flavor of the stack component

is_shared: Union[bool, str] pydantic-field

If the stack is shared or private

name: str pydantic-field

Name of the stack component

scope_type: str pydantic-field

The type to scope this query to.

type: str pydantic-field

Type of the stack component

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack component

generate_filter(self, table)

Generate the filter for the query.

Stack components can be scoped by type to narrow the search.

Parameters:

Name Type Description Default
table Type[SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Source code in zenml/models/component_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Stack components can be scoped by type to narrow the search.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_

    base_filter = super().generate_filter(table)
    if self.scope_type:
        type_filter = getattr(table, "type") == self.scope_type
        return and_(base_filter, type_filter)
    return base_filter
set_scope_type(self, component_type)

Set the type of component on which to perform the filtering to scope the response.

Parameters:

Name Type Description Default
component_type str

The type of component to scope the query to.

required
Source code in zenml/models/component_models.py
def set_scope_type(self, component_type: str) -> None:
    """Set the type of component on which to perform the filtering to scope the response.

    Args:
        component_type: The type of component to scope the query to.
    """
    self.scope_type = component_type

ComponentRequestModel (ComponentBaseModel, ShareableRequestModel) pydantic-model

Request model for stack components.

Source code in zenml/models/component_models.py
class ComponentRequestModel(ComponentBaseModel, ShareableRequestModel):
    """Request model for stack components."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    connector: Optional[UUID] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )

    @validator("name")
    def name_cant_be_a_secret_reference(cls, name: str) -> str:
        """Validator to ensure that the given name is not a secret reference.

        Args:
            name: The name to validate.

        Returns:
            The name if it is not a secret reference.

        Raises:
            ValueError: If the name is a secret reference.
        """
        if secret_utils.is_secret_reference(name):
            raise ValueError(
                "Passing the `name` attribute of a stack component as a "
                "secret reference is not allowed."
            )
        return name
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

name_cant_be_a_secret_reference(name) classmethod

Validator to ensure that the given name is not a secret reference.

Parameters:

Name Type Description Default
name str

The name to validate.

required

Returns:

Type Description
str

The name if it is not a secret reference.

Exceptions:

Type Description
ValueError

If the name is a secret reference.

Source code in zenml/models/component_models.py
@validator("name")
def name_cant_be_a_secret_reference(cls, name: str) -> str:
    """Validator to ensure that the given name is not a secret reference.

    Args:
        name: The name to validate.

    Returns:
        The name if it is not a secret reference.

    Raises:
        ValueError: If the name is a secret reference.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )
    return name

ComponentResponseModel (ComponentBaseModel, ShareableResponseModel) pydantic-model

Response model for stack components.

Source code in zenml/models/component_models.py
class ComponentResponseModel(ComponentBaseModel, ShareableResponseModel):
    """Response model for stack components."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["type", "flavor"]

    connector: Optional["ServiceConnectorResponseModel"] = Field(
        default=None,
        title="The service connector linked to this stack component.",
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ComponentUpdateModel (ComponentRequestModel) pydantic-model

Update model for stack components.

Source code in zenml/models/component_models.py
class ComponentUpdateModel(ComponentRequestModel):
    """Update model for stack components."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

constants

Constants used by ZenML domain models.

filter_models

Base filter model definitions.

BaseFilterModel (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=50
)
Source code in zenml/models/filter_models.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )
        column = v
        split_value = v.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                v = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        elif column in cls.__fields__:
            return v
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.__fields__}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return (
            issubclass(datetime, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is datetime
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        return (
            issubclass(UUID, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is UUID
        )

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's a int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        return (
            issubclass(int, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is int
        )

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return (
            issubclass(bool, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is bool
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        )

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ == str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.filter_models.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.filter_models.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/filter_models.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse incoming filters to ensure all filters are legal.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/filter_models.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return (
        issubclass(bool, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is bool
    )
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return (
        issubclass(datetime, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is datetime
    )
is_int_field(k) classmethod

Checks if it's a int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a int field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's a int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    return (
        issubclass(int, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is int
    )
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ == str
    ) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is str
    )
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/filter_models.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a uuid field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    return (
        issubclass(UUID, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is UUID
    )
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/filter_models.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )
    column = v
    split_value = v.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    elif column in cls.__fields__:
        return v
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )

BoolFilter (Filter) pydantic-model

Filter for all Boolean fields.

Source code in zenml/models/filter_models.py
class BoolFilter(Filter):
    """Filter for all Boolean fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [GenericFilterOps.EQUALS]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a boolean column.

        Args:
            column: The boolean column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a boolean column.

Parameters:

Name Type Description Default
column Any

The boolean column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a boolean column.

    Args:
        column: The boolean column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    return column == self.value

Filter (BaseModel, ABC) pydantic-model

Filter for all fields.

A Filter is a combination of a column, a value that the user uses to filter on this column and an operation to use. The easiest example would be user equals aria with column=user, value=aria and the operation=equals.

All subclasses of this class will support different sets of operations. This operation set is defined in the ALLOWED_OPS class variable.

Source code in zenml/models/filter_models.py
class Filter(BaseModel, ABC):
    """Filter for all fields.

    A Filter is a combination of a column, a value that the user uses to
    filter on this column and an operation to use. The easiest example
    would be `user equals aria` with column=`user`, value=`aria` and the
    operation=`equals`.

    All subclasses of this class will support different sets of operations.
    This operation set is defined in the ALLOWED_OPS class variable.
    """

    ALLOWED_OPS: ClassVar[List[str]] = []

    operation: GenericFilterOps
    column: str
    value: Any

    @validator("operation", pre=True)
    def validate_operation(cls, op: str) -> str:
        """Validate that the operation is a valid op for the field type.

        Args:
            op: The operation of this filter.

        Returns:
            The operation if it is valid.

        Raises:
            ValueError: If the operation is not valid for this field type.
        """
        if op not in cls.ALLOWED_OPS:
            raise ValueError(
                f"This datatype can not be filtered using this operation: "
                f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
            )
        else:
            return op

    def generate_query_conditions(
        self,
        table: Type[SQLModel],
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the query conditions for the database.

        This method converts the Filter class into an appropriate SQLModel
        query condition, to be used when filtering on the Database.

        Args:
            table: The SQLModel table to use for the query creation

        Returns:
            A list of conditions that will be combined using the `and` operation
        """
        column = getattr(table, self.column)
        conditions = self.generate_query_conditions_from_column(column)
        return conditions  # type:ignore[no-any-return]

    @abstractmethod
    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions given the corresponding database column.

        This method should be overridden by subclasses to define how each
        supported operation in `self.ALLOWED_OPS` can be used to filter the
        given column by `self.value`.

        Args:
            column: The column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
generate_query_conditions(self, table)

Generate the query conditions for the database.

This method converts the Filter class into an appropriate SQLModel query condition, to be used when filtering on the Database.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The SQLModel table to use for the query creation

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

A list of conditions that will be combined using the and operation

Source code in zenml/models/filter_models.py
def generate_query_conditions(
    self,
    table: Type[SQLModel],
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the query conditions for the database.

    This method converts the Filter class into an appropriate SQLModel
    query condition, to be used when filtering on the Database.

    Args:
        table: The SQLModel table to use for the query creation

    Returns:
        A list of conditions that will be combined using the `and` operation
    """
    column = getattr(table, self.column)
    conditions = self.generate_query_conditions_from_column(column)
    return conditions  # type:ignore[no-any-return]
generate_query_conditions_from_column(self, column)

Generate query conditions given the corresponding database column.

This method should be overridden by subclasses to define how each supported operation in self.ALLOWED_OPS can be used to filter the given column by self.value.

Parameters:

Name Type Description Default
column Any

The column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
@abstractmethod
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions given the corresponding database column.

    This method should be overridden by subclasses to define how each
    supported operation in `self.ALLOWED_OPS` can be used to filter the
    given column by `self.value`.

    Args:
        column: The column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
validate_operation(op) classmethod

Validate that the operation is a valid op for the field type.

Parameters:

Name Type Description Default
op str

The operation of this filter.

required

Returns:

Type Description
str

The operation if it is valid.

Exceptions:

Type Description
ValueError

If the operation is not valid for this field type.

Source code in zenml/models/filter_models.py
@validator("operation", pre=True)
def validate_operation(cls, op: str) -> str:
    """Validate that the operation is a valid op for the field type.

    Args:
        op: The operation of this filter.

    Returns:
        The operation if it is valid.

    Raises:
        ValueError: If the operation is not valid for this field type.
    """
    if op not in cls.ALLOWED_OPS:
        raise ValueError(
            f"This datatype can not be filtered using this operation: "
            f"'{op}'. The allowed operations are: {cls.ALLOWED_OPS}"
        )
    else:
        return op

NumericFilter (Filter) pydantic-model

Filter for all numeric fields.

Source code in zenml/models/filter_models.py
class NumericFilter(Filter):
    """Filter for all numeric fields."""

    value: Union[float, datetime]

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.GT,
        GenericFilterOps.GTE,
        GenericFilterOps.LT,
        GenericFilterOps.LTE,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.GTE:
            return column >= self.value
        if self.operation == GenericFilterOps.GT:
            return column > self.value
        if self.operation == GenericFilterOps.LTE:
            return column <= self.value
        if self.operation == GenericFilterOps.LT:
            return column < self.value
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.GTE:
        return column >= self.value
    if self.operation == GenericFilterOps.GT:
        return column > self.value
    if self.operation == GenericFilterOps.LTE:
        return column <= self.value
    if self.operation == GenericFilterOps.LT:
        return column < self.value
    return column == self.value

ShareableWorkspaceScopedFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced scoping with workspace and user scoped shareable things.

Source code in zenml/models/filter_models.py
class ShareableWorkspaceScopedFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced scoping with workspace and user scoped shareable things."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_user",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_user",
    ]
    scope_user: Optional[UUID] = Field(
        default=None,
        description="The user to scope this query to.",
    )

    def set_scope_user(self, user_id: UUID) -> None:
        """Set the user that is performing the filtering to scope the response.

        Args:
            user_id: The user ID to scope the response to.
        """
        self.scope_user = user_id

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        query = super().apply_filter(query=query, table=table)

        if self.scope_user:
            scope_filter = or_(
                getattr(table, "user_id") == self.scope_user,
                getattr(table, "is_shared").is_(True),
            )
            query = query.where(scope_filter)

        return query
scope_user: UUID pydantic-field

The user to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    query = super().apply_filter(query=query, table=table)

    if self.scope_user:
        scope_filter = or_(
            getattr(table, "user_id") == self.scope_user,
            getattr(table, "is_shared").is_(True),
        )
        query = query.where(scope_filter)

    return query
set_scope_user(self, user_id)

Set the user that is performing the filtering to scope the response.

Parameters:

Name Type Description Default
user_id UUID

The user ID to scope the response to.

required
Source code in zenml/models/filter_models.py
def set_scope_user(self, user_id: UUID) -> None:
    """Set the user that is performing the filtering to scope the response.

    Args:
        user_id: The user ID to scope the response to.
    """
    self.scope_user = user_id

StrFilter (Filter) pydantic-model

Filter for all string fields.

Source code in zenml/models/filter_models.py
class StrFilter(Filter):
    """Filter for all string fields."""

    ALLOWED_OPS: ClassVar[List[str]] = [
        GenericFilterOps.EQUALS,
        GenericFilterOps.STARTSWITH,
        GenericFilterOps.CONTAINS,
        GenericFilterOps.ENDSWITH,
    ]

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a string column.

        Args:
            column: The string column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        if self.operation == GenericFilterOps.CONTAINS:
            return column.like(f"%{self.value}%")
        if self.operation == GenericFilterOps.STARTSWITH:
            return column.startswith(f"{self.value}")
        if self.operation == GenericFilterOps.ENDSWITH:
            return column.endswith(f"{self.value}")
        return column == self.value
generate_query_conditions_from_column(self, column)

Generate query conditions for a string column.

Parameters:

Name Type Description Default
column Any

The string column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a string column.

    Args:
        column: The string column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    if self.operation == GenericFilterOps.CONTAINS:
        return column.like(f"%{self.value}%")
    if self.operation == GenericFilterOps.STARTSWITH:
        return column.startswith(f"{self.value}")
    if self.operation == GenericFilterOps.ENDSWITH:
        return column.endswith(f"{self.value}")
    return column == self.value

UUIDFilter (StrFilter) pydantic-model

Filter for all uuid fields which are mostly treated like strings.

Source code in zenml/models/filter_models.py
class UUIDFilter(StrFilter):
    """Filter for all uuid fields which are mostly treated like strings."""

    def generate_query_conditions_from_column(self, column: Any) -> Any:
        """Generate query conditions for a UUID column.

        Args:
            column: The UUID column of an SQLModel table on which to filter.

        Returns:
            A list of query conditions.
        """
        import sqlalchemy
        from sqlalchemy_utils.functions import cast_if

        # For equality checks, compare the UUID directly
        if self.operation == GenericFilterOps.EQUALS:
            return column == self.value

        # For all other operations, cast and handle the column as string
        return super().generate_query_conditions_from_column(
            column=cast_if(column, sqlalchemy.String)
        )
generate_query_conditions_from_column(self, column)

Generate query conditions for a UUID column.

Parameters:

Name Type Description Default
column Any

The UUID column of an SQLModel table on which to filter.

required

Returns:

Type Description
Any

A list of query conditions.

Source code in zenml/models/filter_models.py
def generate_query_conditions_from_column(self, column: Any) -> Any:
    """Generate query conditions for a UUID column.

    Args:
        column: The UUID column of an SQLModel table on which to filter.

    Returns:
        A list of query conditions.
    """
    import sqlalchemy
    from sqlalchemy_utils.functions import cast_if

    # For equality checks, compare the UUID directly
    if self.operation == GenericFilterOps.EQUALS:
        return column == self.value

    # For all other operations, cast and handle the column as string
    return super().generate_query_conditions_from_column(
        column=cast_if(column, sqlalchemy.String)
    )

WorkspaceScopedFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced scoping with workspace.

Source code in zenml/models/filter_models.py
class WorkspaceScopedFilterModel(BaseFilterModel):
    """Model to enable advanced scoping with workspace."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *BaseFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_workspace",
    ]
    scope_workspace: Optional[UUID] = Field(
        default=None,
        description="The workspace to scope this query to.",
    )

    def set_scope_workspace(self, workspace_id: UUID) -> None:
        """Set the workspace to scope this response.

        Args:
            workspace_id: The workspace to scope this response to.
        """
        self.scope_workspace = workspace_id

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        from sqlmodel import or_

        query = super().apply_filter(query=query, table=table)

        if self.scope_workspace:
            scope_filter = or_(
                getattr(table, "workspace_id") == self.scope_workspace,
                getattr(table, "workspace_id").is_(None),
            )
            query = query.where(scope_filter)

        return query
scope_workspace: UUID pydantic-field

The workspace to scope this query to.

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/filter_models.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    from sqlmodel import or_

    query = super().apply_filter(query=query, table=table)

    if self.scope_workspace:
        scope_filter = or_(
            getattr(table, "workspace_id") == self.scope_workspace,
            getattr(table, "workspace_id").is_(None),
        )
        query = query.where(scope_filter)

    return query
set_scope_workspace(self, workspace_id)

Set the workspace to scope this response.

Parameters:

Name Type Description Default
workspace_id UUID

The workspace to scope this response to.

required
Source code in zenml/models/filter_models.py
def set_scope_workspace(self, workspace_id: UUID) -> None:
    """Set the workspace to scope this response.

    Args:
        workspace_id: The workspace to scope this response to.
    """
    self.scope_workspace = workspace_id

flavor_models

Models representing stack component flavors.

FlavorBaseModel (BaseModel) pydantic-model

Base model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorBaseModel(BaseModel):
    """Base model for stack component flavors."""

    name: str = Field(
        title="The name of the Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    type: StackComponentType = Field(title="The type of the Flavor.")
    config_schema: Dict[str, Any] = Field(
        title="The JSON schema of this flavor's corresponding configuration.",
    )
    connector_type: Optional[str] = Field(
        default=None,
        title="The type of the connector that this flavor uses.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    connector_resource_type: Optional[str] = Field(
        default=None,
        title="The resource type of the connector that this flavor uses.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    connector_resource_id_attr: Optional[str] = Field(
        default=None,
        title="The name of an attribute in the stack component configuration "
        "that plays the role of resource ID when linked to a service connector.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    source: str = Field(
        title="The path to the module which contains this Flavor.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    integration: Optional[str] = Field(
        title="The name of the integration that the Flavor belongs to.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    logo_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to a png,"
        "svg or jpg can be attached.",
    )
    docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to docs, within docs.zenml.io.",
    )
    sdk_docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a url pointing to SDK docs,"
        "within sdkdocs.zenml.io.",
    )
    is_custom: bool = Field(
        title="Whether or not this flavor is a custom, user created flavor.",
        default=True,
    )

    @property
    def connector_requirements(self) -> Optional[ServiceConnectorRequirements]:
        """Returns the connector requirements for the flavor.

        Returns:
            The connector requirements for the flavor.
        """
        if not self.connector_resource_type:
            return None

        return ServiceConnectorRequirements(
            connector_type=self.connector_type,
            resource_type=self.connector_resource_type,
            resource_id_attr=self.connector_resource_id_attr,
        )
connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Returns the connector requirements for the flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

The connector requirements for the flavor.

FlavorFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Flavors.

Source code in zenml/models/flavor_models.py
class FlavorFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Flavors."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the flavor",
    )
    type: Optional[str] = Field(
        default=None,
        description="Stack Component Type of the stack flavor",
    )
    integration: Optional[str] = Field(
        default=None,
        description="Integration associated with the flavor",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
integration: str pydantic-field

Integration associated with the flavor

name: str pydantic-field

Name of the flavor

type: str pydantic-field

Stack Component Type of the stack flavor

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack

FlavorRequestModel (FlavorBaseModel, BaseRequestModel) pydantic-model

Request model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorRequestModel(FlavorBaseModel, BaseRequestModel):
    """Request model for stack component flavors."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "type",
        "integration",
    ]

    user: Optional[UUID] = Field(
        default=None, title="The id of the user that created this resource."
    )

    workspace: Optional[UUID] = Field(
        default=None, title="The workspace to which this resource belongs."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

FlavorResponseModel (FlavorBaseModel, BaseResponseModel) pydantic-model

Response model for stack component flavors.

Source code in zenml/models/flavor_models.py
class FlavorResponseModel(FlavorBaseModel, BaseResponseModel):
    """Response model for stack component flavors."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "id",
        "type",
        "integration",
    ]

    user: Union["UserResponseModel", None] = Field(
        title="The user that created this resource.", nullable=True
    )

    workspace: Optional["WorkspaceResponseModel"] = Field(
        title="The project of this resource."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

FlavorUpdateModel (FlavorRequestModel) pydantic-model

Update model for flavors.

Source code in zenml/models/flavor_models.py
class FlavorUpdateModel(FlavorRequestModel):
    """Update model for flavors."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

hub_plugin_models

Models representing ZenML Hub plugins.

HubPluginBaseModel (BaseModel) pydantic-model

Base model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginBaseModel(BaseModel):
    """Base model for a ZenML Hub plugin."""

    name: str
    description: Optional[str]
    version: Optional[str]
    release_notes: Optional[str]
    repository_url: str
    repository_subdirectory: Optional[str]
    repository_branch: Optional[str]
    repository_commit: Optional[str]
    tags: Optional[List[str]]
    logo_url: Optional[str]

HubPluginRequestModel (HubPluginBaseModel) pydantic-model

Request model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginRequestModel(HubPluginBaseModel):
    """Request model for a ZenML Hub plugin."""

HubPluginResponseModel (HubPluginBaseModel) pydantic-model

Response model for a ZenML Hub plugin.

Source code in zenml/models/hub_plugin_models.py
class HubPluginResponseModel(HubPluginBaseModel):
    """Response model for a ZenML Hub plugin."""

    id: UUID
    status: PluginStatus
    author: str
    version: str
    index_url: Optional[str]
    package_name: Optional[str]
    requirements: Optional[List[str]]
    build_logs: Optional[str]
    created: datetime
    updated: datetime

HubUserResponseModel (BaseModel) pydantic-model

Model for a ZenML Hub user.

Source code in zenml/models/hub_plugin_models.py
class HubUserResponseModel(BaseModel):
    """Model for a ZenML Hub user."""

    id: UUID
    email: str
    username: Optional[str]

PluginStatus (StrEnum)

Enum that represents the status of a plugin.

  • PENDING: Plugin is being built
  • FAILED: Plugin build failed
  • AVAILABLE: Plugin is available for installation
  • YANKED: Plugin was yanked and is no longer available
Source code in zenml/models/hub_plugin_models.py
class PluginStatus(StrEnum):
    """Enum that represents the status of a plugin.

    - PENDING: Plugin is being built
    - FAILED: Plugin build failed
    - AVAILABLE: Plugin is available for installation
    - YANKED: Plugin was yanked and is no longer available
    """

    PENDING = "pending"
    FAILED = "failed"
    AVAILABLE = "available"
    YANKED = "yanked"

logs_models

Models representing logs files.

LogsBaseModel (BaseModel) pydantic-model

Base model for logs.

Source code in zenml/models/logs_models.py
class LogsBaseModel(BaseModel):
    """Base model for logs."""

    uri: str = Field(
        title="The uri of the logs file",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    artifact_store_id: Union[str, UUID] = Field(
        title="The artifact store ID to associate the logs with.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

LogsRequestModel (LogsBaseModel, BaseRequestModel) pydantic-model

Request model for logs.

Source code in zenml/models/logs_models.py
class LogsRequestModel(LogsBaseModel, BaseRequestModel):
    """Request model for logs."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

LogsResponseModel (LogsBaseModel, BaseResponseModel) pydantic-model

Response model for logs.

Source code in zenml/models/logs_models.py
class LogsResponseModel(LogsBaseModel, BaseResponseModel):
    """Response model for logs."""

    step_run_id: Optional[Union[str, UUID]] = Field(
        title="Step ID to associate the logs with.",
        default=None,
        description="When this is set, pipeline_run_id should be set to None.",
    )
    pipeline_run_id: Optional[Union[str, UUID]] = Field(
        title="Pipeline run ID to associate the logs with.",
        default=None,
        description="When this is set, step_run_id should be set to None.",
    )
pipeline_run_id: Union[uuid.UUID, str] pydantic-field

When this is set, step_run_id should be set to None.

step_run_id: Union[uuid.UUID, str] pydantic-field

When this is set, pipeline_run_id should be set to None.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

page_model

Model implementation for easy pagination for Lists of ZenML Domain Models.

The code contained within this file has been inspired by the fastapi-pagination library: https://github.com/uriyyo/fastapi-pagination

Page (GenericModel, Generic) pydantic-model

Return Model for List Models to accommodate pagination.

Source code in zenml/models/page_model.py
class Page(GenericModel, Generic[B]):
    """Return Model for List Models to accommodate pagination."""

    index: PositiveInt
    max_size: PositiveInt
    total_pages: NonNegativeInt
    total: NonNegativeInt
    items: List[B]

    __params_type__ = BaseFilterModel

    @property
    def size(self) -> int:
        """Return the item count of the page.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __len__(self) -> int:
        """Return the item count of the page.

        This enables `len(page)`.

        Returns:
            The amount of items in the page.
        """
        return len(self.items)

    def __getitem__(self, index: int) -> B:
        """Return the item at the given index.

        This enables `page[index]`.

        Args:
            index: The index to get the item from.

        Returns:
            The item at the given index.
        """
        return self.items[index]

    def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
        """Return an iterator over the items in the page.

        This enables `for item in page` loops, but breaks `dict(page)`.

        Yields:
            An iterator over the items in the page.
        """
        for item in self.items.__iter__():
            yield item

    def __contains__(self, item: B) -> bool:
        """Returns whether the page contains a specific item.

        This enables `item in page` checks.

        Args:
            item: The item to check for.

        Returns:
            Whether the item is in the page.
        """
        return item in self.items

    class Config:
        """Pydantic configuration class."""

        # This is needed to allow the REST API server to unpack SecretStr
        # values correctly before sending them to the client.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
size: int property readonly

Return the item count of the page.

Returns:

Type Description
int

The amount of items in the page.

Config

Pydantic configuration class.

Source code in zenml/models/page_model.py
class Config:
    """Pydantic configuration class."""

    # This is needed to allow the REST API server to unpack SecretStr
    # values correctly before sending them to the client.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }
__params_type__ (BaseModel) pydantic-model

Class to unify all filter, paginate and sort request parameters.

This Model allows fine-grained filtering, sorting and pagination of resources.

Usage example for subclasses of this class:

ResourceListModel(
    name="contains:default",
    workspace="default"
    count_steps="gte:5"
    sort_by="created",
    page=2,
    size=50
)
Source code in zenml/models/page_model.py
class BaseFilterModel(BaseModel):
    """Class to unify all filter, paginate and sort request parameters.

    This Model allows fine-grained filtering, sorting and pagination of
    resources.

    Usage example for subclasses of this class:
    ```
    ResourceListModel(
        name="contains:default",
        workspace="default"
        count_steps="gte:5"
        sort_by="created",
        page=2,
        size=50
    )
    ```
    """

    # List of fields that cannot be used as filters.
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        "sort_by",
        "page",
        "size",
        "logical_operator",
    ]

    # List of fields that are not even mentioned as options in the CLI.
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = []

    sort_by: str = Field(
        default="created", description="Which column to sort by."
    )
    logical_operator: LogicalOperators = Field(
        default=LogicalOperators.AND,
        description="Which logical operator to use between all filters "
        "['and', 'or']",
    )
    page: int = Field(
        default=PAGINATION_STARTING_PAGE, ge=1, description="Page number"
    )
    size: int = Field(
        default=PAGE_SIZE_DEFAULT,
        ge=1,
        le=PAGE_SIZE_MAXIMUM,
        description="Page size",
    )

    id: Optional[Union[UUID, str]] = Field(
        default=None, description="Id for this resource"
    )
    created: Optional[Union[datetime, str]] = Field(
        default=None, description="Created"
    )
    updated: Optional[Union[datetime, str]] = Field(
        default=None, description="Updated"
    )

    @validator("sort_by", pre=True)
    def validate_sort_by(cls, v: str) -> str:
        """Validate that the sort_column is a valid column with a valid operand.

        Args:
            v: The sort_by field value.

        Returns:
            The validated sort_by field value.

        Raises:
            ValidationError: If the sort_by field is not a string.
            ValueError: If the resource can't be sorted by this field.
        """
        # Somehow pydantic allows you to pass in int values, which will be
        #  interpreted as string, however within the validator they are still
        #  integers, which don't have a .split() method
        if not isinstance(v, str):
            raise ValidationError(
                f"str type expected for the sort_by field. "
                f"Received a {type(v)}"
            )
        column = v
        split_value = v.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]

            if split_value[0] not in SorterOps.values():
                logger.warning(
                    "Invalid operand used for column sorting. "
                    "Only the following operands are supported `%s`. "
                    "Defaulting to 'asc' on column `%s`.",
                    SorterOps.values(),
                    column,
                )
                v = column

        if column in cls.FILTER_EXCLUDE_FIELDS:
            raise ValueError(
                f"This resource can not be sorted by this field: '{v}'"
            )
        elif column in cls.__fields__:
            return v
        else:
            raise ValueError(
                "You can only sort by valid fields of this resource"
            )

    @root_validator(pre=True)
    def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse incoming filters to ensure all filters are legal.

        Args:
            values: The values of the class.

        Returns:
            The values of the class.
        """
        cls._generate_filter_list(values)
        return values

    @property
    def list_of_filters(self) -> List[Filter]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A list of Filter models.
        """
        return self._generate_filter_list(
            {key: getattr(self, key) for key in self.__fields__}
        )

    @property
    def sorting_params(self) -> Tuple[str, SorterOps]:
        """Converts the class variables into a list of usable Filter Models.

        Returns:
            A tuple of the column to sort by and the sorting operand.
        """
        column = self.sort_by
        # The default sorting operand is asc
        operator = SorterOps.ASCENDING

        # Check if user explicitly set an operand
        split_value = self.sort_by.split(":", 1)
        if len(split_value) == 2:
            column = split_value[1]
            operator = SorterOps(split_value[0])

        return column, operator

    @classmethod
    def _generate_filter_list(cls, values: Dict[str, Any]) -> List[Filter]:
        """Create a list of filters from a (column, value) dictionary.

        Args:
            values: A dictionary of column names and values to filter on.

        Returns:
            A list of filters.
        """
        list_of_filters: List[Filter] = []

        for key, value in values.items():
            # Ignore excluded filters
            if key in cls.FILTER_EXCLUDE_FIELDS:
                continue

            # Skip filtering for None values
            if value is None:
                continue

            # Determine the operator and filter value
            value, operator = cls._resolve_operator(value)

            # Define the filter
            filter = cls._define_filter(
                column=key, value=value, operator=operator
            )
            list_of_filters.append(filter)

        return list_of_filters

    @staticmethod
    def _resolve_operator(value: Any) -> Tuple[Any, GenericFilterOps]:
        """Determine the operator and filter value from a user-provided value.

        If the user-provided value is a string of the form "operator:value",
        then the operator is extracted and the value is returned. Otherwise,
        `GenericFilterOps.EQUALS` is used as default operator and the value
        is returned as-is.

        Args:
            value: The user-provided value.

        Returns:
            A tuple of the filter value and the operator.
        """
        operator = GenericFilterOps.EQUALS  # Default operator
        if isinstance(value, str):
            split_value = value.split(":", 1)
            if (
                len(split_value) == 2
                and split_value[0] in GenericFilterOps.values()
            ):
                value = split_value[1]
                operator = GenericFilterOps(split_value[0])
        return value, operator

    @classmethod
    def _define_filter(
        cls, column: str, value: Any, operator: GenericFilterOps
    ) -> Filter:
        """Define a filter for a given column.

        Args:
            column: The column to filter on.
            value: The value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        # Create datetime filters
        if cls.is_datetime_field(column):
            return cls._define_datetime_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create UUID filters
        if cls.is_uuid_field(column):
            return cls._define_uuid_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create int filters
        if cls.is_int_field(column):
            return NumericFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=int(value),
            )

        # Create bool filters
        if cls.is_bool_field(column):
            return cls._define_bool_filter(
                column=column,
                value=value,
                operator=operator,
            )

        # Create str filters
        if cls.is_str_field(column):
            return StrFilter(
                operation=GenericFilterOps(operator),
                column=column,
                value=value,
            )

        # Handle unsupported datatypes
        logger.warning(
            f"The Datatype {cls.__fields__[column].type_} might not be "
            "supported for filtering. Defaulting to a string filter."
        )
        return StrFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=str(value),
        )

    @classmethod
    def is_datetime_field(cls, k: str) -> bool:
        """Checks if it's a datetime field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a datetime field, False otherwise.
        """
        return (
            issubclass(datetime, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is datetime
        )

    @classmethod
    def is_uuid_field(cls, k: str) -> bool:
        """Checks if it's a uuid field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a uuid field, False otherwise.
        """
        return (
            issubclass(UUID, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is UUID
        )

    @classmethod
    def is_int_field(cls, k: str) -> bool:
        """Checks if it's a int field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a int field, False otherwise.
        """
        return (
            issubclass(int, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is int
        )

    @classmethod
    def is_bool_field(cls, k: str) -> bool:
        """Checks if it's a bool field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a bool field, False otherwise.
        """
        return (
            issubclass(bool, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is bool
        )

    @classmethod
    def is_str_field(cls, k: str) -> bool:
        """Checks if it's a string field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a string field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ is str
        )

    @classmethod
    def is_sort_by_field(cls, k: str) -> bool:
        """Checks if it's a sort by field.

        Args:
            k: The key to check.

        Returns:
            True if the field is a sort by field, False otherwise.
        """
        return (
            issubclass(str, get_args(cls.__fields__[k].type_))
            or cls.__fields__[k].type_ == str
        ) and k == "sort_by"

    @staticmethod
    def _define_datetime_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> NumericFilter:
        """Define a datetime filter for a given column.

        Args:
            column: The column to filter on.
            value: The datetime value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid datetime.
        """
        try:
            if isinstance(value, datetime):
                datetime_value = value
            else:
                datetime_value = datetime.strptime(
                    value, FILTERING_DATETIME_FORMAT
                )
        except ValueError as e:
            raise ValueError(
                "The datetime filter only works with values in the following "
                f"format: {FILTERING_DATETIME_FORMAT}"
            ) from e
        datetime_filter = NumericFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=datetime_value,
        )
        return datetime_filter

    @staticmethod
    def _define_uuid_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> UUIDFilter:
        """Define a UUID filter for a given column.

        Args:
            column: The column to filter on.
            value: The UUID value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.

        Raises:
            ValueError: If the value is not a valid UUID.
        """
        # For equality checks, ensure that the value is a valid UUID.
        if operator == GenericFilterOps.EQUALS and not isinstance(value, UUID):
            try:
                UUID(value)
            except ValueError as e:
                raise ValueError(
                    "Invalid value passed as UUID query parameter."
                ) from e

        # Cast the value to string for further comparisons.
        value = str(value)

        # Generate the filter.
        uuid_filter = UUIDFilter(
            operation=GenericFilterOps(operator),
            column=column,
            value=value,
        )
        return uuid_filter

    @staticmethod
    def _define_bool_filter(
        column: str, value: Any, operator: GenericFilterOps
    ) -> BoolFilter:
        """Define a bool filter for a given column.

        Args:
            column: The column to filter on.
            value: The bool value by which to filter.
            operator: The operator to use for filtering.

        Returns:
            A Filter object.
        """
        if GenericFilterOps(operator) != GenericFilterOps.EQUALS:
            logger.warning(
                "Boolean filters do not support any"
                "operation except for equals. Defaulting"
                "to an `equals` comparison."
            )
        return BoolFilter(
            operation=GenericFilterOps.EQUALS,
            column=column,
            value=bool(value),
        )

    @property
    def offset(self) -> int:
        """Returns the offset needed for the query on the data persistence layer.

        Returns:
            The offset for the query.
        """
        return self.size * (self.page - 1)

    def generate_filter(
        self, table: Type[SQLModel]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.

        Raises:
            RuntimeError: If a valid logical operator is not supplied.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        filters = []
        for column_filter in self.list_of_filters:
            filters.append(
                column_filter.generate_query_conditions(table=table)
            )
        if self.logical_operator == LogicalOperators.OR:
            return or_(False, *filters)
        elif self.logical_operator == LogicalOperators.AND:
            return and_(True, *filters)
        else:
            raise RuntimeError("No valid logical operator was supplied.")

    def apply_filter(
        self,
        query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
        table: Type["AnySchema"],
    ) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
        """Applies the filter to a query.

        Args:
            query: The query to which to apply the filter.
            table: The query table.

        Returns:
            The query with filter applied.
        """
        filters = self.generate_filter(table=table)

        if filters is not None:
            query = query.where(filters)

        return query
created: Union[datetime.datetime, str] pydantic-field

Created

id: Union[uuid.UUID, str] pydantic-field

Id for this resource

list_of_filters: List[zenml.models.filter_models.Filter] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
List[zenml.models.filter_models.Filter]

A list of Filter models.

logical_operator: LogicalOperators pydantic-field

Which logical operator to use between all filters ['and', 'or']

offset: int property readonly

Returns the offset needed for the query on the data persistence layer.

Returns:

Type Description
int

The offset for the query.

page: ConstrainedIntValue pydantic-field

Page number

size: ConstrainedIntValue pydantic-field

Page size

sort_by: str pydantic-field

Which column to sort by.

sorting_params: Tuple[str, zenml.enums.SorterOps] property readonly

Converts the class variables into a list of usable Filter Models.

Returns:

Type Description
Tuple[str, zenml.enums.SorterOps]

A tuple of the column to sort by and the sorting operand.

updated: Union[datetime.datetime, str] pydantic-field

Updated

apply_filter(self, query, table)

Applies the filter to a query.

Parameters:

Name Type Description Default
query Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query to which to apply the filter.

required
table Type[AnySchema]

The query table.

required

Returns:

Type Description
Union[Select[AnySchema], SelectOfScalar[AnySchema]]

The query with filter applied.

Source code in zenml/models/page_model.py
def apply_filter(
    self,
    query: Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"],
    table: Type["AnySchema"],
) -> Union["Select[AnySchema]", "SelectOfScalar[AnySchema]"]:
    """Applies the filter to a query.

    Args:
        query: The query to which to apply the filter.
        table: The query table.

    Returns:
        The query with filter applied.
    """
    filters = self.generate_filter(table=table)

    if filters is not None:
        query = query.where(filters)

    return query
filter_ops(values) classmethod

Parse incoming filters to ensure all filters are legal.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the class.

required

Returns:

Type Description
Dict[str, Any]

The values of the class.

Source code in zenml/models/page_model.py
@root_validator(pre=True)
def filter_ops(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse incoming filters to ensure all filters are legal.

    Args:
        values: The values of the class.

    Returns:
        The values of the class.
    """
    cls._generate_filter_list(values)
    return values
generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[sqlmodel.main.SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Exceptions:

Type Description
RuntimeError

If a valid logical operator is not supplied.

Source code in zenml/models/page_model.py
def generate_filter(
    self, table: Type[SQLModel]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.

    Raises:
        RuntimeError: If a valid logical operator is not supplied.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    filters = []
    for column_filter in self.list_of_filters:
        filters.append(
            column_filter.generate_query_conditions(table=table)
        )
    if self.logical_operator == LogicalOperators.OR:
        return or_(False, *filters)
    elif self.logical_operator == LogicalOperators.AND:
        return and_(True, *filters)
    else:
        raise RuntimeError("No valid logical operator was supplied.")
is_bool_field(k) classmethod

Checks if it's a bool field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a bool field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_bool_field(cls, k: str) -> bool:
    """Checks if it's a bool field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a bool field, False otherwise.
    """
    return (
        issubclass(bool, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is bool
    )
is_datetime_field(k) classmethod

Checks if it's a datetime field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a datetime field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_datetime_field(cls, k: str) -> bool:
    """Checks if it's a datetime field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a datetime field, False otherwise.
    """
    return (
        issubclass(datetime, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is datetime
    )
is_int_field(k) classmethod

Checks if it's a int field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a int field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_int_field(cls, k: str) -> bool:
    """Checks if it's a int field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a int field, False otherwise.
    """
    return (
        issubclass(int, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is int
    )
is_sort_by_field(k) classmethod

Checks if it's a sort by field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a sort by field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_sort_by_field(cls, k: str) -> bool:
    """Checks if it's a sort by field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a sort by field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ == str
    ) and k == "sort_by"
is_str_field(k) classmethod

Checks if it's a string field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a string field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_str_field(cls, k: str) -> bool:
    """Checks if it's a string field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a string field, False otherwise.
    """
    return (
        issubclass(str, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is str
    )
is_uuid_field(k) classmethod

Checks if it's a uuid field.

Parameters:

Name Type Description Default
k str

The key to check.

required

Returns:

Type Description
bool

True if the field is a uuid field, False otherwise.

Source code in zenml/models/page_model.py
@classmethod
def is_uuid_field(cls, k: str) -> bool:
    """Checks if it's a uuid field.

    Args:
        k: The key to check.

    Returns:
        True if the field is a uuid field, False otherwise.
    """
    return (
        issubclass(UUID, get_args(cls.__fields__[k].type_))
        or cls.__fields__[k].type_ is UUID
    )
validate_sort_by(v) classmethod

Validate that the sort_column is a valid column with a valid operand.

Parameters:

Name Type Description Default
v str

The sort_by field value.

required

Returns:

Type Description
str

The validated sort_by field value.

Exceptions:

Type Description
ValidationError

If the sort_by field is not a string.

ValueError

If the resource can't be sorted by this field.

Source code in zenml/models/page_model.py
@validator("sort_by", pre=True)
def validate_sort_by(cls, v: str) -> str:
    """Validate that the sort_column is a valid column with a valid operand.

    Args:
        v: The sort_by field value.

    Returns:
        The validated sort_by field value.

    Raises:
        ValidationError: If the sort_by field is not a string.
        ValueError: If the resource can't be sorted by this field.
    """
    # Somehow pydantic allows you to pass in int values, which will be
    #  interpreted as string, however within the validator they are still
    #  integers, which don't have a .split() method
    if not isinstance(v, str):
        raise ValidationError(
            f"str type expected for the sort_by field. "
            f"Received a {type(v)}"
        )
    column = v
    split_value = v.split(":", 1)
    if len(split_value) == 2:
        column = split_value[1]

        if split_value[0] not in SorterOps.values():
            logger.warning(
                "Invalid operand used for column sorting. "
                "Only the following operands are supported `%s`. "
                "Defaulting to 'asc' on column `%s`.",
                SorterOps.values(),
                column,
            )
            v = column

    if column in cls.FILTER_EXCLUDE_FIELDS:
        raise ValueError(
            f"This resource can not be sorted by this field: '{v}'"
        )
    elif column in cls.__fields__:
        return v
    else:
        raise ValueError(
            "You can only sort by valid fields of this resource"
        )
__contains__(self, item) special

Returns whether the page contains a specific item.

This enables item in page checks.

Parameters:

Name Type Description Default
item ~B

The item to check for.

required

Returns:

Type Description
bool

Whether the item is in the page.

Source code in zenml/models/page_model.py
def __contains__(self, item: B) -> bool:
    """Returns whether the page contains a specific item.

    This enables `item in page` checks.

    Args:
        item: The item to check for.

    Returns:
        Whether the item is in the page.
    """
    return item in self.items
__getitem__(self, index) special

Return the item at the given index.

This enables page[index].

Parameters:

Name Type Description Default
index int

The index to get the item from.

required

Returns:

Type Description
~B

The item at the given index.

Source code in zenml/models/page_model.py
def __getitem__(self, index: int) -> B:
    """Return the item at the given index.

    This enables `page[index]`.

    Args:
        index: The index to get the item from.

    Returns:
        The item at the given index.
    """
    return self.items[index]
__iter__(self) special

Return an iterator over the items in the page.

This enables for item in page loops, but breaks dict(page).

Yields:

Type Description
Generator[~B, NoneType, NoneType]

An iterator over the items in the page.

Source code in zenml/models/page_model.py
def __iter__(self) -> Generator[B, None, None]:  # type: ignore[override]
    """Return an iterator over the items in the page.

    This enables `for item in page` loops, but breaks `dict(page)`.

    Yields:
        An iterator over the items in the page.
    """
    for item in self.items.__iter__():
        yield item
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

__len__(self) special

Return the item count of the page.

This enables len(page).

Returns:

Type Description
int

The amount of items in the page.

Source code in zenml/models/page_model.py
def __len__(self) -> int:
    """Return the item count of the page.

    This enables `len(page)`.

    Returns:
        The amount of items in the page.
    """
    return len(self.items)

pipeline_build_models

Models representing pipeline builds.

BuildItem (BaseModel) pydantic-model

Pipeline build item.

Attributes:

Name Type Description
image str

The image name or digest.

dockerfile Optional[str]

The contents of the Dockerfile used to build the image.

requirements Optional[str]

The pip requirements installed in the image. This is a string consisting of multiple concatenated requirements.txt files.

settings_checksum Optional[str]

Checksum of the settings used for the build.

contains_code bool

Whether the image contains user files.

requires_code_download bool

Whether the image needs to download files.

Source code in zenml/models/pipeline_build_models.py
class BuildItem(BaseModel):
    """Pipeline build item.

    Attributes:
        image: The image name or digest.
        dockerfile: The contents of the Dockerfile used to build the image.
        requirements: The pip requirements installed in the image. This is a
            string consisting of multiple concatenated requirements.txt files.
        settings_checksum: Checksum of the settings used for the build.
        contains_code: Whether the image contains user files.
        requires_code_download: Whether the image needs to download files.
    """

    image: str = Field(title="The image name or digest.")
    dockerfile: Optional[str] = Field(
        title="The dockerfile used to build the image."
    )
    requirements: Optional[str] = Field(
        title="The pip requirements installed in the image."
    )
    settings_checksum: Optional[str] = Field(
        title="The checksum of the build settings."
    )
    contains_code: bool = Field(
        default=True, title="Whether the image contains user files."
    )
    requires_code_download: bool = Field(
        default=False, title="Whether the image needs to download files."
    )

PipelineBuildBaseModel (YAMLSerializationMixin) pydantic-model

Base model for pipeline builds.

Attributes:

Name Type Description
images Dict[str, zenml.models.pipeline_build_models.BuildItem]

Docker images of this build.

is_local bool

Whether the images are stored locally or in a container registry.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildBaseModel(pydantic_utils.YAMLSerializationMixin):
    """Base model for pipeline builds.

    Attributes:
        images: Docker images of this build.
        is_local: Whether the images are stored locally or in a container
            registry.
    """

    images: Dict[str, BuildItem] = Field(
        default={}, title="The images of this build."
    )
    is_local: bool = Field(
        title="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: bool = Field(
        title="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        title="The version of ZenML used for this build."
    )
    python_version: Optional[str] = Field(
        title="The Python version used for this build."
    )
    checksum: Optional[str] = Field(title="The build checksum.")

    @property
    def requires_code_download(self) -> bool:
        """Whether the build requires code download.

        Returns:
            Whether the build requires code download.
        """
        return any(
            item.requires_code_download for item in self.images.values()
        )

    @staticmethod
    def get_image_key(component_key: str, step: Optional[str] = None) -> str:
        """Get the image key.

        Args:
            component_key: The component key.
            step: The pipeline step for which the image was built.

        Returns:
            The image key.
        """
        if step:
            return f"{step}.{component_key}"
        else:
            return component_key

    def get_image(self, component_key: str, step: Optional[str] = None) -> str:
        """Get the image built for a specific key.

        Args:
            component_key: The key for which to get the image.
            step: The pipeline step for which to get the image. If no image
                exists for this step, will fallback to the pipeline image for
                the same key.

        Returns:
            The image name or digest.
        """
        return self._get_item(component_key=component_key, step=step).image

    def get_settings_checksum(
        self, component_key: str, step: Optional[str] = None
    ) -> Optional[str]:
        """Get the settings checksum for a specific key.

        Args:
            component_key: The key for which to get the checksum.
            step: The pipeline step for which to get the checksum. If no
                image exists for this step, will fallback to the pipeline image
                for the same key.

        Returns:
            The settings checksum.
        """
        return self._get_item(
            component_key=component_key, step=step
        ).settings_checksum

    def _get_item(
        self, component_key: str, step: Optional[str] = None
    ) -> BuildItem:
        """Get the item for a specific key.

        Args:
            component_key: The key for which to get the item.
            step: The pipeline step for which to get the item. If no item
                exists for this step, will fallback to the item for
                the same key.

        Raises:
            KeyError: If no item exists for the given key.

        Returns:
            The build item.
        """
        if step:
            try:
                combined_key = self.get_image_key(
                    component_key=component_key, step=step
                )
                return self.images[combined_key]
            except KeyError:
                pass

        try:
            return self.images[component_key]
        except KeyError:
            raise KeyError(
                f"Unable to find image for key {component_key}. Available keys: "
                f"{set(self.images)}."
            )
requires_code_download: bool property readonly

Whether the build requires code download.

Returns:

Type Description
bool

Whether the build requires code download.

get_image(self, component_key, step=None)

Get the image built for a specific key.

Parameters:

Name Type Description Default
component_key str

The key for which to get the image.

required
step Optional[str]

The pipeline step for which to get the image. If no image exists for this step, will fallback to the pipeline image for the same key.

None

Returns:

Type Description
str

The image name or digest.

Source code in zenml/models/pipeline_build_models.py
def get_image(self, component_key: str, step: Optional[str] = None) -> str:
    """Get the image built for a specific key.

    Args:
        component_key: The key for which to get the image.
        step: The pipeline step for which to get the image. If no image
            exists for this step, will fallback to the pipeline image for
            the same key.

    Returns:
        The image name or digest.
    """
    return self._get_item(component_key=component_key, step=step).image
get_image_key(component_key, step=None) staticmethod

Get the image key.

Parameters:

Name Type Description Default
component_key str

The component key.

required
step Optional[str]

The pipeline step for which the image was built.

None

Returns:

Type Description
str

The image key.

Source code in zenml/models/pipeline_build_models.py
@staticmethod
def get_image_key(component_key: str, step: Optional[str] = None) -> str:
    """Get the image key.

    Args:
        component_key: The component key.
        step: The pipeline step for which the image was built.

    Returns:
        The image key.
    """
    if step:
        return f"{step}.{component_key}"
    else:
        return component_key
get_settings_checksum(self, component_key, step=None)

Get the settings checksum for a specific key.

Parameters:

Name Type Description Default
component_key str

The key for which to get the checksum.

required
step Optional[str]

The pipeline step for which to get the checksum. If no image exists for this step, will fallback to the pipeline image for the same key.

None

Returns:

Type Description
Optional[str]

The settings checksum.

Source code in zenml/models/pipeline_build_models.py
def get_settings_checksum(
    self, component_key: str, step: Optional[str] = None
) -> Optional[str]:
    """Get the settings checksum for a specific key.

    Args:
        component_key: The key for which to get the checksum.
        step: The pipeline step for which to get the checksum. If no
            image exists for this step, will fallback to the pipeline image
            for the same key.

    Returns:
        The settings checksum.
    """
    return self._get_item(
        component_key=component_key, step=step
    ).settings_checksum

PipelineBuildFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipeline builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline builds."""

    workspace_id: Union[UUID, str, None] = Field(
        description="Workspace for this pipeline build."
    )
    user_id: Union[UUID, str, None] = Field(
        description="User that produced this pipeline build."
    )
    pipeline_id: Union[UUID, str, None] = Field(
        description="Pipeline associated with the pipeline build.",
    )
    stack_id: Union[UUID, str, None] = Field(
        description="Stack used for the Pipeline Run"
    )
    is_local: Optional[bool] = Field(
        description="Whether the build images are stored in a container registry or locally.",
    )
    contains_code: Optional[bool] = Field(
        description="Whether any image of the build contains user code.",
    )
    zenml_version: Optional[str] = Field(
        description="The version of ZenML used for this build."
    )
    python_version: Optional[str] = Field(
        description="The Python version used for this build."
    )
    checksum: Optional[str] = Field(description="The build checksum.")
checksum: str pydantic-field

The build checksum.

contains_code: bool pydantic-field

Whether any image of the build contains user code.

is_local: bool pydantic-field

Whether the build images are stored in a container registry or locally.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the pipeline build.

python_version: str pydantic-field

The Python version used for this build.

stack_id: Union[uuid.UUID, str] pydantic-field

Stack used for the Pipeline Run

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this pipeline build.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this pipeline build.

zenml_version: str pydantic-field

The version of ZenML used for this build.

PipelineBuildRequestModel (PipelineBuildBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for pipelines builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildRequestModel(
    PipelineBuildBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipelines builds."""

    stack: Optional[UUID] = Field(
        title="The stack that was used for this build."
    )
    pipeline: Optional[UUID] = Field(
        title="The pipeline that was used for this build."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

PipelineBuildResponseModel (PipelineBuildBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for pipeline builds.

Source code in zenml/models/pipeline_build_models.py
class PipelineBuildResponseModel(
    PipelineBuildBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline builds."""

    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline that was used for this build."
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack that was used for this build."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

pipeline_deployment_models

Models representing pipeline deployments.

PipelineDeploymentBaseModel (BaseModel) pydantic-model

Base model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentBaseModel(BaseModel):
    """Base model for pipeline deployments."""

    run_name_template: str = Field(
        title="The run name template for runs created using this deployment.",
    )
    pipeline_configuration: PipelineConfiguration = Field(
        title="The pipeline configuration for this deployment."
    )
    step_configurations: Dict[str, Step] = Field(
        default={}, title="The step configurations for this deployment."
    )
    client_environment: Dict[str, str] = Field(
        default={}, title="The client environment for this deployment."
    )

    @property
    def requires_included_files(self) -> bool:
        """Whether the deployment requires included files.

        Returns:
            Whether the deployment requires included files.
        """
        return any(
            step.config.docker_settings.source_files == SourceFileMode.INCLUDE
            for step in self.step_configurations.values()
        )

    @property
    def requires_code_download(self) -> bool:
        """Whether the deployment requires downloading some code files.

        Returns:
            Whether the deployment requires downloading some code files.
        """
        return any(
            step.config.docker_settings.source_files == SourceFileMode.DOWNLOAD
            for step in self.step_configurations.values()
        )
requires_code_download: bool property readonly

Whether the deployment requires downloading some code files.

Returns:

Type Description
bool

Whether the deployment requires downloading some code files.

requires_included_files: bool property readonly

Whether the deployment requires included files.

Returns:

Type Description
bool

Whether the deployment requires included files.

PipelineDeploymentFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all pipeline deployments."""

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace for this deployment."
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created this deployment."
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline associated with the deployment."
    )
    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack associated with the deployment."
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Build associated with the deployment."
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Schedule associated with the deployment."
    )
build_id: Union[uuid.UUID, str] pydantic-field

Build associated with the deployment.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the deployment.

schedule_id: Union[uuid.UUID, str] pydantic-field

Schedule associated with the deployment.

stack_id: Union[uuid.UUID, str] pydantic-field

Stack associated with the deployment.

user_id: Union[uuid.UUID, str] pydantic-field

User that created this deployment.

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace for this deployment.

PipelineDeploymentRequestModel (PipelineDeploymentBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentRequestModel(
    PipelineDeploymentBaseModel, WorkspaceScopedRequestModel
):
    """Request model for pipeline deployments."""

    stack: UUID = Field(title="The stack associated with the deployment.")
    pipeline: Optional[UUID] = Field(
        title="The pipeline associated with the deployment."
    )
    build: Optional[UUID] = Field(
        title="The build associated with the deployment."
    )
    schedule: Optional[UUID] = Field(
        title="The schedule associated with the deployment."
    )
    code_reference: Optional["CodeReferenceRequestModel"] = Field(
        title="The code reference associated with the deployment."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

PipelineDeploymentResponseModel (PipelineDeploymentBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for pipeline deployments.

Source code in zenml/models/pipeline_deployment_models.py
class PipelineDeploymentResponseModel(
    PipelineDeploymentBaseModel, WorkspaceScopedResponseModel
):
    """Response model for pipeline deployments."""

    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline associated with the deployment."
    )
    stack: Optional["StackResponseModel"] = Field(
        title="The stack associated with the deployment."
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        title="The pipeline build associated with the deployment."
    )
    schedule: Optional["ScheduleResponseModel"] = Field(
        title="The schedule associated with the deployment."
    )
    code_reference: Optional["CodeReferenceResponseModel"] = Field(
        title="The code reference associated with the deployment."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

pipeline_models

Models representing pipelines.

PipelineBaseModel (BaseModel) pydantic-model

Base model for pipelines.

Source code in zenml/models/pipeline_models.py
class PipelineBaseModel(BaseModel):
    """Base model for pipelines."""

    name: str = Field(
        title="The name of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    version: str = Field(
        title="The version of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    version_hash: str = Field(
        title="The version hash of the pipeline.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    docstring: Optional[str] = Field(
        title="The docstring of the pipeline.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    spec: PipelineSpec = Field(title="The spec of the pipeline.")

PipelineFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Workspaces.

Source code in zenml/models/pipeline_models.py
class PipelineFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Workspaces."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline",
    )
    version: Optional[str] = Field(
        default=None,
        description="Version of the Pipeline",
    )
    version_hash: Optional[str] = Field(
        default=None,
        description="Version hash of the Pipeline",
    )
    docstring: Optional[str] = Field(
        default=None,
        description="Docstring of the Pipeline",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Pipeline"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the Pipeline"
    )
docstring: str pydantic-field

Docstring of the Pipeline

name: str pydantic-field

Name of the Pipeline

user_id: Union[uuid.UUID, str] pydantic-field

User of the Pipeline

version: str pydantic-field

Version of the Pipeline

version_hash: str pydantic-field

Version hash of the Pipeline

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Pipeline

PipelineRequestModel (PipelineBaseModel, WorkspaceScopedRequestModel) pydantic-model

Pipeline request model.

Source code in zenml/models/pipeline_models.py
class PipelineRequestModel(PipelineBaseModel, WorkspaceScopedRequestModel):
    """Pipeline request model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

PipelineResponseModel (PipelineBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline response model user, workspace, runs, and status hydrated.

Source code in zenml/models/pipeline_models.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
    """Pipeline response model user, workspace, runs, and status hydrated."""

    status: Optional[List[ExecutionStatus]] = Field(
        default=None, title="The status of the last 3 Pipeline Runs."
    )

    def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
        """Get runs of this pipeline.

        Can be used to fetch runs other than `self.runs` and supports
        fine-grained filtering and pagination.

        Args:
            **kwargs: Further arguments for filtering or pagination that are
                passed to `client.list_pipeline_runs()`.

        Returns:
            List of runs of this pipeline.
        """
        from zenml.client import Client

        return Client().list_pipeline_runs(pipeline_id=self.id, **kwargs).items

    @property
    def runs(self) -> List["PipelineRunResponseModel"]:
        """Returns the 50 most recent runs of this pipeline in descending order.

        Returns:
            The 50 most recent runs of this pipeline in descending order.
        """
        return self.get_runs()

    @property
    def num_runs(self) -> int:
        """Returns the number of runs of this pipeline.

        Returns:
            The number of runs of this pipeline.
        """
        from zenml.client import Client

        return Client().list_pipeline_runs(pipeline_id=self.id, size=1).total

    @property
    def last_run(self) -> "PipelineRunResponseModel":
        """Returns the last run of this pipeline.

        Returns:
            The last run of this pipeline.

        Raises:
            RuntimeError: If no runs were found for this pipeline.
        """
        runs = self.get_runs(size=1)
        if not runs:
            raise RuntimeError(
                f"No runs found for pipeline '{self.name}' with id {self.id}."
            )
        return runs[0]

    @property
    def last_successful_run(self) -> "PipelineRunResponseModel":
        """Returns the last successful run of this pipeline.

        Returns:
            The last successful run of this pipeline.

        Raises:
            RuntimeError: If no successful runs were found for this pipeline.
        """
        runs = self.get_runs(status=ExecutionStatus.COMPLETED, size=1)
        if not runs:
            raise RuntimeError(
                f"No successful runs found for pipeline '{self.name}' with id "
                f"{self.id}."
            )
        return runs[0]
last_run: PipelineRunResponseModel property readonly

Returns the last run of this pipeline.

Returns:

Type Description
PipelineRunResponseModel

The last run of this pipeline.

Exceptions:

Type Description
RuntimeError

If no runs were found for this pipeline.

last_successful_run: PipelineRunResponseModel property readonly

Returns the last successful run of this pipeline.

Returns:

Type Description
PipelineRunResponseModel

The last successful run of this pipeline.

Exceptions:

Type Description
RuntimeError

If no successful runs were found for this pipeline.

num_runs: int property readonly

Returns the number of runs of this pipeline.

Returns:

Type Description
int

The number of runs of this pipeline.

runs: List[PipelineRunResponseModel] property readonly

Returns the 50 most recent runs of this pipeline in descending order.

Returns:

Type Description
List[PipelineRunResponseModel]

The 50 most recent runs of this pipeline in descending order.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_runs(self, **kwargs)

Get runs of this pipeline.

Can be used to fetch runs other than self.runs and supports fine-grained filtering and pagination.

Parameters:

Name Type Description Default
**kwargs Any

Further arguments for filtering or pagination that are passed to client.list_pipeline_runs().

{}

Returns:

Type Description
List[PipelineRunResponseModel]

List of runs of this pipeline.

Source code in zenml/models/pipeline_models.py
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponseModel"]:
    """Get runs of this pipeline.

    Can be used to fetch runs other than `self.runs` and supports
    fine-grained filtering and pagination.

    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.

    Returns:
        List of runs of this pipeline.
    """
    from zenml.client import Client

    return Client().list_pipeline_runs(pipeline_id=self.id, **kwargs).items

PipelineUpdateModel (PipelineRequestModel) pydantic-model

Pipeline update model.

Source code in zenml/models/pipeline_models.py
class PipelineUpdateModel(PipelineRequestModel):
    """Pipeline update model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

pipeline_run_models

Models representing pipeline runs.

PipelineRunBaseModel (BaseModel) pydantic-model

Base model for pipeline runs.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunBaseModel(BaseModel):
    """Base model for pipeline runs."""

    name: str = Field(
        title="The name of the pipeline run.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    orchestrator_run_id: Optional[str] = Field(
        title="The orchestrator run ID.",
        max_length=STR_FIELD_MAX_LENGTH,
        default=None,
    )
    schedule_id: Optional[UUID] = Field(
        title="The ID of the schedule that triggered this pipeline run.",
        default=None,
    )
    enable_cache: Optional[bool] = Field(
        title="Whether to enable caching for this pipeline run.",
        default=None,
    )
    start_time: Optional[datetime] = Field(
        title="The start time of the pipeline run.",
        default=None,
    )
    end_time: Optional[datetime] = Field(
        title="The end time of the pipeline run.",
        default=None,
    )
    status: ExecutionStatus = Field(
        title="The status of the pipeline run.",
    )
    config: PipelineConfiguration = Field(
        title="The pipeline configuration used for this pipeline run.",
    )
    num_steps: Optional[int] = Field(
        title="The number of steps in this pipeline run.",
        default=None,
    )
    client_version: Optional[str] = Field(
        title="Client version.",
        default=current_zenml_version,
        max_length=STR_FIELD_MAX_LENGTH,
    )
    server_version: Optional[str] = Field(
        title="Server version.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    client_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the client that initiated this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )
    orchestrator_environment: Dict[str, str] = Field(
        default={},
        title=(
            "Environment of the orchestrator that executed this pipeline run "
            "(OS, Python version, etc.)."
        ),
    )

PipelineRunFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Workspaces.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Workspaces."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "unlisted",
        "code_repository_id",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run",
    )
    orchestrator_run_id: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run within the orchestrator",
    )

    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline associated with the Pipeline Run"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Pipeline Run"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the Pipeline Run"
    )

    stack_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Stack used for the Pipeline Run"
    )
    schedule_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Schedule that triggered the Pipeline Run"
    )
    build_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Build used for the Pipeline Run"
    )
    deployment_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Deployment used for the Pipeline Run"
    )
    code_repository_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Code repository used for the Pipeline Run"
    )

    status: Optional[str] = Field(
        default=None,
        description="Name of the Pipeline Run",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time for this run"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time for this run"
    )

    num_steps: Optional[int] = Field(
        default=None,
        description="Amount of steps in the Pipeline Run",
    )

    unlisted: Optional[bool] = None

    def generate_filter(
        self, table: Type["SQLModel"]
    ) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
        """Generate the filter for the query.

        Args:
            table: The Table that is being queried from.

        Returns:
            The filter expression for the query.
        """
        from sqlalchemy import and_
        from sqlmodel import or_

        base_filter = super().generate_filter(table)

        operator = (
            or_ if self.logical_operator == LogicalOperators.OR else and_
        )

        if self.unlisted is not None:
            if self.unlisted is True:
                unlisted_filter = getattr(table, "pipeline_id").is_(None)
            else:
                unlisted_filter = getattr(table, "pipeline_id").is_not(None)

            base_filter = operator(base_filter, unlisted_filter)

        if self.code_repository_id:
            from zenml.zen_stores.schemas import (
                CodeReferenceSchema,
                PipelineDeploymentSchema,
                PipelineRunSchema,
            )

            code_repo_filter = and_(  # type: ignore[type-var]
                PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
                PipelineDeploymentSchema.code_reference_id
                == CodeReferenceSchema.id,
                CodeReferenceSchema.code_repository_id
                == self.code_repository_id,
            )

            base_filter = operator(base_filter, code_repo_filter)

        return base_filter
build_id: Union[uuid.UUID, str] pydantic-field

Build used for the Pipeline Run

code_repository_id: Union[uuid.UUID, str] pydantic-field

Code repository used for the Pipeline Run

deployment_id: Union[uuid.UUID, str] pydantic-field

Deployment used for the Pipeline Run

end_time: Union[datetime.datetime, str] pydantic-field

End time for this run

name: str pydantic-field

Name of the Pipeline Run

num_steps: int pydantic-field

Amount of steps in the Pipeline Run

orchestrator_run_id: str pydantic-field

Name of the Pipeline Run within the orchestrator

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline associated with the Pipeline Run

schedule_id: Union[uuid.UUID, str] pydantic-field

Schedule that triggered the Pipeline Run

stack_id: Union[uuid.UUID, str] pydantic-field

Stack used for the Pipeline Run

start_time: Union[datetime.datetime, str] pydantic-field

Start time for this run

status: str pydantic-field

Name of the Pipeline Run

user_id: Union[uuid.UUID, str] pydantic-field

User that created the Pipeline Run

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Pipeline Run

generate_filter(self, table)

Generate the filter for the query.

Parameters:

Name Type Description Default
table Type[SQLModel]

The Table that is being queried from.

required

Returns:

Type Description
Union[BinaryExpression[Any], BooleanClauseList[Any]]

The filter expression for the query.

Source code in zenml/models/pipeline_run_models.py
def generate_filter(
    self, table: Type["SQLModel"]
) -> Union["BinaryExpression[Any]", "BooleanClauseList[Any]"]:
    """Generate the filter for the query.

    Args:
        table: The Table that is being queried from.

    Returns:
        The filter expression for the query.
    """
    from sqlalchemy import and_
    from sqlmodel import or_

    base_filter = super().generate_filter(table)

    operator = (
        or_ if self.logical_operator == LogicalOperators.OR else and_
    )

    if self.unlisted is not None:
        if self.unlisted is True:
            unlisted_filter = getattr(table, "pipeline_id").is_(None)
        else:
            unlisted_filter = getattr(table, "pipeline_id").is_not(None)

        base_filter = operator(base_filter, unlisted_filter)

    if self.code_repository_id:
        from zenml.zen_stores.schemas import (
            CodeReferenceSchema,
            PipelineDeploymentSchema,
            PipelineRunSchema,
        )

        code_repo_filter = and_(  # type: ignore[type-var]
            PipelineRunSchema.deployment_id == PipelineDeploymentSchema.id,
            PipelineDeploymentSchema.code_reference_id
            == CodeReferenceSchema.id,
            CodeReferenceSchema.code_repository_id
            == self.code_repository_id,
        )

        base_filter = operator(base_filter, code_repo_filter)

    return base_filter

PipelineRunRequestModel (PipelineRunBaseModel, WorkspaceScopedRequestModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack as UUIDs.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunRequestModel(
    PipelineRunBaseModel, WorkspaceScopedRequestModel
):
    """Pipeline run model with user, workspace, pipeline, and stack as UUIDs."""

    id: UUID
    stack: Optional[UUID]  # Might become None if the stack is deleted.
    pipeline: Optional[UUID]  # Unlisted runs have this as None.
    build: Optional[UUID]
    deployment: Optional[UUID]
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

PipelineRunResponseModel (PipelineRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack hydrated.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunResponseModel(
    PipelineRunBaseModel, WorkspaceScopedResponseModel
):
    """Pipeline run model with user, workspace, pipeline, and stack hydrated."""

    pipeline: Optional["PipelineResponseModel"] = Field(
        default=None, title="The pipeline this run belongs to."
    )
    stack: Optional["StackResponseModel"] = Field(
        default=None, title="The stack that was used for this run."
    )
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this pipeline run.",
    )
    build: Optional["PipelineBuildResponseModel"] = Field(
        default=None, title="The pipeline build that was used for this run."
    )
    deployment: Optional["PipelineDeploymentResponseModel"] = Field(
        default=None, title="The deployment that was used for this run."
    )
    steps: Dict[str, "StepRunResponseModel"] = Field(
        default={}, title="The steps of this run."
    )

    @property
    def artifacts(self) -> List["ArtifactResponseModel"]:
        """Get all artifacts that are outputs of steps of this pipeline run.

        Returns:
            All output artifacts of this pipeline run (including cached ones).
        """
        from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run

        return get_artifacts_of_pipeline_run(self)

    @property
    def produced_artifacts(self) -> List["ArtifactResponseModel"]:
        """Get all artifacts produced during this pipeline run.

        Returns:
            A list of all artifacts produced during this pipeline run.
        """
        from zenml.utils.artifact_utils import get_artifacts_of_pipeline_run

        return get_artifacts_of_pipeline_run(self, only_produced=True)

    def get_step(self, step: str) -> "StepRunResponseModel":
        """(Deprecated) Get a step by name.

        Args:
            step: Name of the step to get.

        Returns:
            The step with the given name.
        """
        from zenml.logger import get_logger

        logger = get_logger(__name__)
        logger.warning(
            "`run.get_step(<step_name>)` is deprecated and will be removed in "
            "a future release. Please use `run.steps[<step_name>]` instead."
        )
        return self.steps[step]
artifacts: List[ArtifactResponseModel] property readonly

Get all artifacts that are outputs of steps of this pipeline run.

Returns:

Type Description
List[ArtifactResponseModel]

All output artifacts of this pipeline run (including cached ones).

produced_artifacts: List[ArtifactResponseModel] property readonly

Get all artifacts produced during this pipeline run.

Returns:

Type Description
List[ArtifactResponseModel]

A list of all artifacts produced during this pipeline run.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_step(self, step)

(Deprecated) Get a step by name.

Parameters:

Name Type Description Default
step str

Name of the step to get.

required

Returns:

Type Description
StepRunResponseModel

The step with the given name.

Source code in zenml/models/pipeline_run_models.py
def get_step(self, step: str) -> "StepRunResponseModel":
    """(Deprecated) Get a step by name.

    Args:
        step: Name of the step to get.

    Returns:
        The step with the given name.
    """
    from zenml.logger import get_logger

    logger = get_logger(__name__)
    logger.warning(
        "`run.get_step(<step_name>)` is deprecated and will be removed in "
        "a future release. Please use `run.steps[<step_name>]` instead."
    )
    return self.steps[step]

PipelineRunUpdateModel (BaseModel) pydantic-model

Pipeline run update model.

Source code in zenml/models/pipeline_run_models.py
class PipelineRunUpdateModel(BaseModel):
    """Pipeline run update model."""

    status: Optional[ExecutionStatus] = None
    end_time: Optional[datetime] = None

role_models

Models representing roles that can be assigned to users or teams.

RoleBaseModel (BaseModel) pydantic-model

Base model for roles.

Source code in zenml/models/role_models.py
class RoleBaseModel(BaseModel):
    """Base model for roles."""

    name: str = Field(
        title="The unique name of the role.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    permissions: Set[PermissionType]

RoleFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Users.

Source code in zenml/models/role_models.py
class RoleFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Users."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the role",
    )
name: str pydantic-field

Name of the role

RoleRequestModel (RoleBaseModel, BaseRequestModel) pydantic-model

Request model for roles.

Source code in zenml/models/role_models.py
class RoleRequestModel(RoleBaseModel, BaseRequestModel):
    """Request model for roles."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

RoleResponseModel (RoleBaseModel, BaseResponseModel) pydantic-model

Response model for roles.

Source code in zenml/models/role_models.py
class RoleResponseModel(RoleBaseModel, BaseResponseModel):
    """Response model for roles."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

RoleUpdateModel (RoleRequestModel) pydantic-model

Update model for roles.

Source code in zenml/models/role_models.py
class RoleUpdateModel(RoleRequestModel):
    """Update model for roles."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

run_metadata_models

Models representing run metadata.

RunMetadataBaseModel (BaseModel) pydantic-model

Base model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataBaseModel(BaseModel):
    """Base model for run metadata."""

    pipeline_run_id: Optional[UUID] = Field(
        title="The ID of the pipeline run that this metadata belongs to.",
    )
    step_run_id: Optional[UUID]
    artifact_id: Optional[UUID]
    stack_component_id: Optional[UUID]
    key: str = Field(
        title="The key of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    value: MetadataType = Field(
        title="The value of the metadata.",
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    type: MetadataTypeEnum = Field(
        title="The type of the metadata.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

RunMetadataFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of run metadata."""

    pipeline_run_id: Optional[Union[str, UUID]] = None
    step_run_id: Optional[Union[str, UUID]] = None
    artifact_id: Optional[Union[str, UUID]] = None
    stack_component_id: Optional[Union[str, UUID]] = None
    key: Optional[str] = None
    type: Optional[Union[str, MetadataTypeEnum]] = None

RunMetadataRequestModel (RunMetadataBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataRequestModel(
    RunMetadataBaseModel, WorkspaceScopedRequestModel
):
    """Request model for run metadata."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

RunMetadataResponseModel (RunMetadataBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for run metadata.

Source code in zenml/models/run_metadata_models.py
class RunMetadataResponseModel(
    RunMetadataBaseModel, WorkspaceScopedResponseModel
):
    """Response model for run metadata."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

schedule_model

Model definition for pipeline run schedules.

ScheduleBaseModel (Schedule, BaseModel) pydantic-model

Domain model for schedules.

Source code in zenml/models/schedule_model.py
class ScheduleBaseModel(Schedule, BaseModel):
    """Domain model for schedules."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["id"]

    name: str
    active: bool
    orchestrator_id: Optional[UUID]
    pipeline_id: Optional[UUID]

ScheduleFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Users.

Source code in zenml/models/schedule_model.py
class ScheduleFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Users."""

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace scope of the schedule."
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the schedule"
    )
    pipeline_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline that the schedule is attached to."
    )
    orchestrator_id: Optional[Union[UUID, str]] = Field(
        default=None,
        description="Orchestrator that the schedule is attached to.",
    )
    active: Optional[bool] = Field(
        default=None,
        description="If the schedule is active",
    )
    cron_expression: Optional[str] = Field(
        default=None,
        description="The cron expression, describing the schedule",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time"
    )
    interval_second: Optional[Optional[float]] = Field(
        default=None,
        description="The repetition interval in seconds",
    )
    catchup: Optional[bool] = Field(
        default=None,
        description="Whether or not the schedule is set to catchup past missed "
        "events",
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the schedule",
    )
active: bool pydantic-field

If the schedule is active

catchup: bool pydantic-field

Whether or not the schedule is set to catchup past missed events

cron_expression: str pydantic-field

The cron expression, describing the schedule

end_time: Union[datetime.datetime, str] pydantic-field

End time

interval_second: float pydantic-field

The repetition interval in seconds

name: str pydantic-field

Name of the schedule

orchestrator_id: Union[uuid.UUID, str] pydantic-field

Orchestrator that the schedule is attached to.

pipeline_id: Union[uuid.UUID, str] pydantic-field

Pipeline that the schedule is attached to.

start_time: Union[datetime.datetime, str] pydantic-field

Start time

user_id: Union[uuid.UUID, str] pydantic-field

User that created the schedule

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace scope of the schedule.

ScheduleRequestModel (ScheduleBaseModel, WorkspaceScopedRequestModel) pydantic-model

Schedule request model.

Source code in zenml/models/schedule_model.py
class ScheduleRequestModel(ScheduleBaseModel, WorkspaceScopedRequestModel):
    """Schedule request model."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ScheduleResponseModel (ScheduleBaseModel, WorkspaceScopedResponseModel) pydantic-model

Schedule response model with workspace and user hydrated.

Source code in zenml/models/schedule_model.py
class ScheduleResponseModel(ScheduleBaseModel, WorkspaceScopedResponseModel):
    """Schedule response model with workspace and user hydrated."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

ScheduleUpdateModel (BaseModel) pydantic-model

Schedule update model.

Source code in zenml/models/schedule_model.py
class ScheduleUpdateModel(BaseModel):
    """Schedule update model."""

    name: Optional[str] = None
    active: Optional[bool] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    interval_second: Optional[timedelta] = None
    catchup: Optional[bool] = None

secret_models

Models representing secrets.

SecretBaseModel (BaseModel) pydantic-model

Base model for secrets.

Source code in zenml/models/secret_models.py
class SecretBaseModel(BaseModel):
    """Base model for secrets."""

    name: str = Field(
        title="The name of the secret.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    scope: SecretScope = Field(
        SecretScope.WORKSPACE, title="The scope of the secret."
    )

    values: Dict[str, Optional[SecretStr]] = Field(
        default_factory=dict, title="The values stored in this secret."
    )

    @property
    def secret_values(self) -> Dict[str, str]:
        """A dictionary with all un-obfuscated values stored in this secret.

        The values are returned as strings, not SecretStr. If a value is
        None, it is not included in the returned dictionary. This is to enable
        the use of None values in the update model to indicate that a secret
        value should be deleted.

        Returns:
            A dictionary containing the secret's values.
        """
        return {
            k: v.get_secret_value()
            for k, v in self.values.items()
            if v is not None
        }

    @property
    def has_missing_values(self) -> bool:
        """Returns True if the secret has missing values (i.e. None).

        Values can be missing from a secret for example if the user retrieves a
        secret but does not have the permission to view the secret values.

        Returns:
            True if the secret has any values set to None.
        """
        return any(v is None for v in self.values.values())

    def add_secret(self, key: str, value: str) -> None:
        """Adds a secret value to the secret.

        Args:
            key: The key of the secret value.
            value: The secret value.
        """
        self.values[key] = SecretStr(value)

    def remove_secret(self, key: str) -> None:
        """Removes a secret value from the secret.

        Args:
            key: The key of the secret value.
        """
        del self.values[key]

    def remove_secrets(self) -> None:
        """Removes all secret values from the secret but keep the keys."""
        self.values = {k: None for k in self.values.keys()}
has_missing_values: bool property readonly

Returns True if the secret has missing values (i.e. None).

Values can be missing from a secret for example if the user retrieves a secret but does not have the permission to view the secret values.

Returns:

Type Description
bool

True if the secret has any values set to None.

secret_values: Dict[str, str] property readonly

A dictionary with all un-obfuscated values stored in this secret.

The values are returned as strings, not SecretStr. If a value is None, it is not included in the returned dictionary. This is to enable the use of None values in the update model to indicate that a secret value should be deleted.

Returns:

Type Description
Dict[str, str]

A dictionary containing the secret's values.

add_secret(self, key, value)

Adds a secret value to the secret.

Parameters:

Name Type Description Default
key str

The key of the secret value.

required
value str

The secret value.

required
Source code in zenml/models/secret_models.py
def add_secret(self, key: str, value: str) -> None:
    """Adds a secret value to the secret.

    Args:
        key: The key of the secret value.
        value: The secret value.
    """
    self.values[key] = SecretStr(value)
remove_secret(self, key)

Removes a secret value from the secret.

Parameters:

Name Type Description Default
key str

The key of the secret value.

required
Source code in zenml/models/secret_models.py
def remove_secret(self, key: str) -> None:
    """Removes a secret value from the secret.

    Args:
        key: The key of the secret value.
    """
    del self.values[key]
remove_secrets(self)

Removes all secret values from the secret but keep the keys.

Source code in zenml/models/secret_models.py
def remove_secrets(self) -> None:
    """Removes all secret values from the secret but keep the keys."""
    self.values = {k: None for k in self.values.keys()}

SecretFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Secrets.

Source code in zenml/models/secret_models.py
class SecretFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Secrets."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *WorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "values",
    ]

    name: Optional[str] = Field(
        default=None,
        description="Name of the secret",
    )

    scope: Optional[Union[SecretScope, str]] = Field(
        default=None,
        description="Scope in which to filter secrets",
    )

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the Secret"
    )

    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that created the Secret"
    )

    @staticmethod
    def _get_filtering_value(value: Optional[Any]) -> str:
        """Convert the value to a string that can be used for lexicographical filtering and sorting.

        Args:
            value: The value to convert.

        Returns:
            The value converted to string format that can be used for
            lexicographical sorting and filtering.
        """
        if value is None:
            return ""
        str_value = str(value)
        if isinstance(value, datetime):
            str_value = value.strftime("%Y-%m-%d %H:%M:%S")
        return str_value

    def secret_matches(self, secret: SecretResponseModel) -> bool:
        """Checks if a secret matches the filter criteria.

        Args:
            secret: The secret to check.

        Returns:
            True if the secret matches the filter criteria, False otherwise.
        """
        for filter in self.list_of_filters:
            column_value: Optional[Any] = None
            if filter.column == "workspace_id":
                column_value = secret.workspace.id
            elif filter.column == "user_id":
                column_value = secret.user.id if secret.user else None
            else:
                column_value = getattr(secret, filter.column)

            # Convert the values to strings for lexicographical comparison.
            str_column_value = self._get_filtering_value(column_value)
            str_filter_value = self._get_filtering_value(filter.value)

            # Compare the lexicographical values according to the operation.
            if filter.operation == GenericFilterOps.EQUALS:
                result = str_column_value == str_filter_value
            elif filter.operation == GenericFilterOps.CONTAINS:
                result = str_filter_value in str_column_value
            elif filter.operation == GenericFilterOps.STARTSWITH:
                result = str_column_value.startswith(str_filter_value)
            elif filter.operation == GenericFilterOps.ENDSWITH:
                result = str_column_value.endswith(str_filter_value)
            elif filter.operation == GenericFilterOps.GT:
                result = str_column_value > str_filter_value
            elif filter.operation == GenericFilterOps.GTE:
                result = str_column_value >= str_filter_value
            elif filter.operation == GenericFilterOps.LT:
                result = str_column_value < str_filter_value
            elif filter.operation == GenericFilterOps.LTE:
                result = str_column_value <= str_filter_value

            # Exit early if the result is False for AND and True for OR
            if self.logical_operator == LogicalOperators.AND:
                if not result:
                    return False
            else:
                if result:
                    return True

        # If we get here, all filters have been checked and the result is
        # True for AND and False for OR
        if self.logical_operator == LogicalOperators.AND:
            return True
        else:
            return False

    def sort_secrets(
        self, secrets: List[SecretResponseModel]
    ) -> List[SecretResponseModel]:
        """Sorts a list of secrets according to the filter criteria.

        Args:
            secrets: The list of secrets to sort.

        Returns:
            The sorted list of secrets.
        """
        column, sort_op = self.sorting_params
        sorted_secrets = sorted(
            secrets,
            key=lambda secret: self._get_filtering_value(
                getattr(secret, column)
            ),
            reverse=sort_op == SorterOps.DESCENDING,
        )

        return sorted_secrets
name: str pydantic-field

Name of the secret

scope: Union[zenml.enums.SecretScope, str] pydantic-field

Scope in which to filter secrets

user_id: Union[uuid.UUID, str] pydantic-field

User that created the Secret

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the Secret

secret_matches(self, secret)

Checks if a secret matches the filter criteria.

Parameters:

Name Type Description Default
secret SecretResponseModel

The secret to check.

required

Returns:

Type Description
bool

True if the secret matches the filter criteria, False otherwise.

Source code in zenml/models/secret_models.py
def secret_matches(self, secret: SecretResponseModel) -> bool:
    """Checks if a secret matches the filter criteria.

    Args:
        secret: The secret to check.

    Returns:
        True if the secret matches the filter criteria, False otherwise.
    """
    for filter in self.list_of_filters:
        column_value: Optional[Any] = None
        if filter.column == "workspace_id":
            column_value = secret.workspace.id
        elif filter.column == "user_id":
            column_value = secret.user.id if secret.user else None
        else:
            column_value = getattr(secret, filter.column)

        # Convert the values to strings for lexicographical comparison.
        str_column_value = self._get_filtering_value(column_value)
        str_filter_value = self._get_filtering_value(filter.value)

        # Compare the lexicographical values according to the operation.
        if filter.operation == GenericFilterOps.EQUALS:
            result = str_column_value == str_filter_value
        elif filter.operation == GenericFilterOps.CONTAINS:
            result = str_filter_value in str_column_value
        elif filter.operation == GenericFilterOps.STARTSWITH:
            result = str_column_value.startswith(str_filter_value)
        elif filter.operation == GenericFilterOps.ENDSWITH:
            result = str_column_value.endswith(str_filter_value)
        elif filter.operation == GenericFilterOps.GT:
            result = str_column_value > str_filter_value
        elif filter.operation == GenericFilterOps.GTE:
            result = str_column_value >= str_filter_value
        elif filter.operation == GenericFilterOps.LT:
            result = str_column_value < str_filter_value
        elif filter.operation == GenericFilterOps.LTE:
            result = str_column_value <= str_filter_value

        # Exit early if the result is False for AND and True for OR
        if self.logical_operator == LogicalOperators.AND:
            if not result:
                return False
        else:
            if result:
                return True

    # If we get here, all filters have been checked and the result is
    # True for AND and False for OR
    if self.logical_operator == LogicalOperators.AND:
        return True
    else:
        return False
sort_secrets(self, secrets)

Sorts a list of secrets according to the filter criteria.

Parameters:

Name Type Description Default
secrets List[zenml.models.secret_models.SecretResponseModel]

The list of secrets to sort.

required

Returns:

Type Description
List[zenml.models.secret_models.SecretResponseModel]

The sorted list of secrets.

Source code in zenml/models/secret_models.py
def sort_secrets(
    self, secrets: List[SecretResponseModel]
) -> List[SecretResponseModel]:
    """Sorts a list of secrets according to the filter criteria.

    Args:
        secrets: The list of secrets to sort.

    Returns:
        The sorted list of secrets.
    """
    column, sort_op = self.sorting_params
    sorted_secrets = sorted(
        secrets,
        key=lambda secret: self._get_filtering_value(
            getattr(secret, column)
        ),
        reverse=sort_op == SorterOps.DESCENDING,
    )

    return sorted_secrets

SecretRequestModel (SecretBaseModel, WorkspaceScopedRequestModel) pydantic-model

Secret request model.

Source code in zenml/models/secret_models.py
class SecretRequestModel(SecretBaseModel, WorkspaceScopedRequestModel):
    """Secret request model."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

SecretResponseModel (SecretBaseModel, WorkspaceScopedResponseModel) pydantic-model

Secret response model with user and workspace hydrated.

Source code in zenml/models/secret_models.py
class SecretResponseModel(SecretBaseModel, WorkspaceScopedResponseModel):
    """Secret response model with user and workspace hydrated."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = ["scope"]
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

SecretUpdateModel (SecretRequestModel) pydantic-model

Secret update model.

Source code in zenml/models/secret_models.py
class SecretUpdateModel(SecretRequestModel):
    """Secret update model."""

    scope: Optional[SecretScope] = Field(  # type: ignore[assignment]
        default=None, title="The scope of the secret."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

server_models

Model definitions for ZenML servers.

ServerDatabaseType (StrEnum)

Enum for server database types.

Source code in zenml/models/server_models.py
class ServerDatabaseType(StrEnum):
    """Enum for server database types."""

    SQLITE = "sqlite"
    MYSQL = "mysql"
    OTHER = "other"

ServerDeploymentType (StrEnum)

Enum for server deployment types.

Source code in zenml/models/server_models.py
class ServerDeploymentType(StrEnum):
    """Enum for server deployment types."""

    LOCAL = "local"
    DOCKER = "docker"
    KUBERNETES = "kubernetes"
    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    ALPHA = "alpha"
    OTHER = "other"
    HF_SPACES = "hf_spaces"
    SANDBOX = "sandbox"
    CLOUD = "cloud"

ServerModel (BaseModel) pydantic-model

Domain model for ZenML servers.

Source code in zenml/models/server_models.py
class ServerModel(BaseModel):
    """Domain model for ZenML servers."""

    id: UUID = Field(default_factory=uuid4, title="The unique server id.")

    version: str = Field(
        title="The ZenML version that the server is running.",
    )

    debug: bool = Field(
        False, title="Flag to indicate whether ZenML is running on debug mode."
    )

    deployment_type: ServerDeploymentType = Field(
        ServerDeploymentType.OTHER,
        title="The ZenML server deployment type.",
    )
    database_type: ServerDatabaseType = Field(
        ServerDatabaseType.OTHER,
        title="The database type that the server is using.",
    )
    secrets_store_type: SecretsStoreType = Field(
        SecretsStoreType.NONE,
        title="The type of secrets store that the server is using.",
    )

    def is_local(self) -> bool:
        """Return whether the server is running locally.

        Returns:
            True if the server is running locally, False otherwise.
        """
        from zenml.config.global_config import GlobalConfiguration

        # Local ZenML servers are identifiable by the fact that their
        # server ID is the same as the local client (user) ID.
        return self.id == GlobalConfiguration().user_id
is_local(self)

Return whether the server is running locally.

Returns:

Type Description
bool

True if the server is running locally, False otherwise.

Source code in zenml/models/server_models.py
def is_local(self) -> bool:
    """Return whether the server is running locally.

    Returns:
        True if the server is running locally, False otherwise.
    """
    from zenml.config.global_config import GlobalConfiguration

    # Local ZenML servers are identifiable by the fact that their
    # server ID is the same as the local client (user) ID.
    return self.id == GlobalConfiguration().user_id

service_connector_models

Model definitions for ZenML service connectors.

AuthenticationMethodModel (BaseModel) pydantic-model

Authentication method specification.

Describes the schema for the configuration and secrets that need to be provided to configure an authentication method.

Source code in zenml/models/service_connector_models.py
class AuthenticationMethodModel(BaseModel):
    """Authentication method specification.

    Describes the schema for the configuration and secrets that need to be
    provided to configure an authentication method.
    """

    name: str = Field(
        title="User readable name for the authentication method.",
    )
    auth_method: str = Field(
        title="The name of the authentication method.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: str = Field(
        default="",
        title="A description of the authentication method.",
    )
    config_schema: Dict[str, Any] = Field(
        default_factory=dict,
        title="The JSON schema of the configuration for this authentication "
        "method.",
    )
    min_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The minimum number of seconds that the authentication "
        "session can be configured to be valid for. Set to None for "
        "authentication sessions and long-lived credentials that don't expire.",
    )
    max_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The maximum number of seconds that the authentication "
        "session can be configured to be valid for. Set to None for "
        "authentication sessions and long-lived credentials that don't expire.",
    )
    default_expiration_seconds: Optional[int] = Field(
        default=None,
        title="The default number of seconds that the authentication "
        "session is valid for. Set to None for authentication sessions and "
        "long-lived credentials that don't expire.",
    )
    _config_class: Optional[Type[BaseModel]] = None

    def __init__(
        self, config_class: Optional[Type[BaseModel]] = None, **values: Any
    ):
        """Initialize the authentication method.

        Args:
            config_class: The configuration class for the authentication
                method.
            **values: The data to initialize the authentication method with.
        """
        if config_class:
            values["config_schema"] = json.loads(config_class.schema_json())
        super().__init__(**values)
        self._config_class = config_class

    @property
    def config_class(self) -> Optional[Type[BaseModel]]:
        """Get the configuration class for the authentication method.

        Returns:
            The configuration class for the authentication method.
        """
        return self._config_class

    def supports_temporary_credentials(self) -> bool:
        """Check if the authentication method supports temporary credentials.

        Returns:
            True if the authentication method supports temporary credentials,
            False otherwise.
        """
        return (
            self.min_expiration_seconds is not None
            or self.max_expiration_seconds is not None
            or self.default_expiration_seconds is not None
        )

    def validate_expiration(
        self, expiration_seconds: Optional[int]
    ) -> Optional[int]:
        """Validate the expiration time.

        Args:
            expiration_seconds: The expiration time in seconds. If None, the
                default expiration time is used, if applicable.

        Returns:
            The expiration time in seconds or None if not applicable.

        Raises:
            ValueError: If the expiration time is not valid.
        """
        if not self.supports_temporary_credentials():
            if expiration_seconds is not None:
                # Expiration is not supported
                raise ValueError(
                    "Expiration time is not supported for this authentication "
                    f"method but a value was provided: {expiration_seconds}"
                )

            return None

        expiration_seconds = (
            expiration_seconds or self.default_expiration_seconds
        )

        if expiration_seconds is None:
            return None

        if self.min_expiration_seconds is not None:
            if expiration_seconds < self.min_expiration_seconds:
                raise ValueError(
                    f"Expiration time must be at least "
                    f"{self.min_expiration_seconds} seconds."
                )

        if self.max_expiration_seconds is not None:
            if expiration_seconds > self.max_expiration_seconds:
                raise ValueError(
                    f"Expiration time must be at most "
                    f"{self.max_expiration_seconds} seconds."
                )

        return expiration_seconds

    class Config:
        """Pydantic config class."""

        underscore_attrs_are_private = True
config_class: Optional[Type[pydantic.main.BaseModel]] property readonly

Get the configuration class for the authentication method.

Returns:

Type Description
Optional[Type[pydantic.main.BaseModel]]

The configuration class for the authentication method.

Config

Pydantic config class.

Source code in zenml/models/service_connector_models.py
class Config:
    """Pydantic config class."""

    underscore_attrs_are_private = True
__init__(self, config_class=None, **values) special

Initialize the authentication method.

Parameters:

Name Type Description Default
config_class Optional[Type[pydantic.main.BaseModel]]

The configuration class for the authentication method.

None
**values Any

The data to initialize the authentication method with.

{}
Source code in zenml/models/service_connector_models.py
def __init__(
    self, config_class: Optional[Type[BaseModel]] = None, **values: Any
):
    """Initialize the authentication method.

    Args:
        config_class: The configuration class for the authentication
            method.
        **values: The data to initialize the authentication method with.
    """
    if config_class:
        values["config_schema"] = json.loads(config_class.schema_json())
    super().__init__(**values)
    self._config_class = config_class
supports_temporary_credentials(self)

Check if the authentication method supports temporary credentials.

Returns:

Type Description
bool

True if the authentication method supports temporary credentials, False otherwise.

Source code in zenml/models/service_connector_models.py
def supports_temporary_credentials(self) -> bool:
    """Check if the authentication method supports temporary credentials.

    Returns:
        True if the authentication method supports temporary credentials,
        False otherwise.
    """
    return (
        self.min_expiration_seconds is not None
        or self.max_expiration_seconds is not None
        or self.default_expiration_seconds is not None
    )
validate_expiration(self, expiration_seconds)

Validate the expiration time.

Parameters:

Name Type Description Default
expiration_seconds Optional[int]

The expiration time in seconds. If None, the default expiration time is used, if applicable.

required

Returns:

Type Description
Optional[int]

The expiration time in seconds or None if not applicable.

Exceptions:

Type Description
ValueError

If the expiration time is not valid.

Source code in zenml/models/service_connector_models.py
def validate_expiration(
    self, expiration_seconds: Optional[int]
) -> Optional[int]:
    """Validate the expiration time.

    Args:
        expiration_seconds: The expiration time in seconds. If None, the
            default expiration time is used, if applicable.

    Returns:
        The expiration time in seconds or None if not applicable.

    Raises:
        ValueError: If the expiration time is not valid.
    """
    if not self.supports_temporary_credentials():
        if expiration_seconds is not None:
            # Expiration is not supported
            raise ValueError(
                "Expiration time is not supported for this authentication "
                f"method but a value was provided: {expiration_seconds}"
            )

        return None

    expiration_seconds = (
        expiration_seconds or self.default_expiration_seconds
    )

    if expiration_seconds is None:
        return None

    if self.min_expiration_seconds is not None:
        if expiration_seconds < self.min_expiration_seconds:
            raise ValueError(
                f"Expiration time must be at least "
                f"{self.min_expiration_seconds} seconds."
            )

    if self.max_expiration_seconds is not None:
        if expiration_seconds > self.max_expiration_seconds:
            raise ValueError(
                f"Expiration time must be at most "
                f"{self.max_expiration_seconds} seconds."
            )

    return expiration_seconds

ResourceTypeModel (BaseModel) pydantic-model

Resource type specification.

Describes the authentication methods and resource instantiation model for one or more resource types.

Source code in zenml/models/service_connector_models.py
class ResourceTypeModel(BaseModel):
    """Resource type specification.

    Describes the authentication methods and resource instantiation model for
    one or more resource types.
    """

    name: str = Field(
        title="User readable name for the resource type.",
    )
    resource_type: str = Field(
        title="Resource type identifier.",
    )
    description: str = Field(
        default="",
        title="A description of the resource type.",
    )
    auth_methods: List[str] = Field(
        title="The list of authentication methods that can be used to access "
        "resources of this type.",
    )
    supports_instances: bool = Field(
        default=False,
        title="Specifies if a single connector instance can be used to access "
        "multiple instances of this resource type. If set to True, the "
        "connector is able to provide a list of resource IDs identifying all "
        "the resources that it can access and a resource ID needs to be "
        "explicitly configured or supplied when access to a resource is "
        "requested. If set to False, a connector instance is only able to "
        "access a single resource and a resource ID is not required to access "
        "the resource.",
    )
    logo_url: Optional[str] = Field(
        default=None,
        title="Optionally, a URL pointing to a png,"
        "svg or jpg file can be attached.",
    )
    emoji: Optional[str] = Field(
        default=None,
        title="Optionally, a python-rich emoji can be attached.",
    )

    @property
    def emojified_resource_type(self) -> str:
        """Get the emojified resource type.

        Returns:
            The emojified resource type.
        """
        if not self.emoji:
            return self.resource_type
        return f"{self.emoji} {self.resource_type}"
emojified_resource_type: str property readonly

Get the emojified resource type.

Returns:

Type Description
str

The emojified resource type.

ServiceConnectorBaseModel (BaseModel) pydantic-model

Base model for service connectors.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorBaseModel(BaseModel):
    """Base model for service connectors."""

    name: str = Field(
        title="The service connector name.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    connector_type: Union[str, "ServiceConnectorTypeModel"] = Field(
        title="The type of service connector.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: str = Field(
        default="",
        title="The service connector instance description.",
    )
    auth_method: str = Field(
        title="The authentication method that the connector instance uses to "
        "access the resources.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    resource_types: List[str] = Field(
        default_factory=list,
        title="The type(s) of resource that the connector instance can be used "
        "to gain access to.",
    )
    resource_id: Optional[str] = Field(
        default=None,
        title="Uniquely identifies a specific resource instance that the "
        "connector instance can be used to access. If omitted, the connector "
        "instance can be used to access any and all resource instances that "
        "the authentication method and resource type(s) allow.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    supports_instances: bool = Field(
        default=False,
        title="Indicates whether the connector instance can be used to access "
        "multiple instances of the configured resource type.",
    )
    expires_at: Optional[datetime] = Field(
        default=None,
        title="Time when the authentication credentials configured for the "
        "connector expire. If omitted, the credentials do not expire.",
    )
    expiration_seconds: Optional[int] = Field(
        default=None,
        title="The duration, in seconds, that the temporary credentials "
        "generated by this connector should remain valid. Only applicable for "
        "connectors and authentication methods that involve generating "
        "temporary credentials from the ones configured in the connector.",
    )
    configuration: Dict[str, Any] = Field(
        default_factory=dict,
        title="The service connector configuration, not including secrets.",
    )
    secrets: Dict[str, Optional[SecretStr]] = Field(
        default_factory=dict,
        title="The service connector secrets.",
    )
    labels: Dict[str, str] = Field(
        default_factory=dict,
        title="Service connector labels.",
    )

    @property
    def type(self) -> str:
        """Get the connector type.

        Returns:
            The connector type.
        """
        if isinstance(self.connector_type, str):
            return self.connector_type
        return self.connector_type.connector_type

    @property
    def emojified_connector_type(self) -> str:
        """Get the emojified connector type.

        Returns:
            The emojified connector type.
        """
        if not isinstance(self.connector_type, str):
            return self.connector_type.emojified_connector_type

        return self.connector_type

    @property
    def emojified_resource_types(self) -> List[str]:
        """Get the emojified connector type.

        Returns:
            The emojified connector type.
        """
        if not isinstance(self.connector_type, str):
            return [
                self.connector_type.resource_type_dict[
                    resource_type
                ].emojified_resource_type
                for resource_type in self.resource_types
            ]

        return self.resource_types

    @property
    def is_multi_type(self) -> bool:
        """Checks if the connector is multi-type.

        A multi-type connector can be used to access multiple types of
        resources.

        Returns:
            True if the connector is multi-type, False otherwise.
        """
        return len(self.resource_types) > 1

    @property
    def is_multi_instance(self) -> bool:
        """Checks if the connector is multi-instance.

        A multi-instance connector is configured to access multiple instances
        of the configured resource type.

        Returns:
            True if the connector is multi-instance, False otherwise.
        """
        return (
            not self.is_multi_type
            and self.supports_instances
            and not self.resource_id
        )

    @property
    def is_single_instance(self) -> bool:
        """Checks if the connector is single-instance.

        A single-instance connector is configured to access only a single
        instance of the configured resource type or does not support multiple
        resource instances.

        Returns:
            True if the connector is single-instance, False otherwise.
        """
        return not self.is_multi_type and not self.is_multi_instance

    @property
    def full_configuration(self) -> Dict[str, str]:
        """Get the full connector configuration, including secrets.

        Returns:
            The full connector configuration, including secrets.
        """
        config = self.configuration.copy()
        config.update(
            {k: v.get_secret_value() for k, v in self.secrets.items() if v}
        )
        return config

    def has_expired(self) -> bool:
        """Check if the connector credentials have expired.

        Verify that the authentication credentials associated with the connector
        have not expired by checking the expiration time against the current
        time.

        Returns:
            True if the connector has expired, False otherwise.
        """
        if not self.expires_at:
            return False

        return self.expires_at < datetime.now(timezone.utc)

    def validate_and_configure_resources(
        self,
        connector_type: "ServiceConnectorTypeModel",
        resource_types: Optional[Union[str, List[str]]] = None,
        resource_id: Optional[str] = None,
        configuration: Optional[Dict[str, Any]] = None,
        secrets: Optional[Dict[str, Optional[SecretStr]]] = None,
    ) -> None:
        """Validate and configure the resources that the connector can be used to access.

        Args:
            connector_type: The connector type specification used to validate
                the connector configuration.
            resource_types: The type(s) of resource that the connector instance
                can be used to access. If omitted, a multi-type connector is
                configured.
            resource_id: Uniquely identifies a specific resource instance that
                the connector instance can be used to access.
            configuration: The connector configuration.
            secrets: The connector secrets.

        Raises:
            ValueError: If the connector configuration is not valid.
        """
        if resource_types is None:
            resource_type = None
        elif isinstance(resource_types, str):
            resource_type = resource_types
        elif len(resource_types) == 1:
            resource_type = resource_types[0]
        else:
            # Multiple or no resource types specified
            resource_type = None

        try:
            # Validate the connector configuration and retrieve the resource
            # specification
            (
                auth_method_spec,
                resource_spec,
            ) = connector_type.find_resource_specifications(
                self.auth_method,
                resource_type,
            )
        except (KeyError, ValueError) as e:
            raise ValueError(
                f"connector configuration is not valid: {e}"
            ) from e

        if resource_type and resource_spec:
            self.resource_types = [resource_spec.resource_type]
            self.resource_id = resource_id
            self.supports_instances = resource_spec.supports_instances
        else:
            # A multi-type connector is associated with all resource types
            # that it supports, does not have a resource ID configured
            # and it's unclear if it supports multiple instances or not
            self.resource_types = list(
                connector_type.resource_type_dict.keys()
            )
            self.supports_instances = False

        if configuration is None and secrets is None:
            # No configuration or secrets provided
            return

        self.configuration = {}
        self.secrets = {}

        # Validate and configure the connector configuration and secrets
        configuration = configuration or {}
        secrets = secrets or {}
        supported_attrs = []
        for attr_name, attr_schema in auth_method_spec.config_schema.get(
            "properties", {}
        ).items():
            supported_attrs.append(attr_name)
            required = attr_name in auth_method_spec.config_schema.get(
                "required", []
            )
            secret = attr_schema.get("format", "") == "password"
            value = configuration.get(attr_name, secrets.get(attr_name))
            if required:
                if value is None:
                    raise ValueError(
                        "connector configuration is not valid: missing "
                        f"required attribute '{attr_name}'"
                    )
            elif value is None:
                continue

            # Split the configuration into secrets and non-secrets
            if secret:
                if isinstance(value, SecretStr):
                    self.secrets[attr_name] = value
                else:
                    self.secrets[attr_name] = SecretStr(value)
            else:
                self.configuration[attr_name] = value

        # Warn about attributes that are not part of the configuration schema
        for attr_name in set(list(configuration.keys())) - set(
            supported_attrs
        ):
            logger.warning(
                f"Ignoring unknown attribute in connector '{self.name}' "
                f"configuration {attr_name}. Supported attributes are: "
                f"{supported_attrs}",
            )
        # Warn about secrets that are not part of the configuration schema
        for attr_name in set(secrets.keys()) - self.secrets.keys():
            logger.warning(
                f"Ignoring unknown attribute in connector '{self.name}' "
                f"configuration {attr_name}. Supported attributes are: "
                f"{supported_attrs}",
            )
emojified_connector_type: str property readonly

Get the emojified connector type.

Returns:

Type Description
str

The emojified connector type.

emojified_resource_types: List[str] property readonly

Get the emojified connector type.

Returns:

Type Description
List[str]

The emojified connector type.

full_configuration: Dict[str, str] property readonly

Get the full connector configuration, including secrets.

Returns:

Type Description
Dict[str, str]

The full connector configuration, including secrets.

is_multi_instance: bool property readonly

Checks if the connector is multi-instance.

A multi-instance connector is configured to access multiple instances of the configured resource type.

Returns:

Type Description
bool

True if the connector is multi-instance, False otherwise.

is_multi_type: bool property readonly

Checks if the connector is multi-type.

A multi-type connector can be used to access multiple types of resources.

Returns:

Type Description
bool

True if the connector is multi-type, False otherwise.

is_single_instance: bool property readonly

Checks if the connector is single-instance.

A single-instance connector is configured to access only a single instance of the configured resource type or does not support multiple resource instances.

Returns:

Type Description
bool

True if the connector is single-instance, False otherwise.

type: str property readonly

Get the connector type.

Returns:

Type Description
str

The connector type.

has_expired(self)

Check if the connector credentials have expired.

Verify that the authentication credentials associated with the connector have not expired by checking the expiration time against the current time.

Returns:

Type Description
bool

True if the connector has expired, False otherwise.

Source code in zenml/models/service_connector_models.py
def has_expired(self) -> bool:
    """Check if the connector credentials have expired.

    Verify that the authentication credentials associated with the connector
    have not expired by checking the expiration time against the current
    time.

    Returns:
        True if the connector has expired, False otherwise.
    """
    if not self.expires_at:
        return False

    return self.expires_at < datetime.now(timezone.utc)
validate_and_configure_resources(self, connector_type, resource_types=None, resource_id=None, configuration=None, secrets=None)

Validate and configure the resources that the connector can be used to access.

Parameters:

Name Type Description Default
connector_type ServiceConnectorTypeModel

The connector type specification used to validate the connector configuration.

required
resource_types Union[str, List[str]]

The type(s) of resource that the connector instance can be used to access. If omitted, a multi-type connector is configured.

None
resource_id Optional[str]

Uniquely identifies a specific resource instance that the connector instance can be used to access.

None
configuration Optional[Dict[str, Any]]

The connector configuration.

None
secrets Optional[Dict[str, Union[pydantic.types.SecretStr, NoneType]]]

The connector secrets.

None

Exceptions:

Type Description
ValueError

If the connector configuration is not valid.

Source code in zenml/models/service_connector_models.py
def validate_and_configure_resources(
    self,
    connector_type: "ServiceConnectorTypeModel",
    resource_types: Optional[Union[str, List[str]]] = None,
    resource_id: Optional[str] = None,
    configuration: Optional[Dict[str, Any]] = None,
    secrets: Optional[Dict[str, Optional[SecretStr]]] = None,
) -> None:
    """Validate and configure the resources that the connector can be used to access.

    Args:
        connector_type: The connector type specification used to validate
            the connector configuration.
        resource_types: The type(s) of resource that the connector instance
            can be used to access. If omitted, a multi-type connector is
            configured.
        resource_id: Uniquely identifies a specific resource instance that
            the connector instance can be used to access.
        configuration: The connector configuration.
        secrets: The connector secrets.

    Raises:
        ValueError: If the connector configuration is not valid.
    """
    if resource_types is None:
        resource_type = None
    elif isinstance(resource_types, str):
        resource_type = resource_types
    elif len(resource_types) == 1:
        resource_type = resource_types[0]
    else:
        # Multiple or no resource types specified
        resource_type = None

    try:
        # Validate the connector configuration and retrieve the resource
        # specification
        (
            auth_method_spec,
            resource_spec,
        ) = connector_type.find_resource_specifications(
            self.auth_method,
            resource_type,
        )
    except (KeyError, ValueError) as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    if resource_type and resource_spec:
        self.resource_types = [resource_spec.resource_type]
        self.resource_id = resource_id
        self.supports_instances = resource_spec.supports_instances
    else:
        # A multi-type connector is associated with all resource types
        # that it supports, does not have a resource ID configured
        # and it's unclear if it supports multiple instances or not
        self.resource_types = list(
            connector_type.resource_type_dict.keys()
        )
        self.supports_instances = False

    if configuration is None and secrets is None:
        # No configuration or secrets provided
        return

    self.configuration = {}
    self.secrets = {}

    # Validate and configure the connector configuration and secrets
    configuration = configuration or {}
    secrets = secrets or {}
    supported_attrs = []
    for attr_name, attr_schema in auth_method_spec.config_schema.get(
        "properties", {}
    ).items():
        supported_attrs.append(attr_name)
        required = attr_name in auth_method_spec.config_schema.get(
            "required", []
        )
        secret = attr_schema.get("format", "") == "password"
        value = configuration.get(attr_name, secrets.get(attr_name))
        if required:
            if value is None:
                raise ValueError(
                    "connector configuration is not valid: missing "
                    f"required attribute '{attr_name}'"
                )
        elif value is None:
            continue

        # Split the configuration into secrets and non-secrets
        if secret:
            if isinstance(value, SecretStr):
                self.secrets[attr_name] = value
            else:
                self.secrets[attr_name] = SecretStr(value)
        else:
            self.configuration[attr_name] = value

    # Warn about attributes that are not part of the configuration schema
    for attr_name in set(list(configuration.keys())) - set(
        supported_attrs
    ):
        logger.warning(
            f"Ignoring unknown attribute in connector '{self.name}' "
            f"configuration {attr_name}. Supported attributes are: "
            f"{supported_attrs}",
        )
    # Warn about secrets that are not part of the configuration schema
    for attr_name in set(secrets.keys()) - self.secrets.keys():
        logger.warning(
            f"Ignoring unknown attribute in connector '{self.name}' "
            f"configuration {attr_name}. Supported attributes are: "
            f"{supported_attrs}",
        )

ServiceConnectorFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of service connectors.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of service connectors."""

    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "scope_type",
        "resource_type",
        "labels_str",
        "labels",
    ]
    CLI_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.CLI_EXCLUDE_FIELDS,
        "scope_type",
        "labels_str",
        "labels",
    ]
    scope_type: Optional[str] = Field(
        default=None,
        description="The type to scope this query to.",
    )

    is_shared: Optional[Union[bool, str]] = Field(
        default=None,
        description="If the service connector is shared or private",
    )
    name: Optional[str] = Field(
        default=None,
        description="The name to filter by",
    )
    connector_type: Optional[str] = Field(
        default=None,
        description="The type of service connector to filter by",
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace to filter by"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User to filter by"
    )
    auth_method: Optional[str] = Field(
        default=None,
        title="Filter by the authentication method configured for the "
        "connector",
    )
    resource_type: Optional[str] = Field(
        default=None,
        title="Filter by the type of resource that the connector can be used "
        "to access",
    )
    resource_id: Optional[str] = Field(
        default=None,
        title="Filter by the ID of the resource instance that the connector "
        "is configured to access",
    )
    labels_str: Optional[str] = Field(
        default=None,
        title="Filter by one or more labels. This field can be either a JSON "
        "formatted dictionary of label names and values, where the values are "
        'optional and can be set to None (e.g. `{"label1":"value1", "label2": '
        "null}` ), or a comma-separated list of label names and values (e.g "
        "`label1=value1,label2=`. If a label name is specified without a "
        "value, the filter will match all service connectors that have that "
        "label present, regardless of value.",
    )
    secret_id: Optional[Union[UUID, str]] = Field(
        default=None,
        title="Filter by the ID of the secret that contains the service "
        "connector's credentials",
    )

    # Use this internally to configure and access the labels as a dictionary
    labels: Optional[Dict[str, Optional[str]]] = Field(
        default=None,
        title="The labels to filter by, as a dictionary",
    )

    @root_validator
    def validate_labels(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Parse the labels string into a label dictionary and vice-versa.

        Args:
            values: The values to validate.

        Returns:
            The validated values.
        """
        labels_str = values.get("labels_str")
        labels = values.get("labels")
        if labels_str is not None:
            try:
                values["labels"] = json.loads(labels_str)
            except json.JSONDecodeError:
                # Interpret as comma-separated values instead
                values["labels"] = {
                    label.split("=", 1)[0]: label.split("=", 1)[1]
                    if "=" in label
                    else None
                    for label in labels_str.split(",")
                }
        elif labels is not None:
            values["labels_str"] = json.dumps(values["labels"])

        return values

    class Config:
        """Pydantic config class."""

        # Exclude the labels field from the serialized response
        # (it is only used internally). The labels_str field is a string
        # representation of the labels that can be used in the API.
        exclude = ["labels"]
connector_type: str pydantic-field

The type of service connector to filter by

is_shared: Union[bool, str] pydantic-field

If the service connector is shared or private

name: str pydantic-field

The name to filter by

scope_type: str pydantic-field

The type to scope this query to.

user_id: Union[uuid.UUID, str] pydantic-field

User to filter by

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace to filter by

Config

Pydantic config class.

Source code in zenml/models/service_connector_models.py
class Config:
    """Pydantic config class."""

    underscore_attrs_are_private = True
validate_labels(values) classmethod

Parse the labels string into a label dictionary and vice-versa.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values to validate.

required

Returns:

Type Description
Dict[str, Any]

The validated values.

Source code in zenml/models/service_connector_models.py
@root_validator
def validate_labels(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Parse the labels string into a label dictionary and vice-versa.

    Args:
        values: The values to validate.

    Returns:
        The validated values.
    """
    labels_str = values.get("labels_str")
    labels = values.get("labels")
    if labels_str is not None:
        try:
            values["labels"] = json.loads(labels_str)
        except json.JSONDecodeError:
            # Interpret as comma-separated values instead
            values["labels"] = {
                label.split("=", 1)[0]: label.split("=", 1)[1]
                if "=" in label
                else None
                for label in labels_str.split(",")
            }
    elif labels is not None:
        values["labels_str"] = json.dumps(values["labels"])

    return values

ServiceConnectorRequestModel (ServiceConnectorBaseModel, ShareableRequestModel) pydantic-model

Request model for service connectors.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorRequestModel(
    ServiceConnectorBaseModel, ShareableRequestModel
):
    """Request model for service connectors."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "connector_type",
        "auth_method",
        "resource_types",
    ]

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Format the resource types in the analytics metadata.

        Returns:
            Dict of analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        if len(self.resource_types) == 1:
            metadata["resource_types"] = self.resource_types[0]
        else:
            metadata["resource_types"] = ", ".join(self.resource_types)
        if not isinstance(self.connector_type, str):
            metadata["connector_type"] = self.connector_type.connector_type
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Format the resource types in the analytics metadata.

Returns:

Type Description
Dict[str, Any]

Dict of analytics metadata.

Source code in zenml/models/service_connector_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Format the resource types in the analytics metadata.

    Returns:
        Dict of analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    if len(self.resource_types) == 1:
        metadata["resource_types"] = self.resource_types[0]
    else:
        metadata["resource_types"] = ", ".join(self.resource_types)
    if not isinstance(self.connector_type, str):
        metadata["connector_type"] = self.connector_type.connector_type
    return metadata

ServiceConnectorRequirements (BaseModel) pydantic-model

Service connector requirements.

Describes requirements that a service connector consumer has for a service connector instance that it needs in order to access a resource.

Attributes:

Name Type Description
connector_type Optional[str]

The type of service connector that is required. If omitted, any service connector type can be used.

resource_type str

The type of resource that the service connector instance must be able to access.

resource_id_attr Optional[str]

The name of an attribute in the stack component configuration that contains the resource ID of the resource that the service connector instance must be able to access.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorRequirements(BaseModel):
    """Service connector requirements.

    Describes requirements that a service connector consumer has for a
    service connector instance that it needs in order to access a resource.

    Attributes:
        connector_type: The type of service connector that is required. If
            omitted, any service connector type can be used.
        resource_type: The type of resource that the service connector instance
            must be able to access.
        resource_id_attr: The name of an attribute in the stack component
            configuration that contains the resource ID of the resource that
            the service connector instance must be able to access.
    """

    connector_type: Optional[str] = None
    resource_type: str
    resource_id_attr: Optional[str] = None

    def is_satisfied_by(
        self,
        connector: "ServiceConnectorBaseModel",
        component: "ComponentBaseModel",
    ) -> Tuple[bool, str]:
        """Check if the requirements are satisfied by a connector.

        Args:
            connector: The connector to check.
            component: The stack component that the connector is associated
                with.

        Returns:
            True if the requirements are satisfied, False otherwise, and a
            message describing the reason for the failure.
        """
        if self.connector_type and self.connector_type != connector.type:
            return (
                False,
                f"connector type '{connector.type}' does not match the "
                f"'{self.connector_type}' connector type specified in the "
                "stack component requirements",
            )
        if self.resource_type not in connector.resource_types:
            return False, (
                f"connector does not provide the '{self.resource_type}' "
                "resource type specified in the stack component requirements. "
                "Only the following resource types are supported: "
                f"{', '.join(connector.resource_types)}"
            )
        if self.resource_id_attr:
            resource_id = component.configuration.get(self.resource_id_attr)
            if not resource_id:
                return (
                    False,
                    f"the '{self.resource_id_attr}' stack component "
                    f"configuration attribute plays the role of resource "
                    f"identifier, but the stack component does not contain a "
                    f"'{self.resource_id_attr}' attribute. Please add the "
                    f"'{self.resource_id_attr}' attribute to the stack "
                    "component configuration and try again.",
                )

        return True, ""
is_satisfied_by(self, connector, component)

Check if the requirements are satisfied by a connector.

Parameters:

Name Type Description Default
connector ServiceConnectorBaseModel

The connector to check.

required
component ComponentBaseModel

The stack component that the connector is associated with.

required

Returns:

Type Description
Tuple[bool, str]

True if the requirements are satisfied, False otherwise, and a message describing the reason for the failure.

Source code in zenml/models/service_connector_models.py
def is_satisfied_by(
    self,
    connector: "ServiceConnectorBaseModel",
    component: "ComponentBaseModel",
) -> Tuple[bool, str]:
    """Check if the requirements are satisfied by a connector.

    Args:
        connector: The connector to check.
        component: The stack component that the connector is associated
            with.

    Returns:
        True if the requirements are satisfied, False otherwise, and a
        message describing the reason for the failure.
    """
    if self.connector_type and self.connector_type != connector.type:
        return (
            False,
            f"connector type '{connector.type}' does not match the "
            f"'{self.connector_type}' connector type specified in the "
            "stack component requirements",
        )
    if self.resource_type not in connector.resource_types:
        return False, (
            f"connector does not provide the '{self.resource_type}' "
            "resource type specified in the stack component requirements. "
            "Only the following resource types are supported: "
            f"{', '.join(connector.resource_types)}"
        )
    if self.resource_id_attr:
        resource_id = component.configuration.get(self.resource_id_attr)
        if not resource_id:
            return (
                False,
                f"the '{self.resource_id_attr}' stack component "
                f"configuration attribute plays the role of resource "
                f"identifier, but the stack component does not contain a "
                f"'{self.resource_id_attr}' attribute. Please add the "
                f"'{self.resource_id_attr}' attribute to the stack "
                "component configuration and try again.",
            )

    return True, ""

ServiceConnectorResourcesModel (BaseModel) pydantic-model

Service connector resources list.

Lists the resource types and resource instances that a service connector can provide access to.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorResourcesModel(BaseModel):
    """Service connector resources list.

    Lists the resource types and resource instances that a service connector
    can provide access to.
    """

    id: Optional[UUID] = Field(
        default=None,
        title="The ID of the service connector instance providing this "
        "resource.",
    )

    name: Optional[str] = Field(
        default=None,
        title="The name of the service connector instance providing this "
        "resource.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    connector_type: Union[str, "ServiceConnectorTypeModel"] = Field(
        title="The type of service connector.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    resources: List[ServiceConnectorTypedResourcesModel] = Field(
        default_factory=list,
        title="The list of resources that the service connector instance can "
        "give access to. Contains one entry for every resource type "
        "that the connector is configured for.",
    )

    error: Optional[str] = Field(
        default=None,
        title="A global error message describing why the service connector "
        "instance could not authenticate to the remote service.",
    )

    @property
    def resources_dict(self) -> Dict[str, ServiceConnectorTypedResourcesModel]:
        """Get the resources as a dictionary indexed by resource type.

        Returns:
            The resources as a dictionary indexed by resource type.
        """
        return {
            resource.resource_type: resource for resource in self.resources
        }

    @property
    def resource_types(self) -> List[str]:
        """Get the resource types.

        Returns:
            The resource types.
        """
        return [resource.resource_type for resource in self.resources]

    def set_error(
        self, error: str, resource_type: Optional[str] = None
    ) -> None:
        """Set a global error message or an error for a single resource type.

        Args:
            error: The error message.
            resource_type: The resource type to set the error message for. If
                omitted, or if there is only one resource type involved, the
                error message is (also) set globally.

        Raises:
            KeyError: If the resource type is not found in the resources list.
        """
        if resource_type:
            resource = self.resources_dict.get(resource_type)
            if not resource:
                raise KeyError(
                    f"resource type '{resource_type}' not found in "
                    "service connector resources list"
                )
            resource.error = error
            resource.resource_ids = None
            if len(self.resources) == 1:
                # If there is only one resource type involved, set the global
                # error message as well.
                self.error = error
        else:
            self.error = error
            for resource in self.resources:
                resource.error = error
                resource.resource_ids = None

    def set_resource_ids(
        self, resource_type: str, resource_ids: List[str]
    ) -> None:
        """Set the resource IDs for a resource type.

        Args:
            resource_type: The resource type to set the resource IDs for.
            resource_ids: The resource IDs to set.

        Raises:
            KeyError: If the resource type is not found in the resources list.
        """
        resource = self.resources_dict.get(resource_type)
        if not resource:
            raise KeyError(
                f"resource type '{resource_type}' not found in "
                "service connector resources list"
            )
        resource.resource_ids = resource_ids
        resource.error = None

    @property
    def type(self) -> str:
        """Get the connector type.

        Returns:
            The connector type.
        """
        if isinstance(self.connector_type, str):
            return self.connector_type
        return self.connector_type.connector_type

    @property
    def emojified_connector_type(self) -> str:
        """Get the emojified connector type.

        Returns:
            The emojified connector type.
        """
        if not isinstance(self.connector_type, str):
            return self.connector_type.emojified_connector_type

        return self.connector_type

    def get_emojified_resource_types(
        self, resource_type: Optional[str] = None
    ) -> List[str]:
        """Get the emojified resource type.

        Args:
            resource_type: The resource type to get the emojified resource type
                for. If omitted, the emojified resource type for all resource
                types is returned.


        Returns:
            The list of emojified resource types.
        """
        if not isinstance(self.connector_type, str):
            if resource_type:
                return [
                    self.connector_type.resource_type_dict[
                        resource_type
                    ].emojified_resource_type
                ]
            return [
                self.connector_type.resource_type_dict[
                    resource_type
                ].emojified_resource_type
                for resource_type in self.resources_dict.keys()
            ]
        if resource_type:
            return [resource_type]
        return list(self.resources_dict.keys())

    def get_default_resource_id(self) -> Optional[str]:
        """Get the default resource ID, if included in the resource list.

        The default resource ID is a resource ID supplied by the connector
        implementation only for resource types that do not support multiple
        instances.

        Returns:
            The default resource ID, or None if no resource ID is set.
        """
        if len(self.resources) != 1:
            # multi-type connectors do not have a default resource ID
            return None

        if isinstance(self.connector_type, str):
            # can't determine default resource ID for unknown connector types
            return None

        resource_type_spec = self.connector_type.resource_type_dict[
            self.resources[0].resource_type
        ]
        if resource_type_spec.supports_instances:
            # resource types that support multiple instances do not have a
            # default resource ID
            return None

        resource_ids = self.resources[0].resource_ids

        if not resource_ids or len(resource_ids) != 1:
            return None

        return resource_ids[0]

    @classmethod
    def from_connector_model(
        cls,
        connector_model: "ServiceConnectorResponseModel",
        resource_type: Optional[str] = None,
    ) -> "ServiceConnectorResourcesModel":
        """Initialize a resource model from a connector model.

        Args:
            connector_model: The connector model.
            resource_type: The resource type to set on the resource model. If
                omitted, the resource type is set according to the connector
                model.

        Returns:
            A resource list model instance.
        """
        resources = cls(
            id=connector_model.id,
            name=connector_model.name,
            connector_type=connector_model.type,
        )

        resource_types = resource_type or connector_model.resource_types
        for resource_type in resource_types:
            resources.resources.append(
                ServiceConnectorTypedResourcesModel(
                    resource_type=resource_type,
                    resource_ids=[connector_model.resource_id]
                    if connector_model.resource_id
                    else None,
                )
            )

        return resources
emojified_connector_type: str property readonly

Get the emojified connector type.

Returns:

Type Description
str

The emojified connector type.

resource_types: List[str] property readonly

Get the resource types.

Returns:

Type Description
List[str]

The resource types.

resources_dict: Dict[str, zenml.models.service_connector_models.ServiceConnectorTypedResourcesModel] property readonly

Get the resources as a dictionary indexed by resource type.

Returns:

Type Description
Dict[str, zenml.models.service_connector_models.ServiceConnectorTypedResourcesModel]

The resources as a dictionary indexed by resource type.

type: str property readonly

Get the connector type.

Returns:

Type Description
str

The connector type.

from_connector_model(connector_model, resource_type=None) classmethod

Initialize a resource model from a connector model.

Parameters:

Name Type Description Default
connector_model ServiceConnectorResponseModel

The connector model.

required
resource_type Optional[str]

The resource type to set on the resource model. If omitted, the resource type is set according to the connector model.

None

Returns:

Type Description
ServiceConnectorResourcesModel

A resource list model instance.

Source code in zenml/models/service_connector_models.py
@classmethod
def from_connector_model(
    cls,
    connector_model: "ServiceConnectorResponseModel",
    resource_type: Optional[str] = None,
) -> "ServiceConnectorResourcesModel":
    """Initialize a resource model from a connector model.

    Args:
        connector_model: The connector model.
        resource_type: The resource type to set on the resource model. If
            omitted, the resource type is set according to the connector
            model.

    Returns:
        A resource list model instance.
    """
    resources = cls(
        id=connector_model.id,
        name=connector_model.name,
        connector_type=connector_model.type,
    )

    resource_types = resource_type or connector_model.resource_types
    for resource_type in resource_types:
        resources.resources.append(
            ServiceConnectorTypedResourcesModel(
                resource_type=resource_type,
                resource_ids=[connector_model.resource_id]
                if connector_model.resource_id
                else None,
            )
        )

    return resources
get_default_resource_id(self)

Get the default resource ID, if included in the resource list.

The default resource ID is a resource ID supplied by the connector implementation only for resource types that do not support multiple instances.

Returns:

Type Description
Optional[str]

The default resource ID, or None if no resource ID is set.

Source code in zenml/models/service_connector_models.py
def get_default_resource_id(self) -> Optional[str]:
    """Get the default resource ID, if included in the resource list.

    The default resource ID is a resource ID supplied by the connector
    implementation only for resource types that do not support multiple
    instances.

    Returns:
        The default resource ID, or None if no resource ID is set.
    """
    if len(self.resources) != 1:
        # multi-type connectors do not have a default resource ID
        return None

    if isinstance(self.connector_type, str):
        # can't determine default resource ID for unknown connector types
        return None

    resource_type_spec = self.connector_type.resource_type_dict[
        self.resources[0].resource_type
    ]
    if resource_type_spec.supports_instances:
        # resource types that support multiple instances do not have a
        # default resource ID
        return None

    resource_ids = self.resources[0].resource_ids

    if not resource_ids or len(resource_ids) != 1:
        return None

    return resource_ids[0]
get_emojified_resource_types(self, resource_type=None)

Get the emojified resource type.

Parameters:

Name Type Description Default
resource_type Optional[str]

The resource type to get the emojified resource type for. If omitted, the emojified resource type for all resource types is returned.

None

Returns:

Type Description
List[str]

The list of emojified resource types.

Source code in zenml/models/service_connector_models.py
def get_emojified_resource_types(
    self, resource_type: Optional[str] = None
) -> List[str]:
    """Get the emojified resource type.

    Args:
        resource_type: The resource type to get the emojified resource type
            for. If omitted, the emojified resource type for all resource
            types is returned.


    Returns:
        The list of emojified resource types.
    """
    if not isinstance(self.connector_type, str):
        if resource_type:
            return [
                self.connector_type.resource_type_dict[
                    resource_type
                ].emojified_resource_type
            ]
        return [
            self.connector_type.resource_type_dict[
                resource_type
            ].emojified_resource_type
            for resource_type in self.resources_dict.keys()
        ]
    if resource_type:
        return [resource_type]
    return list(self.resources_dict.keys())
set_error(self, error, resource_type=None)

Set a global error message or an error for a single resource type.

Parameters:

Name Type Description Default
error str

The error message.

required
resource_type Optional[str]

The resource type to set the error message for. If omitted, or if there is only one resource type involved, the error message is (also) set globally.

None

Exceptions:

Type Description
KeyError

If the resource type is not found in the resources list.

Source code in zenml/models/service_connector_models.py
def set_error(
    self, error: str, resource_type: Optional[str] = None
) -> None:
    """Set a global error message or an error for a single resource type.

    Args:
        error: The error message.
        resource_type: The resource type to set the error message for. If
            omitted, or if there is only one resource type involved, the
            error message is (also) set globally.

    Raises:
        KeyError: If the resource type is not found in the resources list.
    """
    if resource_type:
        resource = self.resources_dict.get(resource_type)
        if not resource:
            raise KeyError(
                f"resource type '{resource_type}' not found in "
                "service connector resources list"
            )
        resource.error = error
        resource.resource_ids = None
        if len(self.resources) == 1:
            # If there is only one resource type involved, set the global
            # error message as well.
            self.error = error
    else:
        self.error = error
        for resource in self.resources:
            resource.error = error
            resource.resource_ids = None
set_resource_ids(self, resource_type, resource_ids)

Set the resource IDs for a resource type.

Parameters:

Name Type Description Default
resource_type str

The resource type to set the resource IDs for.

required
resource_ids List[str]

The resource IDs to set.

required

Exceptions:

Type Description
KeyError

If the resource type is not found in the resources list.

Source code in zenml/models/service_connector_models.py
def set_resource_ids(
    self, resource_type: str, resource_ids: List[str]
) -> None:
    """Set the resource IDs for a resource type.

    Args:
        resource_type: The resource type to set the resource IDs for.
        resource_ids: The resource IDs to set.

    Raises:
        KeyError: If the resource type is not found in the resources list.
    """
    resource = self.resources_dict.get(resource_type)
    if not resource:
        raise KeyError(
            f"resource type '{resource_type}' not found in "
            "service connector resources list"
        )
    resource.resource_ids = resource_ids
    resource.error = None

ServiceConnectorResponseModel (ServiceConnectorBaseModel, ShareableResponseModel) pydantic-model

Response model for service connectors.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorResponseModel(
    ServiceConnectorBaseModel, ShareableResponseModel
):
    """Response model for service connectors."""

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "connector_type",
        "auth_method",
        "resource_types",
    ]

    secret_id: Optional[UUID] = Field(
        default=None,
        title="The ID of the secret that contains the service connector "
        "secret configuration values.",
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Format the resource types in the analytics metadata.

        Returns:
            Dict of analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        if len(self.resource_types) == 1:
            metadata["resource_types"] = self.resource_types[0]
        else:
            metadata["resource_types"] = ", ".join(self.resource_types)
        if not isinstance(self.connector_type, str):
            metadata["connector_type"] = self.connector_type.connector_type
        return metadata
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Format the resource types in the analytics metadata.

Returns:

Type Description
Dict[str, Any]

Dict of analytics metadata.

Source code in zenml/models/service_connector_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Format the resource types in the analytics metadata.

    Returns:
        Dict of analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    if len(self.resource_types) == 1:
        metadata["resource_types"] = self.resource_types[0]
    else:
        metadata["resource_types"] = ", ".join(self.resource_types)
    if not isinstance(self.connector_type, str):
        metadata["connector_type"] = self.connector_type.connector_type
    return metadata

ServiceConnectorTypeModel (BaseModel) pydantic-model

Service connector type specification.

Describes the types of resources to which the service connector can be used to gain access and the authentication methods that are supported by the service connector.

The connector type, resource types, resource IDs and authentication methods can all be used as search criteria to lookup and filter service connector instances that are compatible with the requirements of a consumer (e.g. a stack component).

Source code in zenml/models/service_connector_models.py
class ServiceConnectorTypeModel(BaseModel):
    """Service connector type specification.

    Describes the types of resources to which the service connector can be used
    to gain access and the authentication methods that are supported by the
    service connector.

    The connector type, resource types, resource IDs and authentication
    methods can all be used as search criteria to lookup and filter service
    connector instances that are compatible with the requirements of a consumer
    (e.g. a stack component).
    """

    name: str = Field(
        title="User readable name for the service connector type.",
    )
    connector_type: str = Field(
        title="The type of service connector. It can be used to represent a "
        "generic resource (e.g. Docker, Kubernetes) or a group of different "
        "resources accessible through a common interface or point of access "
        "and authentication (e.g. a cloud provider or a platform).",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: str = Field(
        default="",
        title="A description of the service connector.",
    )
    resource_types: List[ResourceTypeModel] = Field(
        title="A list of resource types that the connector can be used to "
        "access.",
    )
    auth_methods: List[AuthenticationMethodModel] = Field(
        title="A list of specifications describing the authentication "
        "methods that are supported by the service connector, along with the "
        "configuration and secrets attributes that need to be configured for "
        "them.",
    )
    supports_auto_configuration: bool = Field(
        default=False,
        title="Models if the connector can be configured automatically based "
        "on information extracted from a local environment.",
    )
    logo_url: Optional[str] = Field(
        default=None,
        title="Optionally, a URL pointing to a png,"
        "svg or jpg can be attached.",
    )
    emoji: Optional[str] = Field(
        default=None,
        title="Optionally, a python-rich emoji can be attached.",
    )
    docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a URL pointing to docs, within docs.zenml.io.",
    )
    sdk_docs_url: Optional[str] = Field(
        default=None,
        title="Optionally, a URL pointing to SDK docs,"
        "within sdkdocs.zenml.io.",
    )
    local: bool = Field(
        default=True,
        title="If True, the service connector is available locally.",
    )
    remote: bool = Field(
        default=False,
        title="If True, the service connector is available remotely.",
    )
    _connector_class: Optional[Type["ServiceConnector"]] = None

    @property
    def connector_class(self) -> Optional[Type["ServiceConnector"]]:
        """Get the service connector class.

        Returns:
            The service connector class.
        """
        return self._connector_class

    @property
    def emojified_connector_type(self) -> str:
        """Get the emojified connector type.

        Returns:
            The emojified connector type.
        """
        if not self.emoji:
            return self.connector_type
        return f"{self.emoji} {self.connector_type}"

    @property
    def emojified_resource_types(self) -> List[str]:
        """Get the emojified connector types.

        Returns:
            The emojified connector types.
        """
        return [
            resource_type.emojified_resource_type
            for resource_type in self.resource_types
        ]

    def set_connector_class(
        self, connector_class: Type["ServiceConnector"]
    ) -> None:
        """Set the service connector class.

        Args:
            connector_class: The service connector class.
        """
        self._connector_class = connector_class

    @validator("resource_types")
    def validate_resource_types(
        cls, values: List[ResourceTypeModel]
    ) -> List[ResourceTypeModel]:
        """Validate that the resource types are unique.

        Args:
            values: The list of resource types.

        Returns:
            The list of resource types.

        Raises:
            ValueError: If two or more resource type specifications list the
                same resource type.
        """
        # Gather all resource types from the list of resource type
        # specifications.
        resource_types = [r.resource_type for r in values]
        if len(resource_types) != len(set(resource_types)):
            raise ValueError(
                "Two or more resource type specifications must not list "
                "the same resource type."
            )

        return values

    @validator("auth_methods")
    def validate_auth_methods(
        cls, values: List[AuthenticationMethodModel]
    ) -> List[AuthenticationMethodModel]:
        """Validate that the authentication methods are unique.

        Args:
            values: The list of authentication methods.

        Returns:
            The list of authentication methods.

        Raises:
            ValueError: If two or more authentication method specifications
                share the same authentication method value.
        """
        # Gather all auth methods from the list of auth method
        # specifications.
        auth_methods = [a.auth_method for a in values]
        if len(auth_methods) != len(set(auth_methods)):
            raise ValueError(
                "Two or more authentication method specifications must not "
                "share the same authentication method value."
            )

        return values

    @property
    def resource_type_dict(
        self,
    ) -> Dict[str, ResourceTypeModel]:
        """Returns a map of resource types to resource type specifications.

        Returns:
            A map of resource types to resource type specifications.
        """
        return {r.resource_type: r for r in self.resource_types}

    @property
    def auth_method_dict(
        self,
    ) -> Dict[str, AuthenticationMethodModel]:
        """Returns a map of authentication methods to authentication method specifications.

        Returns:
            A map of authentication methods to authentication method
            specifications.
        """
        return {a.auth_method: a for a in self.auth_methods}

    def find_resource_specifications(
        self,
        auth_method: str,
        resource_type: Optional[str] = None,
    ) -> Tuple[AuthenticationMethodModel, Optional[ResourceTypeModel]]:
        """Find the specifications for a configurable resource.

        Validate the supplied connector configuration parameters against the
        connector specification and return the matching authentication method
        specification and resource specification.

        Args:
            auth_method: The name of the authentication method.
            resource_type: The type of resource being configured.

        Returns:
            The authentication method specification and resource specification
            for the specified authentication method and resource type.

        Raises:
            KeyError: If the authentication method is not supported by the
                connector for the specified resource type and ID.
        """
        # Verify the authentication method
        auth_method_dict = self.auth_method_dict
        if auth_method in auth_method_dict:
            # A match was found for the authentication method
            auth_method_spec = auth_method_dict[auth_method]
        else:
            # No match was found for the authentication method
            raise KeyError(
                f"connector type '{self.connector_type}' does not support the "
                f"'{auth_method}' authentication method. Supported "
                f"authentication methods are: {list(auth_method_dict.keys())}."
            )

        if resource_type is None:
            # No resource type was specified, so no resource type
            # specification can be returned.
            return auth_method_spec, None

        # Verify the resource type
        resource_type_dict = self.resource_type_dict
        if resource_type in resource_type_dict:
            resource_type_spec = resource_type_dict[resource_type]
        else:
            raise KeyError(
                f"connector type '{self.connector_type}' does not support "
                f"resource type '{resource_type}'. Supported resource types "
                f"are: {list(resource_type_dict.keys())}."
            )

        if auth_method not in resource_type_spec.auth_methods:
            raise KeyError(
                f"the '{self.connector_type}' connector type does not support "
                f"the '{auth_method}' authentication method for the "
                f"'{resource_type}' resource type. Supported authentication "
                f"methods are: {resource_type_spec.auth_methods}."
            )

        return auth_method_spec, resource_type_spec

    class Config:
        """Pydantic config class."""

        underscore_attrs_are_private = True
auth_method_dict: Dict[str, zenml.models.service_connector_models.AuthenticationMethodModel] property readonly

Returns a map of authentication methods to authentication method specifications.

Returns:

Type Description
Dict[str, zenml.models.service_connector_models.AuthenticationMethodModel]

A map of authentication methods to authentication method specifications.

connector_class: Optional[Type[ServiceConnector]] property readonly

Get the service connector class.

Returns:

Type Description
Optional[Type[ServiceConnector]]

The service connector class.

emojified_connector_type: str property readonly

Get the emojified connector type.

Returns:

Type Description
str

The emojified connector type.

emojified_resource_types: List[str] property readonly

Get the emojified connector types.

Returns:

Type Description
List[str]

The emojified connector types.

resource_type_dict: Dict[str, zenml.models.service_connector_models.ResourceTypeModel] property readonly

Returns a map of resource types to resource type specifications.

Returns:

Type Description
Dict[str, zenml.models.service_connector_models.ResourceTypeModel]

A map of resource types to resource type specifications.

Config

Pydantic config class.

Source code in zenml/models/service_connector_models.py
class Config:
    """Pydantic config class."""

    underscore_attrs_are_private = True
find_resource_specifications(self, auth_method, resource_type=None)

Find the specifications for a configurable resource.

Validate the supplied connector configuration parameters against the connector specification and return the matching authentication method specification and resource specification.

Parameters:

Name Type Description Default
auth_method str

The name of the authentication method.

required
resource_type Optional[str]

The type of resource being configured.

None

Returns:

Type Description
Tuple[zenml.models.service_connector_models.AuthenticationMethodModel, Optional[zenml.models.service_connector_models.ResourceTypeModel]]

The authentication method specification and resource specification for the specified authentication method and resource type.

Exceptions:

Type Description
KeyError

If the authentication method is not supported by the connector for the specified resource type and ID.

Source code in zenml/models/service_connector_models.py
def find_resource_specifications(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
) -> Tuple[AuthenticationMethodModel, Optional[ResourceTypeModel]]:
    """Find the specifications for a configurable resource.

    Validate the supplied connector configuration parameters against the
    connector specification and return the matching authentication method
    specification and resource specification.

    Args:
        auth_method: The name of the authentication method.
        resource_type: The type of resource being configured.

    Returns:
        The authentication method specification and resource specification
        for the specified authentication method and resource type.

    Raises:
        KeyError: If the authentication method is not supported by the
            connector for the specified resource type and ID.
    """
    # Verify the authentication method
    auth_method_dict = self.auth_method_dict
    if auth_method in auth_method_dict:
        # A match was found for the authentication method
        auth_method_spec = auth_method_dict[auth_method]
    else:
        # No match was found for the authentication method
        raise KeyError(
            f"connector type '{self.connector_type}' does not support the "
            f"'{auth_method}' authentication method. Supported "
            f"authentication methods are: {list(auth_method_dict.keys())}."
        )

    if resource_type is None:
        # No resource type was specified, so no resource type
        # specification can be returned.
        return auth_method_spec, None

    # Verify the resource type
    resource_type_dict = self.resource_type_dict
    if resource_type in resource_type_dict:
        resource_type_spec = resource_type_dict[resource_type]
    else:
        raise KeyError(
            f"connector type '{self.connector_type}' does not support "
            f"resource type '{resource_type}'. Supported resource types "
            f"are: {list(resource_type_dict.keys())}."
        )

    if auth_method not in resource_type_spec.auth_methods:
        raise KeyError(
            f"the '{self.connector_type}' connector type does not support "
            f"the '{auth_method}' authentication method for the "
            f"'{resource_type}' resource type. Supported authentication "
            f"methods are: {resource_type_spec.auth_methods}."
        )

    return auth_method_spec, resource_type_spec
set_connector_class(self, connector_class)

Set the service connector class.

Parameters:

Name Type Description Default
connector_class Type[ServiceConnector]

The service connector class.

required
Source code in zenml/models/service_connector_models.py
def set_connector_class(
    self, connector_class: Type["ServiceConnector"]
) -> None:
    """Set the service connector class.

    Args:
        connector_class: The service connector class.
    """
    self._connector_class = connector_class
validate_auth_methods(values) classmethod

Validate that the authentication methods are unique.

Parameters:

Name Type Description Default
values List[zenml.models.service_connector_models.AuthenticationMethodModel]

The list of authentication methods.

required

Returns:

Type Description
List[zenml.models.service_connector_models.AuthenticationMethodModel]

The list of authentication methods.

Exceptions:

Type Description
ValueError

If two or more authentication method specifications share the same authentication method value.

Source code in zenml/models/service_connector_models.py
@validator("auth_methods")
def validate_auth_methods(
    cls, values: List[AuthenticationMethodModel]
) -> List[AuthenticationMethodModel]:
    """Validate that the authentication methods are unique.

    Args:
        values: The list of authentication methods.

    Returns:
        The list of authentication methods.

    Raises:
        ValueError: If two or more authentication method specifications
            share the same authentication method value.
    """
    # Gather all auth methods from the list of auth method
    # specifications.
    auth_methods = [a.auth_method for a in values]
    if len(auth_methods) != len(set(auth_methods)):
        raise ValueError(
            "Two or more authentication method specifications must not "
            "share the same authentication method value."
        )

    return values
validate_resource_types(values) classmethod

Validate that the resource types are unique.

Parameters:

Name Type Description Default
values List[zenml.models.service_connector_models.ResourceTypeModel]

The list of resource types.

required

Returns:

Type Description
List[zenml.models.service_connector_models.ResourceTypeModel]

The list of resource types.

Exceptions:

Type Description
ValueError

If two or more resource type specifications list the same resource type.

Source code in zenml/models/service_connector_models.py
@validator("resource_types")
def validate_resource_types(
    cls, values: List[ResourceTypeModel]
) -> List[ResourceTypeModel]:
    """Validate that the resource types are unique.

    Args:
        values: The list of resource types.

    Returns:
        The list of resource types.

    Raises:
        ValueError: If two or more resource type specifications list the
            same resource type.
    """
    # Gather all resource types from the list of resource type
    # specifications.
    resource_types = [r.resource_type for r in values]
    if len(resource_types) != len(set(resource_types)):
        raise ValueError(
            "Two or more resource type specifications must not list "
            "the same resource type."
        )

    return values

ServiceConnectorTypedResourcesModel (BaseModel) pydantic-model

Service connector typed resources list.

Lists the resource instances that a service connector can provide access to.

Source code in zenml/models/service_connector_models.py
class ServiceConnectorTypedResourcesModel(BaseModel):
    """Service connector typed resources list.

    Lists the resource instances that a service connector can provide
    access to.
    """

    resource_type: str = Field(
        title="The type of resource that the service connector instance can "
        "be used to access.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    resource_ids: Optional[List[str]] = Field(
        default=None,
        title="The resource IDs of all resource instances that the service "
        "connector instance can be used to access. Omitted (set to None) for "
        "multi-type service connectors that didn't explicitly request to "
        "fetch resources for all resource types. Also omitted if an error "
        "occurred while listing the resource instances or if no resources are "
        "listed due to authorization issues or lack of permissions (in both "
        "cases the 'error' field is set to an error message). For resource "
        "types that do not support multiple instances, a single resource ID is "
        "listed.",
    )

    error: Optional[str] = Field(
        default=None,
        title="An error message describing why the service connector instance "
        "could not list the resources that it is configured to access.",
    )

ServiceConnectorUpdateModel (ServiceConnectorRequestModel) pydantic-model

Model used for service connector updates.

Most fields in the update model are optional and will not be updated if omitted. However, the following fields are "special" and leaving them out will also cause the corresponding value to be removed from the service connector in the database:

  • the resource_id field
  • the expiration_seconds field

In addition to the above exceptions, the following rules apply:

  • the configuration and secrets fields together represent a full valid configuration update, not just a partial update. If either is set (i.e. not None) in the update, their values are merged together and will replace the existing configuration and secrets values.
  • the secret_id field value in the update is ignored, given that secrets are managed internally by the ZenML store.
  • the labels field is also a full labels update: if set (i.e. not None), all existing labels are removed and replaced by the new labels in the update.
Source code in zenml/models/service_connector_models.py
class ServiceConnectorUpdateModel(ServiceConnectorRequestModel):
    """Model used for service connector updates.

    Most fields in the update model are optional and will not be updated if
    omitted. However, the following fields are "special" and leaving them out
    will also cause the corresponding value to be removed from the service
    connector in the database:

    * the `resource_id` field
    * the `expiration_seconds` field

    In addition to the above exceptions, the following rules apply:

    * the `configuration` and `secrets` fields together represent a full
    valid configuration update, not just a partial update. If either is
    set (i.e. not None) in the update, their values are merged together and
    will replace the existing configuration and secrets values.
    * the `secret_id` field value in the update is ignored, given that
    secrets are managed internally by the ZenML store.
    * the `labels` field is also a full labels update: if set (i.e. not
    `None`), all existing labels are removed and replaced by the new labels
    in the update.
    """

    resource_types: Optional[List[str]] = Field(  # type: ignore[assignment]
        default=None,
        title="The type(s) of resource that the connector instance can be used "
        "to gain access to.",
    )
    configuration: Optional[Dict[str, Any]] = Field(  # type: ignore[assignment]
        default=None,
        title="The service connector configuration, not including secrets.",
    )
    secrets: Optional[Dict[str, Optional[SecretStr]]] = Field(  # type: ignore[assignment]
        default=None,
        title="The service connector secrets.",
    )
    labels: Optional[Dict[str, str]] = Field(  # type: ignore[assignment]
        default=None,
        title="Service connector labels.",
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

stack_models

Models representing stacks.

StackBaseModel (BaseModel) pydantic-model

Base model for stacks.

Source code in zenml/models/stack_models.py
class StackBaseModel(BaseModel):
    """Base model for stacks."""

    name: str = Field(
        title="The name of the stack.", max_length=STR_FIELD_MAX_LENGTH
    )
    description: str = Field(
        default="",
        title="The description of the stack",
        max_length=STR_FIELD_MAX_LENGTH,
    )

StackFilterModel (ShareableWorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all StackModels.

The Stack Model needs additional scoping. As such the _scope_user field can be set to the user that is doing the filtering. The generate_filter() method of the baseclass is overwritten to include the scoping.

Source code in zenml/models/stack_models.py
class StackFilterModel(ShareableWorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all StackModels.

    The Stack Model needs additional scoping. As such the `_scope_user` field
    can be set to the user that is doing the filtering. The
    `generate_filter()` method of the baseclass is overwritten to include the
    scoping.
    """

    # `component_id` refers to a relationship through a link-table
    #  rather than a field in the db, hence it needs to be handled
    #  explicitly
    FILTER_EXCLUDE_FIELDS: ClassVar[List[str]] = [
        *ShareableWorkspaceScopedFilterModel.FILTER_EXCLUDE_FIELDS,
        "component_id",  # This is a relationship, not a field
    ]

    is_shared: Optional[Union[bool, str]] = Field(
        default=None, description="If the stack is shared or private"
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the stack",
    )
    description: Optional[str] = Field(
        default=None, description="Description of the stack"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the stack"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User of the stack"
    )
    component_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Component in the stack"
    )
component_id: Union[uuid.UUID, str] pydantic-field

Component in the stack

description: str pydantic-field

Description of the stack

is_shared: Union[bool, str] pydantic-field

If the stack is shared or private

name: str pydantic-field

Name of the stack

user_id: Union[uuid.UUID, str] pydantic-field

User of the stack

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the stack

StackRequestModel (StackBaseModel, ShareableRequestModel) pydantic-model

Stack model with components, user and workspace as UUIDs.

Source code in zenml/models/stack_models.py
class StackRequestModel(StackBaseModel, ShareableRequestModel):
    """Stack model with components, user and workspace as UUIDs."""

    components: Optional[Dict[StackComponentType, List[UUID]]] = Field(
        default=None,
        title="A mapping of stack component types to the actual"
        "instances of components of this type.",
    )

    @property
    def is_valid(self) -> bool:
        """Check if the stack is valid.

        Returns:
            True if the stack is valid, False otherwise.
        """
        if not self.components:
            return False
        return (
            StackComponentType.ARTIFACT_STORE in self.components
            and StackComponentType.ORCHESTRATOR in self.components
        )
is_valid: bool property readonly

Check if the stack is valid.

Returns:

Type Description
bool

True if the stack is valid, False otherwise.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

StackResponseModel (StackBaseModel, ShareableResponseModel) pydantic-model

Stack model with Components, User and Workspace fully hydrated.

Source code in zenml/models/stack_models.py
class StackResponseModel(StackBaseModel, ShareableResponseModel):
    """Stack model with Components, User and Workspace fully hydrated."""

    components: Dict[StackComponentType, List[ComponentResponseModel]] = Field(
        title="A mapping of stack component types to the actual"
        "instances of components of this type."
    )

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Add the stack components to the stack analytics metadata.

        Returns:
            Dict of analytics metadata.
        """
        metadata = super().get_analytics_metadata()
        metadata.update({ct: c[0].flavor for ct, c in self.components.items()})
        return metadata

    @property
    def is_valid(self) -> bool:
        """Check if the stack is valid.

        Returns:
            True if the stack is valid, False otherwise.
        """
        return (
            StackComponentType.ARTIFACT_STORE in self.components
            and StackComponentType.ORCHESTRATOR in self.components
        )

    def to_yaml(self) -> Dict[str, Any]:
        """Create yaml representation of the Stack Model.

        Returns:
            The yaml representation of the Stack Model.
        """
        component_data = {}
        for component_type, components_list in self.components.items():
            component = components_list[0]
            component_dict = json.loads(
                component.json(
                    include={"name", "type", "flavor", "configuration"}
                )
            )
            component_data[component_type.value] = component_dict

        # write zenml version and stack dict to YAML
        yaml_data = {
            "stack_name": self.name,
            "components": component_data,
        }

        return yaml_data
is_valid: bool property readonly

Check if the stack is valid.

Returns:

Type Description
bool

True if the stack is valid, False otherwise.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Add the stack components to the stack analytics metadata.

Returns:

Type Description
Dict[str, Any]

Dict of analytics metadata.

Source code in zenml/models/stack_models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Add the stack components to the stack analytics metadata.

    Returns:
        Dict of analytics metadata.
    """
    metadata = super().get_analytics_metadata()
    metadata.update({ct: c[0].flavor for ct, c in self.components.items()})
    return metadata
to_yaml(self)

Create yaml representation of the Stack Model.

Returns:

Type Description
Dict[str, Any]

The yaml representation of the Stack Model.

Source code in zenml/models/stack_models.py
def to_yaml(self) -> Dict[str, Any]:
    """Create yaml representation of the Stack Model.

    Returns:
        The yaml representation of the Stack Model.
    """
    component_data = {}
    for component_type, components_list in self.components.items():
        component = components_list[0]
        component_dict = json.loads(
            component.json(
                include={"name", "type", "flavor", "configuration"}
            )
        )
        component_data[component_type.value] = component_dict

    # write zenml version and stack dict to YAML
    yaml_data = {
        "stack_name": self.name,
        "components": component_data,
    }

    return yaml_data

StackUpdateModel (StackRequestModel) pydantic-model

The update model for stacks.

Source code in zenml/models/stack_models.py
class StackUpdateModel(StackRequestModel):
    """The update model for stacks."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

step_run_models

Models representing steps of pipeline runs.

StepRunBaseModel (BaseModel) pydantic-model

Base model for step runs.

Source code in zenml/models/step_run_models.py
class StepRunBaseModel(BaseModel):
    """Base model for step runs."""

    name: str = Field(
        title="The name of the pipeline run step.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    config: StepConfiguration = Field(title="The configuration of the step.")
    spec: StepSpec = Field(title="The spec of the step.")
    pipeline_run_id: UUID = Field(
        title="The ID of the pipeline run that this step run belongs to.",
    )
    original_step_run_id: Optional[UUID] = Field(
        title="The ID of the original step run if this step was cached.",
        default=None,
    )
    status: ExecutionStatus = Field(title="The status of the step.")
    parent_step_ids: List[UUID] = Field(
        title="The IDs of the parent steps of this step run.",
        default_factory=list,
    )
    cache_key: Optional[str] = Field(
        title="The cache key of the step run.",
        default=None,
        max_length=STR_FIELD_MAX_LENGTH,
    )
    docstring: Optional[str] = Field(
        title="The docstring of the step function or class.",
        default=None,
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    source_code: Optional[str] = Field(
        title="The source code of the step function or class.",
        default=None,
        max_length=TEXT_FIELD_MAX_LENGTH,
    )
    start_time: Optional[datetime] = Field(
        title="The start time of the step run.",
        default=None,
    )
    end_time: Optional[datetime] = Field(
        title="The end time of the step run.",
        default=None,
    )

StepRunFilterModel (WorkspaceScopedFilterModel) pydantic-model

Model to enable advanced filtering of all Artifacts.

Source code in zenml/models/step_run_models.py
class StepRunFilterModel(WorkspaceScopedFilterModel):
    """Model to enable advanced filtering of all Artifacts."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the step run",
    )
    entrypoint_name: Optional[str] = Field(
        default=None,
        description="Entrypoint name of the step run",
    )
    code_hash: Optional[str] = Field(
        default=None,
        description="Code hash for this step run",
    )
    cache_key: Optional[str] = Field(
        default=None,
        description="Cache key for this step run",
    )
    status: Optional[str] = Field(
        default=None,
        description="Status of the Step Run",
    )
    start_time: Optional[Union[datetime, str]] = Field(
        default=None, description="Start time for this run"
    )
    end_time: Optional[Union[datetime, str]] = Field(
        default=None, description="End time for this run"
    )
    pipeline_run_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Pipeline run of this step run"
    )
    original_step_run_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Original id for this step run"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User that produced this step run"
    )
    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of this step run"
    )
    num_outputs: Optional[int] = Field(
        default=None,
        description="Amount of outputs for this Step Run",
    )
cache_key: str pydantic-field

Cache key for this step run

code_hash: str pydantic-field

Code hash for this step run

end_time: Union[datetime.datetime, str] pydantic-field

End time for this run

entrypoint_name: str pydantic-field

Entrypoint name of the step run

name: str pydantic-field

Name of the step run

num_outputs: int pydantic-field

Amount of outputs for this Step Run

original_step_run_id: Union[uuid.UUID, str] pydantic-field

Original id for this step run

pipeline_run_id: Union[uuid.UUID, str] pydantic-field

Pipeline run of this step run

start_time: Union[datetime.datetime, str] pydantic-field

Start time for this run

status: str pydantic-field

Status of the Step Run

user_id: Union[uuid.UUID, str] pydantic-field

User that produced this step run

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of this step run

StepRunRequestModel (StepRunBaseModel, WorkspaceScopedRequestModel) pydantic-model

Request model for step runs.

Source code in zenml/models/step_run_models.py
class StepRunRequestModel(StepRunBaseModel, WorkspaceScopedRequestModel):
    """Request model for step runs."""

    inputs: Dict[str, UUID] = Field(
        title="The IDs of the input artifacts of the step run.",
        default={},
    )
    outputs: Dict[str, UUID] = Field(
        title="The IDs of the output artifacts of the step run.",
        default={},
    )
    logs: Optional["LogsRequestModel"] = Field(
        title="Logs associated with this step run.",
        default=None,
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

StepRunResponseModel (StepRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for step runs.

Source code in zenml/models/step_run_models.py
class StepRunResponseModel(StepRunBaseModel, WorkspaceScopedResponseModel):
    """Response model for step runs."""

    inputs: Dict[str, "ArtifactResponseModel"] = Field(
        title="The input artifacts of the step run.",
        default={},
    )
    outputs: Dict[str, "ArtifactResponseModel"] = Field(
        title="The output artifacts of the step run.",
        default={},
    )
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        title="Metadata associated with this step run.",
        default={},
    )
    logs: Optional["LogsResponseModel"] = Field(
        title="Logs associated with this step run.",
        default=None,
    )

    @property
    def run(self) -> "PipelineRunResponseModel":
        """Returns the pipeline run that this step run belongs to.

        Returns:
            The pipeline run.
        """
        from zenml.client import Client

        return Client().get_pipeline_run(self.pipeline_run_id)

    @property
    def parent_steps(self) -> List["StepRunResponseModel"]:
        """Returns the parent (upstream) steps of this step run.

        Returns:
            The parent steps.
        """
        from zenml.client import Client

        return [
            Client().get_run_step(step_id) for step_id in self.parent_step_ids
        ]

    @property
    def input(self) -> "ArtifactResponseModel":
        """Returns the input artifact that was used to run this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if not self.inputs:
            raise ValueError(f"Step {self.name} has no inputs.")
        if len(self.inputs) > 1:
            raise ValueError(
                f"Step {self.name} has multiple inputs, so `Step.input` is "
                "ambiguous. Please use `Step.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def output(self) -> "ArtifactResponseModel":
        """Returns the output artifact that was written by this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if not self.outputs:
            raise ValueError(f"Step {self.name} has no outputs.")
        if len(self.outputs) > 1:
            raise ValueError(
                f"Step {self.name} has multiple outputs, so `Step.output` is "
                "ambiguous. Please use `Step.outputs` instead."
            )
        return next(iter(self.outputs.values()))
input: ArtifactResponseModel property readonly

Returns the input artifact that was used to run this step.

Returns:

Type Description
ArtifactResponseModel

The input artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple inputs to this step.

output: ArtifactResponseModel property readonly

Returns the output artifact that was written by this step.

Returns:

Type Description
ArtifactResponseModel

The output artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple step outputs.

parent_steps: List[StepRunResponseModel] property readonly

Returns the parent (upstream) steps of this step run.

Returns:

Type Description
List[StepRunResponseModel]

The parent steps.

run: PipelineRunResponseModel property readonly

Returns the pipeline run that this step run belongs to.

Returns:

Type Description
PipelineRunResponseModel

The pipeline run.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

StepRunUpdateModel (BaseModel) pydantic-model

Update model for step runs.

Source code in zenml/models/step_run_models.py
class StepRunUpdateModel(BaseModel):
    """Update model for step runs."""

    outputs: Dict[str, UUID] = Field(
        title="The IDs of the output artifacts of the step run.",
        default={},
    )
    status: Optional[ExecutionStatus] = Field(
        title="The status of the step.",
        default=None,
    )
    end_time: Optional[datetime] = Field(
        title="The end time of the step run.",
        default=None,
    )

team_models

Models representing teams.

TeamBaseModel (BaseModel) pydantic-model

Base model for teams.

Source code in zenml/models/team_models.py
class TeamBaseModel(BaseModel):
    """Base model for teams."""

    name: str = Field(
        title="The unique name of the team.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

TeamFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Teams.

Source code in zenml/models/team_models.py
class TeamFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Teams."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the team",
    )
name: str pydantic-field

Name of the team

TeamRequestModel (TeamBaseModel, BaseRequestModel) pydantic-model

Request model for teams.

Source code in zenml/models/team_models.py
class TeamRequestModel(TeamBaseModel, BaseRequestModel):
    """Request model for teams."""

    users: Optional[List[UUID]] = Field(
        default=None, title="The list of users within this team."
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

TeamResponseModel (TeamBaseModel, BaseResponseModel) pydantic-model

Response model for teams.

Source code in zenml/models/team_models.py
class TeamResponseModel(TeamBaseModel, BaseResponseModel):
    """Response model for teams."""

    users: List["UserResponseModel"] = Field(
        title="The list of users within this team."
    )

    @property
    def user_ids(self) -> List[UUID]:
        """Returns a list of user IDs that are part of this team.

        Returns:
            A list of user IDs.
        """
        if self.users:
            return [u.id for u in self.users]
        else:
            return []

    @property
    def user_names(self) -> List[str]:
        """Returns a list names of users that are part of this team.

        Returns:
            A list of names of users.
        """
        if self.users:
            return [u.name for u in self.users]
        else:
            return []
user_ids: List[uuid.UUID] property readonly

Returns a list of user IDs that are part of this team.

Returns:

Type Description
List[uuid.UUID]

A list of user IDs.

user_names: List[str] property readonly

Returns a list names of users that are part of this team.

Returns:

Type Description
List[str]

A list of names of users.

__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

TeamUpdateModel (TeamRequestModel) pydantic-model

Update model for teams.

Source code in zenml/models/team_models.py
class TeamUpdateModel(TeamRequestModel):
    """Update model for teams."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

team_role_assignment_models

Models representing role assignments.

TeamRoleAssignmentBaseModel (BaseModel) pydantic-model

Base model for role assignments.

Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentBaseModel(BaseModel):
    """Base model for role assignments."""

TeamRoleAssignmentFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Role Assignments.

Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Role Assignments."""

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the RoleAssignment"
    )
    team_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Team in the RoleAssignment"
    )
    role_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Role in the RoleAssignment"
    )
role_id: Union[uuid.UUID, str] pydantic-field

Role in the RoleAssignment

team_id: Union[uuid.UUID, str] pydantic-field

Team in the RoleAssignment

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the RoleAssignment

TeamRoleAssignmentRequestModel (TeamRoleAssignmentBaseModel, BaseRequestModel) pydantic-model

Request model for role assignments using UUIDs for all entities.

Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentRequestModel(
    TeamRoleAssignmentBaseModel, BaseRequestModel
):
    """Request model for role assignments using UUIDs for all entities."""

    workspace: Optional[UUID] = Field(
        default=None, title="The workspace that the role is limited to."
    )
    team: UUID = Field(
        title="The user that the role is assigned to.",
    )

    role: UUID = Field(title="The role.")
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

TeamRoleAssignmentResponseModel (TeamRoleAssignmentBaseModel, BaseResponseModel) pydantic-model

Response model for role assignments with all entities hydrated.

Source code in zenml/models/team_role_assignment_models.py
class TeamRoleAssignmentResponseModel(
    TeamRoleAssignmentBaseModel, BaseResponseModel
):
    """Response model for role assignments with all entities hydrated."""

    workspace: Optional["WorkspaceResponseModel"] = Field(
        title="The workspace scope of this role assignment.", default=None
    )
    team: Optional["TeamResponseModel"] = Field(
        title="The team the role is assigned to.", default=None
    )
    role: Optional["RoleResponseModel"] = Field(
        title="The assigned role.", default=None
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

user_models

Models representing users.

JWTToken (BaseModel) pydantic-model

Pydantic object representing a JWT token.

Attributes:

Name Type Description
token_type JWTTokenType

The type of token.

user_id UUID

The id of the authenticated User

permissions List[str]

The permissions scope of the authenticated user

Source code in zenml/models/user_models.py
class JWTToken(BaseModel):
    """Pydantic object representing a JWT token.

    Attributes:
        token_type: The type of token.
        user_id: The id of the authenticated User
        permissions: The permissions scope of the authenticated user
    """

    JWT_ALGORITHM: ClassVar[str] = "HS256"

    token_type: JWTTokenType
    user_id: UUID
    permissions: List[str]

    @classmethod
    def decode(cls, token_type: JWTTokenType, token: str) -> "JWTToken":
        """Decodes a JWT access token.

        Decodes a JWT access token and returns a `JWTToken` object with the
        information retrieved from its subject claim.

        Args:
            token_type: The type of token.
            token: The encoded JWT token.

        Returns:
            The decoded JWT access token.

        Raises:
            AuthorizationException: If the token is invalid.
        """
        # import here to keep these dependencies out of the client
        from jose import JWTError, jwt

        try:
            payload = jwt.decode(
                token,
                GlobalConfiguration().jwt_secret_key,
                algorithms=[cls.JWT_ALGORITHM],
            )
        except JWTError as e:
            raise AuthorizationException(f"Invalid JWT token: {e}") from e

        subject: str = payload.get("sub")
        if subject is None:
            raise AuthorizationException(
                "Invalid JWT token: the subject claim is missing"
            )
        permissions: List[str] = payload.get("permissions")
        if permissions is None:
            raise AuthorizationException(
                "Invalid JWT token: the permissions scope is missing"
            )

        try:
            return cls(
                token_type=token_type,
                user_id=UUID(subject),
                permissions=set(permissions),
            )
        except ValueError as e:
            raise AuthorizationException(
                f"Invalid JWT token: could not decode subject claim: {e}"
            ) from e

    def encode(self, expire_minutes: Optional[int] = None) -> str:
        """Creates a JWT access token.

        Generates and returns a JWT access token with the subject claim set to
        contain the information in this Pydantic object.

        Args:
            expire_minutes: Number of minutes the token should be valid. If not
                provided, the token will not be set to expire.

        Returns:
            The generated access token.
        """
        # import here to keep these dependencies out of the client
        from jose import jwt

        claims: Dict[str, Any] = {
            "sub": str(self.user_id),
            "permissions": list(self.permissions),
        }

        if expire_minutes:
            expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
            claims["exp"] = expire

        token: str = jwt.encode(
            claims,
            GlobalConfiguration().jwt_secret_key,
            algorithm=self.JWT_ALGORITHM,
        )
        return token
decode(token_type, token) classmethod

Decodes a JWT access token.

Decodes a JWT access token and returns a JWTToken object with the information retrieved from its subject claim.

Parameters:

Name Type Description Default
token_type JWTTokenType

The type of token.

required
token str

The encoded JWT token.

required

Returns:

Type Description
JWTToken

The decoded JWT access token.

Exceptions:

Type Description
AuthorizationException

If the token is invalid.

Source code in zenml/models/user_models.py
@classmethod
def decode(cls, token_type: JWTTokenType, token: str) -> "JWTToken":
    """Decodes a JWT access token.

    Decodes a JWT access token and returns a `JWTToken` object with the
    information retrieved from its subject claim.

    Args:
        token_type: The type of token.
        token: The encoded JWT token.

    Returns:
        The decoded JWT access token.

    Raises:
        AuthorizationException: If the token is invalid.
    """
    # import here to keep these dependencies out of the client
    from jose import JWTError, jwt

    try:
        payload = jwt.decode(
            token,
            GlobalConfiguration().jwt_secret_key,
            algorithms=[cls.JWT_ALGORITHM],
        )
    except JWTError as e:
        raise AuthorizationException(f"Invalid JWT token: {e}") from e

    subject: str = payload.get("sub")
    if subject is None:
        raise AuthorizationException(
            "Invalid JWT token: the subject claim is missing"
        )
    permissions: List[str] = payload.get("permissions")
    if permissions is None:
        raise AuthorizationException(
            "Invalid JWT token: the permissions scope is missing"
        )

    try:
        return cls(
            token_type=token_type,
            user_id=UUID(subject),
            permissions=set(permissions),
        )
    except ValueError as e:
        raise AuthorizationException(
            f"Invalid JWT token: could not decode subject claim: {e}"
        ) from e
encode(self, expire_minutes=None)

Creates a JWT access token.

Generates and returns a JWT access token with the subject claim set to contain the information in this Pydantic object.

Parameters:

Name Type Description Default
expire_minutes Optional[int]

Number of minutes the token should be valid. If not provided, the token will not be set to expire.

None

Returns:

Type Description
str

The generated access token.

Source code in zenml/models/user_models.py
def encode(self, expire_minutes: Optional[int] = None) -> str:
    """Creates a JWT access token.

    Generates and returns a JWT access token with the subject claim set to
    contain the information in this Pydantic object.

    Args:
        expire_minutes: Number of minutes the token should be valid. If not
            provided, the token will not be set to expire.

    Returns:
        The generated access token.
    """
    # import here to keep these dependencies out of the client
    from jose import jwt

    claims: Dict[str, Any] = {
        "sub": str(self.user_id),
        "permissions": list(self.permissions),
    }

    if expire_minutes:
        expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
        claims["exp"] = expire

    token: str = jwt.encode(
        claims,
        GlobalConfiguration().jwt_secret_key,
        algorithm=self.JWT_ALGORITHM,
    )
    return token

JWTTokenType (StrEnum)

The type of JWT token.

Source code in zenml/models/user_models.py
class JWTTokenType(StrEnum):
    """The type of JWT token."""

    ACCESS_TOKEN = "access_token"

UserAuthModel (UserBaseModel, BaseResponseModel) pydantic-model

Authentication Model for the User.

This model is only used server-side. The server endpoints can use this model to authenticate the user credentials (Token, Password).

Source code in zenml/models/user_models.py
class UserAuthModel(UserBaseModel, BaseResponseModel):
    """Authentication Model for the User.

    This model is only used server-side. The server endpoints can use this model
    to authenticate the user credentials (Token, Password).
    """

    active: bool = Field(default=False, title="Active account.")

    activation_token: Optional[SecretStr] = Field(default=None, exclude=True)
    password: Optional[SecretStr] = Field(default=None, exclude=True)
    teams: Optional[List["TeamResponseModel"]] = Field(
        default=None, title="The list of teams for this user."
    )

    def generate_access_token(self, permissions: List[str]) -> str:
        """Generates an access token.

        Generates an access token and returns it.

        Args:
            permissions: Permissions to add to the token

        Returns:
            The generated access token.
        """
        return JWTToken(
            token_type=JWTTokenType.ACCESS_TOKEN,
            user_id=self.id,
            permissions=permissions,
        ).encode()

    @classmethod
    def _is_hashed_secret(cls, secret: SecretStr) -> bool:
        """Checks if a secret value is already hashed.

        Args:
            secret: The secret value to check.

        Returns:
            True if the secret value is hashed, otherwise False.
        """
        return (
            re.match(r"^\$2[ayb]\$.{56}$", secret.get_secret_value())
            is not None
        )

    @classmethod
    def _get_hashed_secret(cls, secret: Optional[SecretStr]) -> Optional[str]:
        """Hashes the input secret and returns the hash value.

        Only applied if supplied and if not already hashed.

        Args:
            secret: The secret value to hash.

        Returns:
            The secret hash value, or None if no secret was supplied.
        """
        if secret is None:
            return None
        if cls._is_hashed_secret(secret):
            return secret.get_secret_value()
        pwd_context = cls._get_crypt_context()
        return cast(str, pwd_context.hash(secret.get_secret_value()))

    def get_password(self) -> Optional[str]:
        """Get the password.

        Returns:
            The password as a plain string, if it exists.
        """
        if self.password is None:
            return None
        return self.password.get_secret_value()

    def get_hashed_password(self) -> Optional[str]:
        """Returns the hashed password, if configured.

        Returns:
            The hashed password.
        """
        return self._get_hashed_secret(self.password)

    def get_hashed_activation_token(self) -> Optional[str]:
        """Returns the hashed activation token, if configured.

        Returns:
            The hashed activation token.
        """
        return self._get_hashed_secret(self.activation_token)

    @classmethod
    def verify_password(
        cls, plain_password: str, user: Optional["UserAuthModel"] = None
    ) -> bool:
        """Verifies a given plain password against the stored password.

        Args:
            plain_password: Input password to be verified.
            user: User for which the password is to be verified.

        Returns:
            True if the passwords match.
        """
        # even when the user or password is not set, we still want to execute
        # the password hash verification to protect against response discrepancy
        # attacks (https://cwe.mitre.org/data/definitions/204.html)
        password_hash: Optional[str] = None
        if user is not None and user.password is not None:  # and user.active:
            password_hash = user.get_hashed_password()
        pwd_context = cls._get_crypt_context()
        return cast(bool, pwd_context.verify(plain_password, password_hash))

    @classmethod
    def verify_access_token(cls, token: str) -> Optional["UserAuthModel"]:
        """Verifies an access token.

        Verifies an access token and returns the user that was used to generate
        it if the token is valid and None otherwise.

        Args:
            token: The access token to verify.

        Returns:
            The user that generated the token if valid, None otherwise.
        """
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        try:
            access_token = JWTToken.decode(
                token_type=JWTTokenType.ACCESS_TOKEN, token=token
            )
        except AuthorizationException:
            return None

        zen_store = GlobalConfiguration().zen_store
        if not isinstance(zen_store, SqlZenStore):
            return None
        try:
            user = zen_store.get_auth_user(
                user_name_or_id=access_token.user_id
            )
        except KeyError:
            return None
        else:
            if user.active:
                return user

        return None

    @classmethod
    def verify_activation_token(
        cls, activation_token: str, user: Optional["UserAuthModel"] = None
    ) -> bool:
        """Verifies a given activation token against the stored token.

        Args:
            activation_token: Input activation token to be verified.
            user: User for which the activation token is to be verified.

        Returns:
            True if the token is valid.
        """
        # even when the user or token is not set, we still want to execute the
        # token hash verification to protect against response discrepancy
        # attacks (https://cwe.mitre.org/data/definitions/204.html)
        token_hash: Optional[str] = None
        if (
            user is not None
            and user.activation_token is not None
            and not user.active
        ):
            token_hash = user.get_hashed_activation_token()
        pwd_context = cls._get_crypt_context()
        return cast(bool, pwd_context.verify(activation_token, token_hash))
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

generate_access_token(self, permissions)

Generates an access token.

Generates an access token and returns it.

Parameters:

Name Type Description Default
permissions List[str]

Permissions to add to the token

required

Returns:

Type Description
str

The generated access token.

Source code in zenml/models/user_models.py
def generate_access_token(self, permissions: List[str]) -> str:
    """Generates an access token.

    Generates an access token and returns it.

    Args:
        permissions: Permissions to add to the token

    Returns:
        The generated access token.
    """
    return JWTToken(
        token_type=JWTTokenType.ACCESS_TOKEN,
        user_id=self.id,
        permissions=permissions,
    ).encode()
get_hashed_activation_token(self)

Returns the hashed activation token, if configured.

Returns:

Type Description
Optional[str]

The hashed activation token.

Source code in zenml/models/user_models.py
def get_hashed_activation_token(self) -> Optional[str]:
    """Returns the hashed activation token, if configured.

    Returns:
        The hashed activation token.
    """
    return self._get_hashed_secret(self.activation_token)
get_hashed_password(self)

Returns the hashed password, if configured.

Returns:

Type Description
Optional[str]

The hashed password.

Source code in zenml/models/user_models.py
def get_hashed_password(self) -> Optional[str]:
    """Returns the hashed password, if configured.

    Returns:
        The hashed password.
    """
    return self._get_hashed_secret(self.password)
get_password(self)

Get the password.

Returns:

Type Description
Optional[str]

The password as a plain string, if it exists.

Source code in zenml/models/user_models.py
def get_password(self) -> Optional[str]:
    """Get the password.

    Returns:
        The password as a plain string, if it exists.
    """
    if self.password is None:
        return None
    return self.password.get_secret_value()
verify_access_token(token) classmethod

Verifies an access token.

Verifies an access token and returns the user that was used to generate it if the token is valid and None otherwise.

Parameters:

Name Type Description Default
token str

The access token to verify.

required

Returns:

Type Description
Optional[UserAuthModel]

The user that generated the token if valid, None otherwise.

Source code in zenml/models/user_models.py
@classmethod
def verify_access_token(cls, token: str) -> Optional["UserAuthModel"]:
    """Verifies an access token.

    Verifies an access token and returns the user that was used to generate
    it if the token is valid and None otherwise.

    Args:
        token: The access token to verify.

    Returns:
        The user that generated the token if valid, None otherwise.
    """
    from zenml.zen_stores.sql_zen_store import SqlZenStore

    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException:
        return None

    zen_store = GlobalConfiguration().zen_store
    if not isinstance(zen_store, SqlZenStore):
        return None
    try:
        user = zen_store.get_auth_user(
            user_name_or_id=access_token.user_id
        )
    except KeyError:
        return None
    else:
        if user.active:
            return user

    return None
verify_activation_token(activation_token, user=None) classmethod

Verifies a given activation token against the stored token.

Parameters:

Name Type Description Default
activation_token str

Input activation token to be verified.

required
user Optional[UserAuthModel]

User for which the activation token is to be verified.

None

Returns:

Type Description
bool

True if the token is valid.

Source code in zenml/models/user_models.py
@classmethod
def verify_activation_token(
    cls, activation_token: str, user: Optional["UserAuthModel"] = None
) -> bool:
    """Verifies a given activation token against the stored token.

    Args:
        activation_token: Input activation token to be verified.
        user: User for which the activation token is to be verified.

    Returns:
        True if the token is valid.
    """
    # even when the user or token is not set, we still want to execute the
    # token hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    token_hash: Optional[str] = None
    if (
        user is not None
        and user.activation_token is not None
        and not user.active
    ):
        token_hash = user.get_hashed_activation_token()
    pwd_context = cls._get_crypt_context()
    return cast(bool, pwd_context.verify(activation_token, token_hash))
verify_password(plain_password, user=None) classmethod

Verifies a given plain password against the stored password.

Parameters:

Name Type Description Default
plain_password str

Input password to be verified.

required
user Optional[UserAuthModel]

User for which the password is to be verified.

None

Returns:

Type Description
bool

True if the passwords match.

Source code in zenml/models/user_models.py
@classmethod
def verify_password(
    cls, plain_password: str, user: Optional["UserAuthModel"] = None
) -> bool:
    """Verifies a given plain password against the stored password.

    Args:
        plain_password: Input password to be verified.
        user: User for which the password is to be verified.

    Returns:
        True if the passwords match.
    """
    # even when the user or password is not set, we still want to execute
    # the password hash verification to protect against response discrepancy
    # attacks (https://cwe.mitre.org/data/definitions/204.html)
    password_hash: Optional[str] = None
    if user is not None and user.password is not None:  # and user.active:
        password_hash = user.get_hashed_password()
    pwd_context = cls._get_crypt_context()
    return cast(bool, pwd_context.verify(plain_password, password_hash))

UserBaseModel (BaseModel) pydantic-model

Base model for users.

Source code in zenml/models/user_models.py
class UserBaseModel(BaseModel):
    """Base model for users."""

    name: str = Field(
        title="The unique username for the account.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    full_name: str = Field(
        default="",
        title="The full name for the account owner.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    email_opted_in: Optional[bool] = Field(
        default=None,
        title="Whether the user agreed to share their email.",
        description="`null` if not answered, `true` if agreed, "
        "`false` if skipped.",
    )

    hub_token: Optional[str] = Field(
        default=None,
        title="JWT Token for the connected Hub account.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    active: bool = Field(default=False, title="Active account.")

    @classmethod
    def _get_crypt_context(cls) -> "CryptContext":
        """Returns the password encryption context.

        Returns:
            The password encryption context.
        """
        from passlib.context import CryptContext

        return CryptContext(schemes=["bcrypt"], deprecated="auto")
email_opted_in: bool pydantic-field

null if not answered, true if agreed, false if skipped.

UserFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Users.

Source code in zenml/models/user_models.py
class UserFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Users."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the user",
    )
    full_name: Optional[str] = Field(
        default=None,
        description="Full Name of the user",
    )
    email: Optional[str] = Field(
        default=None,
        description="Full Name of the user",
    )
    active: Optional[Union[bool, str]] = Field(
        default=None,
        description="Full Name of the user",
    )
    email_opted_in: Optional[Union[bool, str]] = Field(
        default=None,
        description="Full Name of the user",
    )
active: Union[bool, str] pydantic-field

Full Name of the user

email: str pydantic-field

Full Name of the user

email_opted_in: Union[bool, str] pydantic-field

Full Name of the user

full_name: str pydantic-field

Full Name of the user

name: str pydantic-field

Name of the user

UserRequestModel (UserBaseModel, BaseRequestModel) pydantic-model

Request model for users.

This model is used to create a user. The email field is optional but is more commonly set on the UpdateRequestModel which inherits from this model. Users can also optionally set their password during creation.

Source code in zenml/models/user_models.py
class UserRequestModel(UserBaseModel, BaseRequestModel):
    """Request model for users.

    This model is used to create a user. The email field is optional but is
    more commonly set on the UpdateRequestModel which inherits from this model.
    Users can also optionally set their password during creation.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "name",
        "full_name",
        "active",
        "email_opted_in",
    ]

    email: Optional[str] = Field(
        default=None,
        title="The email address associated with the account.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    password: Optional[str] = Field(
        default=None,
        title="A password for the user.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    activation_token: Optional[str] = Field(
        default=None, max_length=STR_FIELD_MAX_LENGTH
    )

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them
        validate_assignment = True
        # Forbid extra attributes to prevent unexpected behavior
        extra = "forbid"
        underscore_attrs_are_private = True

    @classmethod
    def _create_hashed_secret(cls, secret: Optional[str]) -> Optional[str]:
        """Hashes the input secret and returns the hash value.

        Only applied if supplied and if not already hashed.

        Args:
            secret: The secret value to hash.

        Returns:
            The secret hash value, or None if no secret was supplied.
        """
        if secret is None:
            return None
        pwd_context = cls._get_crypt_context()
        return cast(str, pwd_context.hash(secret))

    def create_hashed_password(self) -> Optional[str]:
        """Hashes the password.

        Returns:
            The hashed password.
        """
        return self._create_hashed_secret(self.password)

    def create_hashed_activation_token(self) -> Optional[str]:
        """Hashes the activation token.

        Returns:
            The hashed activation token.
        """
        return self._create_hashed_secret(self.activation_token)

    def generate_activation_token(self) -> str:
        """Generates and stores a new activation token.

        Returns:
            The generated activation token.
        """
        self.activation_token = token_hex(32)
        return self.activation_token
Config

Pydantic configuration class.

Source code in zenml/models/user_models.py
class Config:
    """Pydantic configuration class."""

    # Validate attributes when assigning them
    validate_assignment = True
    # Forbid extra attributes to prevent unexpected behavior
    extra = "forbid"
    underscore_attrs_are_private = True
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

create_hashed_activation_token(self)

Hashes the activation token.

Returns:

Type Description
Optional[str]

The hashed activation token.

Source code in zenml/models/user_models.py
def create_hashed_activation_token(self) -> Optional[str]:
    """Hashes the activation token.

    Returns:
        The hashed activation token.
    """
    return self._create_hashed_secret(self.activation_token)
create_hashed_password(self)

Hashes the password.

Returns:

Type Description
Optional[str]

The hashed password.

Source code in zenml/models/user_models.py
def create_hashed_password(self) -> Optional[str]:
    """Hashes the password.

    Returns:
        The hashed password.
    """
    return self._create_hashed_secret(self.password)
generate_activation_token(self)

Generates and stores a new activation token.

Returns:

Type Description
str

The generated activation token.

Source code in zenml/models/user_models.py
def generate_activation_token(self) -> str:
    """Generates and stores a new activation token.

    Returns:
        The generated activation token.
    """
    self.activation_token = token_hex(32)
    return self.activation_token

UserResponseModel (UserBaseModel, BaseResponseModel) pydantic-model

Response model for users.

This returns the activation_token (which is required for the user-invitation-flow of the frontend. This also optionally includes the team the user is a part of. The email is returned optionally as well for use by the analytics on the client-side.

Source code in zenml/models/user_models.py
class UserResponseModel(UserBaseModel, BaseResponseModel):
    """Response model for users.

    This returns the activation_token (which is required for the
    user-invitation-flow of the frontend. This also optionally includes the
    team the user is a part of. The email is returned optionally as well
    for use by the analytics on the client-side.
    """

    ANALYTICS_FIELDS: ClassVar[List[str]] = [
        "name",
        "full_name",
        "active",
        "email_opted_in",
    ]

    activation_token: Optional[str] = Field(
        default=None, max_length=STR_FIELD_MAX_LENGTH
    )
    teams: Optional[List["TeamResponseModel"]] = Field(
        default=None, title="The list of teams for this user."
    )
    roles: Optional[List["RoleResponseModel"]] = Field(
        default=None, title="The list of roles for this user."
    )
    email: Optional[str] = Field(
        default="",
        title="The email address associated with the account.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

    def generate_access_token(self, permissions: List[str]) -> str:
        """Generates an access token.

        Generates an access token and returns it.

        Args:
            permissions: Permissions to add to the token

        Returns:
            The generated access token.
        """
        return JWTToken(
            token_type=JWTTokenType.ACCESS_TOKEN,
            user_id=self.id,
            permissions=permissions,
        ).encode()
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

generate_access_token(self, permissions)

Generates an access token.

Generates an access token and returns it.

Parameters:

Name Type Description Default
permissions List[str]

Permissions to add to the token

required

Returns:

Type Description
str

The generated access token.

Source code in zenml/models/user_models.py
def generate_access_token(self, permissions: List[str]) -> str:
    """Generates an access token.

    Generates an access token and returns it.

    Args:
        permissions: Permissions to add to the token

    Returns:
        The generated access token.
    """
    return JWTToken(
        token_type=JWTTokenType.ACCESS_TOKEN,
        user_id=self.id,
        permissions=permissions,
    ).encode()

UserUpdateModel (UserRequestModel) pydantic-model

Update model for users.

Source code in zenml/models/user_models.py
class UserUpdateModel(UserRequestModel):
    """Update model for users."""

    @root_validator
    def user_email_updates(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the UserUpdateModel conforms to the email-opt-in-flow.

        Args:
            values: The values to validate.

        Returns:
            The validated values.

        Raises:
            ValueError: If the email was not provided when the email_opted_in
                field was set to True.
        """
        # When someone sets the email, or updates the email and hasn't
        #  before explicitly opted out, they are opted in
        if values["email"] is not None:
            if values["email_opted_in"] is None:
                values["email_opted_in"] = True

        # It should not be possible to do opt in without an email
        if values["email_opted_in"] is True:
            if values["email"] is None:
                raise ValueError(
                    "Please provide an email, when you are opting-in with "
                    "your email."
                )
        return values
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

user_email_updates(values) classmethod

Validate that the UserUpdateModel conforms to the email-opt-in-flow.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values to validate.

required

Returns:

Type Description
Dict[str, Any]

The validated values.

Exceptions:

Type Description
ValueError

If the email was not provided when the email_opted_in field was set to True.

Source code in zenml/models/user_models.py
@root_validator
def user_email_updates(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate that the UserUpdateModel conforms to the email-opt-in-flow.

    Args:
        values: The values to validate.

    Returns:
        The validated values.

    Raises:
        ValueError: If the email was not provided when the email_opted_in
            field was set to True.
    """
    # When someone sets the email, or updates the email and hasn't
    #  before explicitly opted out, they are opted in
    if values["email"] is not None:
        if values["email_opted_in"] is None:
            values["email_opted_in"] = True

    # It should not be possible to do opt in without an email
    if values["email_opted_in"] is True:
        if values["email"] is None:
            raise ValueError(
                "Please provide an email, when you are opting-in with "
                "your email."
            )
    return values

user_role_assignment_models

Models representing role assignments.

UserRoleAssignmentBaseModel (BaseModel) pydantic-model

Base model for role assignments.

Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentBaseModel(BaseModel):
    """Base model for role assignments."""

UserRoleAssignmentFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Role Assignments.

Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Role Assignments."""

    workspace_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Workspace of the RoleAssignment"
    )
    user_id: Optional[Union[UUID, str]] = Field(
        default=None, description="User in the RoleAssignment"
    )
    role_id: Optional[Union[UUID, str]] = Field(
        default=None, description="Role in the RoleAssignment"
    )
role_id: Union[uuid.UUID, str] pydantic-field

Role in the RoleAssignment

user_id: Union[uuid.UUID, str] pydantic-field

User in the RoleAssignment

workspace_id: Union[uuid.UUID, str] pydantic-field

Workspace of the RoleAssignment

UserRoleAssignmentRequestModel (UserRoleAssignmentBaseModel, BaseRequestModel) pydantic-model

Request model for role assignments using UUIDs for all entities.

Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentRequestModel(
    UserRoleAssignmentBaseModel, BaseRequestModel
):
    """Request model for role assignments using UUIDs for all entities."""

    workspace: Optional[UUID] = Field(
        default=None,
        title="The workspace that the role is limited to.",
    )
    user: UUID = Field(title="The user that the role is assigned to.")

    role: UUID = Field(title="The role.")
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

UserRoleAssignmentResponseModel (UserRoleAssignmentBaseModel, BaseResponseModel) pydantic-model

Response model for role assignments with all entities hydrated.

Source code in zenml/models/user_role_assignment_models.py
class UserRoleAssignmentResponseModel(
    UserRoleAssignmentBaseModel, BaseResponseModel
):
    """Response model for role assignments with all entities hydrated."""

    workspace: Optional["WorkspaceResponseModel"] = Field(
        title="The workspace scope of this role assignment.", default=None
    )
    user: Optional["UserResponseModel"] = Field(
        title="The user the role is assigned to.", default=None
    )
    role: Optional["RoleResponseModel"] = Field(
        title="The assigned role.", default=None
    )
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

visualization_models

Models representing visualizations.

BaseVisualizationModel (BaseModel) pydantic-model

Base model for visualizations.

Source code in zenml/models/visualization_models.py
class BaseVisualizationModel(BaseModel):
    """Base model for visualizations."""

    type: VisualizationType

LoadedVisualizationModel (BaseVisualizationModel) pydantic-model

Model for loaded visualization.

Source code in zenml/models/visualization_models.py
class LoadedVisualizationModel(BaseVisualizationModel):
    """Model for loaded visualization."""

    value: Union[str, bytes]

VisualizationModel (BaseVisualizationModel) pydantic-model

Model for unloaded visualization.

Source code in zenml/models/visualization_models.py
class VisualizationModel(BaseVisualizationModel):
    """Model for unloaded visualization."""

    uri: str

workspace_models

Models representing workspaces.

WorkspaceBaseModel (BaseModel) pydantic-model

Base model for workspaces.

Source code in zenml/models/workspace_models.py
class WorkspaceBaseModel(BaseModel):
    """Base model for workspaces."""

    name: str = Field(
        title="The unique name of the workspace.",
        max_length=STR_FIELD_MAX_LENGTH,
    )
    description: str = Field(
        default="",
        title="The description of the workspace.",
        max_length=STR_FIELD_MAX_LENGTH,
    )

WorkspaceFilterModel (BaseFilterModel) pydantic-model

Model to enable advanced filtering of all Workspaces.

Source code in zenml/models/workspace_models.py
class WorkspaceFilterModel(BaseFilterModel):
    """Model to enable advanced filtering of all Workspaces."""

    name: Optional[str] = Field(
        default=None,
        description="Name of the workspace",
    )
name: str pydantic-field

Name of the workspace

WorkspaceRequestModel (WorkspaceBaseModel, BaseRequestModel) pydantic-model

Request model for workspaces.

Source code in zenml/models/workspace_models.py
class WorkspaceRequestModel(WorkspaceBaseModel, BaseRequestModel):
    """Request model for workspaces."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceResponseModel (WorkspaceBaseModel, BaseResponseModel) pydantic-model

Response model for workspaces.

Source code in zenml/models/workspace_models.py
class WorkspaceResponseModel(WorkspaceBaseModel, BaseResponseModel):
    """Response model for workspaces."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

WorkspaceUpdateModel (WorkspaceRequestModel) pydantic-model

Update model for workspaces.

Source code in zenml/models/workspace_models.py
class WorkspaceUpdateModel(WorkspaceRequestModel):
    """Update model for workspaces."""
__json_encoder__(obj) special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.