Skip to content

Post Execution

zenml.post_execution special

Initialization for the post-execution module.

After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.

artifact

Initialization for the post-execution artifact class.

ArtifactView (BaseView)

Post-execution artifact class.

This can be used to read artifact data that was created during a pipeline execution.

Source code in zenml/post_execution/artifact.py
class ArtifactView(BaseView):
    """Post-execution artifact class.

    This can be used to read artifact data that was created during a pipeline
    execution.
    """

    MODEL_CLASS: Type[BaseResponseModel] = ArtifactResponseModel
    REPR_KEYS = ["id", "name", "uri"]

    @property
    def model(self) -> ArtifactResponseModel:
        """Returns the underlying `ArtifactResponseModel`.

        Returns:
            The underlying `ArtifactResponseModel`.
        """
        return cast(ArtifactResponseModel, self._model)

    def read(self) -> Any:
        """Materializes (loads) the data stored in this artifact.

        Returns:
            The materialized data.
        """
        from zenml.utils.artifact_utils import load_artifact

        return load_artifact(self.model)

    def visualize(self, title: Optional[str] = None) -> None:
        """Visualize the artifact in notebook environments.

        Each stored visualization is loaded and rendered with IPython's
        `display` according to its type: images, HTML, markdown, and CSV
        (rendered as an HTML table). Any other type is passed to `display`
        unchanged.

        Args:
            title: Optional title to show before the visualizations.

        Raises:
            RuntimeError: If not in a notebook environment.
        """
        from zenml.environment import Environment
        from zenml.utils.artifact_utils import load_artifact_visualization

        if not Environment.in_notebook() and not Environment.in_google_colab():
            raise RuntimeError(
                "The `output.visualize()` method is only available in Jupyter "
                "notebooks. In all other runtime environments, please open "
                "your ZenML dashboard using `zenml up` and view the "
                "visualizations by clicking on the respective artifacts in the "
                "pipeline run DAG instead."
            )

        # IPython is imported only after the environment check. Outside a
        # notebook the caller then gets the documented RuntimeError above,
        # not an ImportError when IPython is not installed.
        from IPython.core.display import HTML, Image, Markdown, display

        visualizations = self.model.visualizations
        if not visualizations:
            return

        if title:
            display(Markdown(f"### {title}"))
        for index, _ in enumerate(visualizations):
            # Visualizations are loaded one at a time, by position.
            visualization = load_artifact_visualization(self.model, index=index)
            if visualization.type == VisualizationType.IMAGE:
                display(Image(visualization.value))
            elif visualization.type == VisualizationType.HTML:
                display(HTML(visualization.value))
            elif visualization.type == VisualizationType.MARKDOWN:
                display(Markdown(visualization.value))
            elif visualization.type == VisualizationType.CSV:
                assert isinstance(visualization.value, str)
                table = format_csv_visualization_as_html(visualization.value)
                display(HTML(table))
            else:
                display(visualization.value)
model: ArtifactResponseModel property readonly

Returns the underlying ArtifactResponseModel.

Returns:

Type Description
ArtifactResponseModel

The underlying ArtifactResponseModel.

MODEL_CLASS (ArtifactBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for artifacts.

Source code in zenml/post_execution/artifact.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
    """Response model for artifacts."""

    # ID of the step run that produced this artifact; may be None.
    producer_step_run_id: Optional[UUID]
    # Run metadata attached to the artifact. Entries expose `.key`, `.value`
    # and `.type`; presumably keyed by the metadata key (verify).
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={}, title="Metadata of the artifact."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

read(self)

Materializes (loads) the data stored in this artifact.

Returns:

Type Description
Any

The materialized data.

Source code in zenml/post_execution/artifact.py
def read(self) -> Any:
    """Load the Python object stored in this artifact.

    Returns:
        The materialized artifact data, as produced by `load_artifact`.
    """
    from zenml.utils.artifact_utils import load_artifact

    artifact_model = self.model
    return load_artifact(artifact_model)
visualize(self, title=None)

Visualize the artifact in notebook environments.

Parameters:

Name Type Description Default
title Optional[str]

Optional title to show before the visualizations.

None

Exceptions:

Type Description
RuntimeError

If not in a notebook environment.

Source code in zenml/post_execution/artifact.py
def visualize(self, title: Optional[str] = None) -> None:
    """Visualize the artifact in notebook environments.

    Each stored visualization is loaded and rendered with IPython's `display`
    according to its type: images, HTML, markdown, and CSV (rendered as an
    HTML table). Any other type is passed to `display` unchanged.

    Args:
        title: Optional title to show before the visualizations.

    Raises:
        RuntimeError: If not in a notebook environment.
    """
    from zenml.environment import Environment
    from zenml.utils.artifact_utils import load_artifact_visualization

    if not Environment.in_notebook() and not Environment.in_google_colab():
        raise RuntimeError(
            "The `output.visualize()` method is only available in Jupyter "
            "notebooks. In all other runtime environments, please open "
            "your ZenML dashboard using `zenml up` and view the "
            "visualizations by clicking on the respective artifacts in the "
            "pipeline run DAG instead."
        )

    # IPython is imported only after the environment check. Outside a
    # notebook the caller then gets the documented RuntimeError above, not an
    # ImportError when IPython is not installed.
    from IPython.core.display import HTML, Image, Markdown, display

    visualizations = self.model.visualizations
    if not visualizations:
        return

    if title:
        display(Markdown(f"### {title}"))
    for index, _ in enumerate(visualizations):
        # Visualizations are loaded one at a time, by position.
        visualization = load_artifact_visualization(self.model, index=index)
        if visualization.type == VisualizationType.IMAGE:
            display(Image(visualization.value))
        elif visualization.type == VisualizationType.HTML:
            display(HTML(visualization.value))
        elif visualization.type == VisualizationType.MARKDOWN:
            display(Markdown(visualization.value))
        elif visualization.type == VisualizationType.CSV:
            assert isinstance(visualization.value, str)
            table = format_csv_visualization_as_html(visualization.value)
            display(HTML(table))
        else:
            display(visualization.value)

base_view

Base classes for post-execution views.

BaseView (ABC)

Base class for post-execution views.

Subclasses should override the following attributes/methods:

- MODEL_CLASS should be set to the model class the subclass is wrapping.
- The model property should return self._model cast to the correct type.
- (Optionally) REPR_KEYS can be set to include a list of custom attributes in the __repr__ method.

Source code in zenml/post_execution/base_view.py
class BaseView(ABC):
    """Base class for post-execution views.

    Subclasses should override the following attributes/methods:
    - `MODEL_CLASS` should be set to the model class the subclass is wrapping.
    - The `model` property should return `self._model` cast to the correct type.
    - (Optionally) `REPR_KEYS` can be set to include a list of custom attributes
        in the `__repr__` method.

    Fields of the wrapped model can be read directly as attributes of the
    view (see `__getattribute__`).
    """

    MODEL_CLASS = BaseResponseModel  # The model class to wrap.
    REPR_KEYS = ["id"]  # Keys to include in the `__repr__` method.

    def __init__(self, model: BaseResponseModel):
        """Initializes a view for the given model.

        Args:
            model: The model to create a view for.

        Raises:
            TypeError: If the model is not of the correct type.
            ValueError: If any of the `REPR_KEYS` are not valid.
        """
        # Check that the model is of the correct type.
        if not isinstance(model, self.MODEL_CLASS):
            raise TypeError(
                f"Model of {self.__class__.__name__} must be of type "
                f"{self.MODEL_CLASS.__name__} but is {type(model).__name__}."
            )

        # Validate that all `REPR_KEYS` are valid.
        for key in self.REPR_KEYS:
            if (
                key not in model.__fields__
                and key not in self._custom_view_properties
            ):
                raise ValueError(
                    f"Key {key} in {self.__class__.__name__}.REPR_KEYS is "
                    f"neither a field of {self.MODEL_CLASS.__name__} nor a "
                    f"custom property of {self.__class__.__name__}."
                )

        self._model = model

    @property
    def _custom_view_properties(self) -> List[str]:
        """Returns a list of custom view properties.

        Every non-dunder name reported by `dir(self)` is included, so plain
        attributes and methods count as well as properties.

        Returns:
            A list of custom view properties.
        """
        return [attr for attr in dir(self) if not attr.startswith("__")]

    @property
    @abstractmethod
    def model(self) -> BaseResponseModel:
        """Returns the underlying model.

        Subclasses should override this property to return `self._model` with
        the correct model type.

        E.g. `return cast(ArtifactResponseModel, self._model)`

        Returns:
            The underlying model.
        """

    def __getattribute__(self, __name: str) -> Any:
        """Returns the attribute with the given name.

        This method is overridden so we can access the model fields as if they
        were attributes of this class.

        Args:
            __name: The name of the attribute to return.

        Returns:
            The attribute with the given name.
        """
        # Handle special cases that are required by the `dir` call below
        if __name in {"__dict__", "__class__"}:
            return super().__getattribute__(__name)

        # Check the custom view properties first in case of overwrites
        if __name in {attr for attr in dir(self) if not attr.startswith("__")}:
            return super().__getattribute__(__name)

        # Then check if the attribute is a field in the model. `_model` is
        # fetched with the default lookup rather than `self._model`. On an
        # instance whose `__init__` has not run (e.g. one created by
        # `copy.copy` or `pickle`, which probe for `__setstate__` first),
        # `self._model` would re-enter this method and recurse until
        # `RecursionError`.
        try:
            wrapped_model = super().__getattribute__("_model")
        except AttributeError:
            pass
        else:
            if __name in wrapped_model.__fields__:
                return getattr(wrapped_model, __name)

        # Otherwise, fall back to the default behavior
        return super().__getattribute__(__name)

    def __repr__(self) -> str:
        """Returns a string representation of this view.

        The string representation is of the form
        `__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
        are the ones specified in `REPR_KEYS`.

        Returns:
            A string representation of this view.
        """
        representation = self.__class__.__qualname__
        details = ", ".join(
            f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
        )
        if details:
            representation += f"({details})"
        return representation

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same model.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same model, else False.
        """
        if not isinstance(other, self.__class__):
            return False
        return self._model == other._model  # Use the model's `__eq__` method.
model: BaseResponseModel property readonly

Returns the underlying model.

Subclasses should override this property to return self._model with the correct model type.

E.g. return cast(ArtifactResponseModel, self._model)

Returns:

Type Description
BaseResponseModel

The underlying model.

MODEL_CLASS (BaseZenModel) pydantic-model

Base domain model.

Used as a base class for all domain models that have the following common characteristics:

  • are uniquely identified by a UUID
  • have a creation timestamp and a last modified timestamp
Source code in zenml/post_execution/base_view.py
class BaseResponseModel(BaseZenModel):
    """Base domain model.

    Used as a base class for all domain models that have the following common
    characteristics:

      * are uniquely identified by a UUID
      * have a creation timestamp and a last modified timestamp
    """

    id: UUID = Field(title="The unique resource id.")

    created: datetime = Field(title="Time when this resource was created.")
    updated: datetime = Field(
        title="Time when this resource was last updated."
    )

    def __hash__(self) -> int:
        """Implementation of hash magic method.

        Only the UUID is hashed. `__eq__` treats any two `BaseResponseModel`
        instances with the same `id` as equal, whatever their concrete type,
        and objects that compare equal must have equal hashes.

        Returns:
            Hash of the UUID.
        """
        return hash(self.id)

    def __eq__(self, other: Any) -> bool:
        """Implementation of equality magic method.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is a `BaseResponseModel` (of any
            subclass) with the same UUID, False otherwise.
        """
        if isinstance(other, BaseResponseModel):
            return self.id == other.id
        else:
            return False

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The parent class's analytics metadata with `entity_id` set to
            this model's `id`.
        """
        metadata = super().get_analytics_metadata()
        metadata["entity_id"] = self.id
        return metadata
__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is a BaseResponseModel (of any subclass) with the same UUID.

Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
    """Implementation of equality magic method.

    Only the `id` fields are compared. The concrete model classes of the two
    objects are not checked against each other.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is a `BaseResponseModel` (of any subclass)
        with the same UUID, False otherwise.
    """
    if isinstance(other, BaseResponseModel):
        return self.id == other.id
    else:
        return False
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/post_execution/base_view.py
def __hash__(self) -> int:
    """Implementation of hash magic method.

    Only the UUID is hashed. The model's `__eq__` treats any two
    `BaseResponseModel` instances with the same `id` as equal, whatever their
    concrete type, and objects that compare equal must have equal hashes.

    Returns:
        Hash of the UUID.
    """
    return hash(self.id)
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/post_execution/base_view.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect analytics metadata and tag it with this model's ID.

    Returns:
        The dictionary produced by the parent implementation, updated in place
        with an `entity_id` entry holding `self.id`.
    """
    analytics_metadata = super().get_analytics_metadata()
    analytics_metadata.update(entity_id=self.id)
    return analytics_metadata
__eq__(self, other) special

Returns whether the other object is referring to the same model.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same model, else False.

Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
    """Check whether `other` is a view wrapping an equal model.

    Args:
        other: The object to compare against.

    Returns:
        False unless `other` is an instance of this view's class (or a
        subclass of it); otherwise the result of comparing the two wrapped
        models with the model's own `__eq__`.
    """
    same_view_kind = isinstance(other, self.__class__)
    return same_view_kind and self._model == other._model
__getattribute__(self, _BaseView__name) special

Returns the attribute with the given name.

This method is overridden so we can access the model fields as if they were attributes of this class.

Parameters:

Name Type Description Default
__name

The name of the attribute to return.

required

Returns:

Type Description
Any

The attribute with the given name.

Source code in zenml/post_execution/base_view.py
def __getattribute__(self, __name: str) -> Any:
    """Returns the attribute with the given name.

    This method is overridden so we can access the model fields as if they
    were attributes of this class.

    Args:
        __name: The name of the attribute to return.

    Returns:
        The attribute with the given name.
    """
    # Handle special cases that are required by the `dir` call below
    if __name in {"__dict__", "__class__"}:
        return super().__getattribute__(__name)

    # Check the custom view properties first in case of overwrites
    if __name in {attr for attr in dir(self) if not attr.startswith("__")}:
        return super().__getattribute__(__name)

    # Then check if the attribute is a field in the model. `_model` is fetched
    # with the default lookup rather than `self._model`. On an instance whose
    # `__init__` has not run (e.g. one created by `copy.copy` or `pickle`,
    # which probe for `__setstate__` first), `self._model` would re-enter
    # this method and recurse until `RecursionError`.
    try:
        wrapped_model = super().__getattribute__("_model")
    except AttributeError:
        pass
    else:
        if __name in wrapped_model.__fields__:
            return getattr(wrapped_model, __name)

    # Otherwise, fall back to the default behavior
    return super().__getattribute__(__name)
__init__(self, model) special

Initializes a view for the given model.

Parameters:

Name Type Description Default
model BaseResponseModel

The model to create a view for.

required

Exceptions:

Type Description
TypeError

If the model is not of the correct type.

ValueError

If any of the REPR_KEYS are not valid.

Source code in zenml/post_execution/base_view.py
def __init__(self, model: BaseResponseModel):
    """Wrap `model` in this view after validating it.

    Args:
        model: The response model the view should expose.

    Raises:
        TypeError: If `model` is not an instance of `MODEL_CLASS`.
        ValueError: If an entry of `REPR_KEYS` is neither a model field nor
            a custom attribute of the view.
    """
    expected_model_class = self.MODEL_CLASS
    if not isinstance(model, expected_model_class):
        raise TypeError(
            f"Model of {self.__class__.__name__} must be of type "
            f"{expected_model_class.__name__} but is {type(model).__name__}."
        )

    # Each repr key must resolve either to a model field or to something the
    # view itself defines.
    for repr_key in self.REPR_KEYS:
        is_model_field = repr_key in model.__fields__
        if is_model_field or repr_key in self._custom_view_properties:
            continue
        raise ValueError(
            f"Key {repr_key} in {self.__class__.__name__}.REPR_KEYS is "
            f"neither a field of {self.MODEL_CLASS.__name__} nor a "
            f"custom property of {self.__class__.__name__}."
        )

    self._model = model
__repr__(self) special

Returns a string representation of this view.

The string representation is of the form __qualname__(<key1>=<value1>, <key2>=<value2>, ...) where the keys are the ones specified in REPR_KEYS.

Returns:

Type Description
str

A string representation of this artifact.

Source code in zenml/post_execution/base_view.py
def __repr__(self) -> str:
    """Returns a string representation of this view.

    The string representation is of the form
    `__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
    are the ones specified in `REPR_KEYS`. With no keys, only the qualified
    class name is returned.

    Returns:
        A string representation of this view.
    """
    # Named `representation` so the builtin `repr` is not shadowed.
    representation = self.__class__.__qualname__
    details = ", ".join(
        f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
    )
    if details:
        representation += f"({details})"
    return representation

lineage special

Initialization of lineage generation module.

edge

Class for Edges in a lineage graph.

Edge (BaseModel) pydantic-model

A class that represents an edge in a lineage graph.

Source code in zenml/post_execution/lineage/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph."""

    # Edge identifier. The lineage graph builds it by joining the step node ID
    # and the artifact node ID with "_", for both edge directions.
    id: str
    # ID of the node the edge starts from.
    source: str
    # ID of the node the edge points to.
    target: str

lineage_graph

Class for lineage graph generation.

LineageGraph (BaseModel) pydantic-model

A lineage graph representation of a PipelineRunView.

Source code in zenml/post_execution/lineage/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunView."""

    nodes: List[Union[StepNode, ArtifactNode]] = []
    edges: List[Edge] = []
    root_step_id: Optional[str] = None
    run_metadata: List[Tuple[str, str, str]] = []

    def generate_step_nodes_and_edges(self, step: StepView) -> None:
        """Generates the step nodes and the edges between them.

        Adds the following to the graph:
        - one `StepNode` for `step`;
        - one `ArtifactNode` and one step -> artifact edge per output;
        - one artifact -> step edge per input.

        The first step processed becomes the root step.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        if self.root_step_id is None:
            self.root_step_id = step_id
        step_config = step.step_configuration.dict()
        if step_config:
            # Inputs, outputs and parameters have dedicated node fields, and
            # falsy values are dropped to keep the configuration compact.
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ["inputs", "outputs", "parameters"] and value
            }
        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.entrypoint_name,  # redundant for consistency
                    parameters=step.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in step.metadata.values()
                    ],
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            producer_step_run_id = artifact.producer_step_run_id
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type.import_path,
                        parent_step_id=str(step.id),
                        # `producer_step_id` is optional. Keep it None
                        # rather than the string "None" when the producer is
                        # unknown.
                        producer_step_id=(
                            str(producer_step_run_id)
                            if producer_step_run_id is not None
                            else None
                        ),
                        uri=artifact.uri,
                        metadata=[
                            (m.key, str(m.value), str(m.type))
                            for m in artifact.metadata.values()
                        ],
                    ),
                )
            )
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # Input artifacts only get edges here; their nodes are added when the
        # step that outputs them is processed.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
        """Generates the run nodes and the edges between them.

        Args:
            run: The PipelineRunView to generate the lineage graph for.
        """
        self.run_metadata = [
            (m.key, str(m.value), str(m.type)) for m in run.metadata.values()
        ]
        for step in run.steps:
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)

Generates the run nodes and the edges between them.

Parameters:

Name Type Description Default
run PipelineRunView

The PipelineRunView to generate the lineage graph for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
    """Populate the graph from every step of a pipeline run.

    The run-level metadata is stored first as `(key, value, type)` string
    tuples. Then the nodes and edges of each step are added in order.

    Args:
        run: The PipelineRunView to generate the lineage graph for.
    """
    metadata_entries = run.metadata.values()
    self.run_metadata = [
        (entry.key, str(entry.value), str(entry.type))
        for entry in metadata_entries
    ]
    for run_step in run.steps:
        self.generate_step_nodes_and_edges(run_step)
generate_step_nodes_and_edges(self, step)

Generates the step nodes and the edges between them.

Parameters:

Name Type Description Default
step StepView

The step to generate the nodes and edges for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_step_nodes_and_edges(self, step: StepView) -> None:
    """Generates the step nodes and the edges between them.

    Adds the following to the graph:
    - one `StepNode` for `step`;
    - one `ArtifactNode` and one step -> artifact edge per output;
    - one artifact -> step edge per input.

    The first step processed becomes the root step.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    if self.root_step_id is None:
        self.root_step_id = step_id
    step_config = step.step_configuration.dict()
    if step_config:
        # Inputs, outputs and parameters have dedicated node fields, and falsy
        # values are dropped to keep the configuration compact.
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ["inputs", "outputs", "parameters"] and value
        }
    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.entrypoint_name,  # redundant for consistency
                parameters=step.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in step.metadata.values()
                ],
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        producer_step_run_id = artifact.producer_step_run_id
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type.import_path,
                    parent_step_id=str(step.id),
                    # `producer_step_id` is optional. Keep it None rather than
                    # the string "None" when the producer is unknown.
                    producer_step_id=(
                        str(producer_step_run_id)
                        if producer_step_run_id is not None
                        else None
                    ),
                    uri=artifact.uri,
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in artifact.metadata.values()
                    ],
                ),
            )
        )
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # Input artifacts only get edges here; their nodes are added when the step
    # that outputs them is processed.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )

node special

Initialization of lineage nodes.

artifact_node

Class for all lineage artifact nodes.

ArtifactNode (BaseNode) pydantic-model

A class that represents an artifact node in a lineage graph.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    # Node kind tag; defaults to "artifact" for this node class.
    type: str = "artifact"
    # Artifact-specific details shown for the node.
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails) pydantic-model

Captures all artifact details for the node.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node."""

    # Set by the lineage graph to whether the step status is CACHED.
    is_cached: bool
    artifact_type: str
    # Import path of the artifact's data type.
    artifact_data_type: str
    # ID of the step whose outputs include this artifact.
    parent_step_id: str
    # ID of the step run that produced the artifact; optional.
    producer_step_id: Optional[str]
    uri: str
    metadata: List[Tuple[str, str, str]]  # (key, value, type)
base_node

Base class for all lineage nodes.

BaseNode (BaseModel) pydantic-model

A class that represents a node in a lineage graph.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph."""

    # Node identifier; the lineage graph prefixes the entity ID with a
    # step/artifact prefix.
    id: str
    # Node kind tag (subclasses default it to "step" or "artifact").
    type: str
    # Kind-specific details of the node.
    data: BaseNodeDetails
BaseNodeDetails (BaseModel) pydantic-model

Captures all details for the node.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node."""

    # String form of the underlying step or artifact ID.
    execution_id: str
    # Display name. For artifacts the lineage graph uses the output name.
    name: str
    # Execution status; for artifacts the producing step's status is used.
    status: ExecutionStatus
step_node

Class for all lineage step nodes.

StepNode (BaseNode) pydantic-model

A class that represents a step node in a lineage graph.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    # Node kind tag; defaults to "step" for this node class.
    type: str = "step"
    # Step-specific details shown for the node.
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails) pydantic-model

Captures all step details for the node.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node."""

    entrypoint_name: str
    parameters: Dict[str, Any]
    # Step configuration. The lineage graph removes the `inputs`, `outputs`
    # and `parameters` keys and all falsy values before storing it here.
    configuration: Dict[str, Any]
    # Input name -> artifact URI, as filled in by the lineage graph.
    inputs: Dict[str, Any]
    # Output name -> artifact URI, as filled in by the lineage graph.
    outputs: Dict[str, Any]
    metadata: List[Tuple[str, str, str]]  # (key, value, type)

pipeline

Implementation of the post-execution pipeline.

PipelineVersionView (BaseView)

Post-execution class for a specific version/instance of a pipeline.

Source code in zenml/post_execution/pipeline.py
class PipelineVersionView(BaseView):
    """Post-execution class for a specific version/instance of a pipeline."""

    MODEL_CLASS: Type[BaseResponseModel] = PipelineResponseModel
    REPR_KEYS = ["id", "name", "version"]

    @property
    def model(self) -> PipelineResponseModel:
        """Returns the underlying `PipelineResponseModel`.

        Returns:
            The underlying `PipelineResponseModel`.
        """
        return cast(PipelineResponseModel, self._model)

    @property
    def num_runs(self) -> int:
        """Returns the number of runs of this pipeline.

        Only runs in the active workspace are counted.

        Returns:
            The number of runs of this pipeline.
        """
        client = Client()
        run_filter = PipelineRunFilterModel(
            workspace_id=client.active_workspace.id,
            pipeline_id=self.model.id,
        )
        return client.zen_store.list_runs(run_filter).total

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns the last 50 stored runs of this pipeline.

        The runs are returned in reverse chronological order, so the latest
        run will be the first element in this list. Only runs in the active
        workspace are considered.

        Returns:
            A list of up to 50 of the most recent stored runs of this pipeline.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle
        client = Client()
        runs = client.list_runs(
            workspace_id=client.active_workspace.id,
            pipeline_id=self.model.id,
            size=50,
            sort_by="desc:created",
        )

        return [PipelineRunView(run) for run in runs.items]
model: PipelineResponseModel property readonly

Returns the underlying PipelineResponseModel.

Returns:

Type Description
PipelineResponseModel

The underlying PipelineResponseModel.

num_runs: int property readonly

Returns the number of runs of this pipeline.

Returns:

Type Description
int

The number of runs of this pipeline.

runs: List[PipelineRunView] property readonly

Returns the last 50 stored runs of this pipeline.

The runs are returned in reverse chronological order, so the latest run will be the first element in this list.

Returns:

Type Description
List[PipelineRunView]

A list of up to 50 of the most recent stored runs of this pipeline.

MODEL_CLASS (PipelineBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline response model with user, workspace, runs, and status hydrated.

Source code in zenml/post_execution/pipeline.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
    """Pipeline response model with user, workspace, runs, and status hydrated.

    Both `runs` and `status` default to `None` unless they are populated.
    """

    # The most recent runs of this pipeline (None unless populated).
    runs: Optional[List["PipelineRunResponseModel"]] = Field(
        None, title="A list of the last x Pipeline Runs."
    )
    # Execution status of each of those recent runs (None unless populated).
    status: Optional[List[ExecutionStatus]] = Field(
        None, title="The status of the last x Pipeline Runs."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

PipelineView

Post-execution class for a pipeline name/class.

Source code in zenml/post_execution/pipeline.py
class PipelineView:
    """Post-execution class for a pipeline name/class.

    All stored pipelines that share this name are treated as versions of the
    same pipeline, and the properties of this view aggregate over them.
    """

    def __init__(self, name: str):
        """Initializes a post-execution class for a pipeline name/class.

        Args:
            name: The name of the pipeline.
        """
        self.name = name

    @property
    def versions(self) -> List["PipelineVersionView"]:
        """Returns all versions/instances of this pipeline name/class.

        Versions are requested newest first (`desc:created`).

        Returns:
            A list of all versions of this pipeline.
        """
        client = Client()

        # NOTE(review): `depaginate` is assumed to collect the results of
        # every page of the listing, not just the first one.
        pipelines = depaginate(
            partial(
                client.list_pipelines,
                workspace_id=client.active_workspace.id,
                name=self.name,
                sort_by="desc:created",
            )
        )
        return [PipelineVersionView(model) for model in pipelines]

    @property
    def num_runs(self) -> int:
        """Returns the number of runs of this pipeline name/class.

        This is the sum of all runs of all versions of this pipeline.

        Returns:
            The number of runs of this pipeline name/class.
        """
        return sum(version.num_runs for version in self.versions)

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns the last 50 stored runs of this pipeline name/class.

        The runs are returned in reverse chronological order, so the latest
        run will be the first element in this list.

        Returns:
            A list of the last 50 stored runs of this pipeline name/class.
        """
        # Merge the recent runs of every version and re-sort them by creation
        # time so that the newest runs across all versions come first.
        all_runs = [run for version in self.versions for run in version.runs]
        all_runs.sort(key=lambda run: run.model.created, reverse=True)
        return all_runs[:50]

    def __eq__(self, other: Any) -> bool:
        """Compares this pipeline class view to another object.

        Args:
            other: The other object to compare to.

        Returns:
            Whether the other object is a pipeline class view with same name.
        """
        if not isinstance(other, PipelineView):
            return False
        return self.name == other.name

    def __hash__(self) -> int:
        """Hashes this pipeline class view consistently with `__eq__`.

        Defining `__eq__` alone would make instances unhashable, so views
        could not be used in sets or as dict keys.

        Returns:
            The hash of the pipeline name.
        """
        return hash(self.name)
num_runs: int property readonly

Returns the number of runs of this pipeline name/class.

This is the sum of all runs of all versions of this pipeline.

Returns:

Type Description
int

The number of runs of this pipeline name/class.

runs: List[PipelineRunView] property readonly

Returns the last 50 stored runs of this pipeline name/class.

The runs are returned in reverse chronological order, so the latest run will be the first element in this list.

Returns:

Type Description
List[PipelineRunView]

A list of the last 50 stored runs of this pipeline name/class.

versions: List[PipelineVersionView] property readonly

Returns all versions/instances of this pipeline name/class.

Returns:

Type Description
List[PipelineVersionView]

A list of all versions of this pipeline.

__eq__(self, other) special

Compares this pipeline class view to another object.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

Whether the other object is a pipeline class view with same name.

Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
    """Checks whether another object is an equivalent pipeline class view.

    Two views are considered equal when both are `PipelineView` instances
    that refer to the same pipeline name.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is a pipeline class view with the same name.
    """
    return isinstance(other, PipelineView) and self.name == other.name
__init__(self, name) special

Initializes a post-execution class for a pipeline name/class.

Parameters:

Name Type Description Default
name str

The name of the pipeline.

required
Source code in zenml/post_execution/pipeline.py
def __init__(self, name: str):
    """Creates a post-execution view for the pipeline with the given name.

    Args:
        name: Name identifying the pipeline.
    """
    self.name: str = name

pipeline_run

Implementation of the post-execution pipeline run class.

PipelineRunView (BaseView)

Post-execution pipeline run class.

This can be used to query steps and artifact information associated with a pipeline execution.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView(BaseView):
    """Post-execution pipeline run class.

    This can be used to query steps and artifact information associated with a
    pipeline execution.
    """

    MODEL_CLASS: Type[BaseResponseModel] = PipelineRunResponseModel
    REPR_KEYS = ["id", "name"]

    def __init__(self, model: PipelineRunResponseModel):
        """Initializes a post-execution pipeline run object.

        In most cases `PipelineRunView` objects should not be created manually
        but retrieved from a `PipelineView` or `PipelineVersionView` instead.

        Args:
            model: The model to initialize this object from.
        """
        super().__init__(model)
        # Step views keyed by step name; filled lazily by
        # `_ensure_steps_fetched`.
        self._steps: Dict[str, StepView] = OrderedDict()

    @property
    def model(self) -> PipelineRunResponseModel:
        """Returns the underlying `PipelineRunResponseModel`.

        Returns:
            The underlying `PipelineRunResponseModel`.
        """
        return cast(PipelineRunResponseModel, self._model)

    @property
    def settings(self) -> Dict[str, Any]:
        """Returns the pipeline settings.

        These are runtime settings passed down to stack components, which
        can be set at pipeline level.

        Returns:
            The pipeline settings.
        """
        return self.model.pipeline_configuration.settings

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the pipeline extras.

        This dict is meant to be used to pass any configuration down to the
        pipeline or stack components that the user makes use of.

        Returns:
            The pipeline extras.
        """
        return self.model.pipeline_configuration.extra

    @property
    def enable_cache(self) -> Optional[bool]:
        """Returns whether caching is enabled for this pipeline run.

        Returns:
            True if caching is enabled for this pipeline run.
        """
        return self.model.pipeline_configuration.enable_cache

    @property
    def enable_artifact_metadata(self) -> Optional[bool]:
        """Returns whether artifact metadata is enabled for this pipeline run.

        Returns:
            True if artifact metadata is enabled for this pipeline run.
        """
        return self.model.pipeline_configuration.enable_artifact_metadata

    @property
    def enable_artifact_visualization(self) -> Optional[bool]:
        """Returns whether artifact visualization is enabled for this run.

        Returns:
            True if artifact visualization is enabled for this pipeline run.
        """
        return self.model.pipeline_configuration.enable_artifact_visualization

    @property
    def commit(self) -> Optional[str]:
        """Returns the code repository commit of the pipeline run.

        Returns:
            The code repository commit of the pipeline run, or None if the run
            has no deployment or the deployment has no code reference.
        """
        deployment = self.model.deployment

        if not deployment:
            return None

        if not deployment.code_reference:
            return None

        return deployment.code_reference.commit

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the pipeline run.

        Returns:
            The current status of the pipeline run.
        """
        # Query the run again since the status might have changed since this
        # object was created.
        return Client().get_pipeline_run(self.model.id).status

    @property
    def steps(self) -> List[StepView]:
        """Returns all steps that were executed as part of this pipeline run.

        Returns:
            A list of all steps that were executed as part of this pipeline run.
        """
        self._ensure_steps_fetched()
        return list(self._steps.values())

    def get_step_names(self) -> List[str]:
        """Returns a list of all step names.

        Returns:
            A list of all step names.
        """
        self._ensure_steps_fetched()
        return list(self._steps.keys())

    def get_step(
        self,
        step: Optional[str] = None,
        **kwargs: Any,
    ) -> StepView:
        """Returns a step for the given name.

        The name refers to the name of the step in the pipeline definition, not
        the class name of the step-class.

        Use it like this:
        ```python
        # Get the step by name
        pipeline_run_view.get_step("first_step")
        ```

        Args:
            step: The name of the step in the pipeline definition.
            **kwargs: The deprecated `name` is caught as a kwarg to
                specify the step instead of using the `step` argument. If
                both are given, `step` takes precedence and `name` is ignored.

        Returns:
            A step for the given name.

        Raises:
            KeyError: If there is no step with the given name.
            RuntimeError: If no step has been specified at all.
        """
        self._ensure_steps_fetched()

        api_doc_link = get_apidocs_link(
            "core-post_execution",
            "zenml.post_execution.pipeline_run.PipelineRunView" ".get_step",
        )
        step_name = kwargs.get("name", None)

        # Raise an error if neither `step` nor `name` args were provided.
        if not step and not isinstance(step_name, str):
            raise RuntimeError(
                "No step specified. Please specify a step using "
                "pipeline_run_view.get_step(step=`step_name`). "
                f"Please refer to the API docs to learn more: "
                f"{api_doc_link}"
            )

        # If `name` was provided but not `step`, print a deprecation warning.
        if not step:
            logger.warning(
                "Using 'name' to get a step from "
                "'PipelineRunView.get_step()' is deprecated and "
                "will be removed in the future. Instead please "
                "use 'step' to access a step from your past "
                "pipeline runs. Learn more in our API docs: %s",
                api_doc_link,
            )
            step = step_name

        # Raise an error if there is no such step in the given pipeline run.
        if step not in self._steps:
            raise KeyError(
                f"No step found for name `{step}`. This pipeline "
                f"run only has steps with the following "
                f"names: `{self.get_step_names()}`"
            )

        return self._steps[step]

    def visualize(self) -> None:
        """Visualizes all output artifacts produced by this pipeline run."""
        for step_ in self.steps:
            step_.visualize()

    def _ensure_steps_fetched(self) -> None:
        """Fetches all steps of this pipeline run and caches them by name."""
        # An empty cache is treated as "not fetched yet", so a run that has no
        # steps (yet) is re-queried every time its steps are accessed.
        if self._steps:
            # we already fetched the steps, no need to do anything
            return

        client = Client()
        steps = depaginate(
            partial(client.list_run_steps, pipeline_run_id=self.model.id)
        )

        self._steps = {step.name: StepView(step) for step in steps}
commit: Optional[str] property readonly

Returns the code repository commit of the pipeline run.

Returns:

Type Description
Optional[str]

The code repository commit of the pipeline run.

enable_artifact_metadata: Optional[bool] property readonly

Returns whether artifact metadata is enabled for this pipeline run.

Returns:

Type Description
Optional[bool]

True if artifact metadata is enabled for this pipeline run.

enable_artifact_visualization: Optional[bool] property readonly

Returns whether artifact visualization is enabled for this run.

Returns:

Type Description
Optional[bool]

True if artifact visualization is enabled for this pipeline run.

enable_cache: Optional[bool] property readonly

Returns whether caching is enabled for this pipeline run.

Returns:

Type Description
Optional[bool]

True if caching is enabled for this pipeline run.

extra: Dict[str, Any] property readonly

Returns the pipeline extras.

This dict is meant to be used to pass any configuration down to the pipeline or stack components that the user makes use of.

Returns:

Type Description
Dict[str, Any]

The pipeline extras.

model: PipelineRunResponseModel property readonly

Returns the underlying PipelineRunResponseModel.

Returns:

Type Description
PipelineRunResponseModel

The underlying PipelineRunResponseModel.

settings: Dict[str, Any] property readonly

Returns the pipeline settings.

These are runtime settings passed down to stack components, which can be set at pipeline level.

Returns:

Type Description
Dict[str, Any]

The pipeline settings.

status: ExecutionStatus property readonly

Returns the current status of the pipeline run.

Returns:

Type Description
ExecutionStatus

The current status of the pipeline run.

steps: List[zenml.post_execution.step.StepView] property readonly

Returns all steps that were executed as part of this pipeline run.

Returns:

Type Description
List[zenml.post_execution.step.StepView]

A list of all steps that were executed as part of this pipeline run.

MODEL_CLASS (PipelineRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack hydrated.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunResponseModel(
    PipelineRunBaseModel, WorkspaceScopedResponseModel
):
    """Pipeline run model with user, workspace, pipeline, and stack hydrated.

    Related entities default to `None` and `metadata` to an empty dict when
    they are not populated.
    """

    # The pipeline this run belongs to, if known.
    pipeline: Optional["PipelineResponseModel"] = Field(
        None, title="The pipeline this run belongs to."
    )
    # The stack the run was executed on, if known.
    stack: Optional["StackResponseModel"] = Field(
        None, title="The stack that was used for this run."
    )

    # Run metadata entries, keyed by metadata name.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        {},
        title="Metadata associated with this pipeline run.",
    )

    # The pipeline build used for this run, if any.
    build: Optional["PipelineBuildResponseModel"] = Field(
        None, title="The pipeline build that was used for this run."
    )

    # The deployment used for this run, if any.
    deployment: Optional["PipelineDeploymentResponseModel"] = Field(
        None, title="The deployment that was used for this run."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__init__(self, model) special

Initializes a post-execution pipeline run object.

In most cases PipelineRunView objects should not be created manually but retrieved from a PipelineView or PipelineVersionView instead.

Parameters:

Name Type Description Default
model PipelineRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/pipeline_run.py
def __init__(self, model: PipelineRunResponseModel):
    """Creates a post-execution view around a pipeline run model.

    Prefer obtaining `PipelineRunView` instances from a `PipelineView` or
    `PipelineVersionView` rather than constructing them by hand.

    Args:
        model: The pipeline run model backing this view.
    """
    super().__init__(model)
    # Step views are cached here by step name once they are fetched.
    step_cache: Dict[str, StepView] = OrderedDict()
    self._steps = step_cache
get_step(self, step=None, **kwargs)

Returns a step for the given name.

The name refers to the name of the step in the pipeline definition, not the class name of the step-class.

Use it like this:

# Get the step by name
pipeline_run_view.get_step("first_step")

Parameters:

Name Type Description Default
step Optional[str]

The name of the step in the pipeline definition.

None
**kwargs Any

The deprecated name is caught as a kwarg to specify the step instead of using the step argument.

{}

Returns:

Type Description
StepView

A step for the given name.

Exceptions:

Type Description
KeyError

If there is no step with the given name.

RuntimeError

If no step has been specified at all.

Source code in zenml/post_execution/pipeline_run.py
def get_step(
    self,
    step: Optional[str] = None,
    **kwargs: Any,
) -> StepView:
    """Returns a step for the given name.

    The name refers to the name of the step in the pipeline definition, not
    the class name of the step-class.

    Use it like this:
    ```python
    # Get the step by name
    pipeline_run_view.get_step("first_step")
    ```

    Args:
        step: The name of the step in the pipeline definition.
        **kwargs: The deprecated `name` is caught as a kwarg to
            specify the step instead of using the `step` argument. If both
            are given, `step` takes precedence and `name` is ignored.

    Returns:
        A step for the given name.

    Raises:
        KeyError: If there is no step with the given name.
        RuntimeError: If no step has been specified at all.
    """
    self._ensure_steps_fetched()

    api_doc_link = get_apidocs_link(
        "core-post_execution",
        "zenml.post_execution.pipeline_run.PipelineRunView" ".get_step",
    )
    step_name = kwargs.get("name", None)

    # Raise an error if neither `step` nor `name` args were provided.
    if not step and not isinstance(step_name, str):
        raise RuntimeError(
            "No step specified. Please specify a step using "
            "pipeline_run_view.get_step(step=`step_name`). "
            f"Please refer to the API docs to learn more: "
            f"{api_doc_link}"
        )

    # If `name` was provided but not `step`, print a deprecation warning.
    if not step:
        logger.warning(
            "Using 'name' to get a step from "
            "'PipelineRunView.get_step()' is deprecated and "
            "will be removed in the future. Instead please "
            "use 'step' to access a step from your past "
            "pipeline runs. Learn more in our API docs: %s",
            api_doc_link,
        )
        step = step_name

    # Raise an error if there is no such step in the given pipeline run.
    if step not in self._steps:
        raise KeyError(
            f"No step found for name `{step}`. This pipeline "
            f"run only has steps with the following "
            f"names: `{self.get_step_names()}`"
        )

    return self._steps[step]
get_step_names(self)

Returns a list of all step names.

Returns:

Type Description
List[str]

A list of all step names.

Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Lists the names of all steps in this pipeline run.

    Returns:
        The step names, in the order the steps are stored.
    """
    self._ensure_steps_fetched()
    return [step_name for step_name in self._steps]
visualize(self)

Visualizes all output artifacts produced by this pipeline run.

Source code in zenml/post_execution/pipeline_run.py
def visualize(self) -> None:
    """Renders the output artifact visualizations of every step, in order."""
    for current_step in self.steps:
        current_step.visualize()

get_run(name)

Fetches the post-execution view of a run with the given name.

Parameters:

Name Type Description Default
name str

The name of the run to fetch.

required

Returns:

Type Description
PipelineRunView

The post-execution view of the run with the given name.

Exceptions:

Type Description
KeyError

If no run with the given name exists.

RuntimeError

If multiple runs with the given name exist.

Source code in zenml/post_execution/pipeline_run.py
def get_run(name: str) -> "PipelineRunView":
    """Fetches the post-execution view of a run with the given name.

    Args:
        name: The name of the run to fetch.

    Returns:
        The post-execution view of the run with the given name.

    Raises:
        KeyError: If no run with the given name exists.
        RuntimeError: If multiple runs with the given name exist.
    """
    client = Client()
    runs = client.list_runs(
        name=name,
        workspace_id=client.active_workspace.id,
    )

    # Use the reported total number of matches explicitly instead of relying
    # on the truthiness of the returned page object; `items` only holds the
    # runs of the current page.
    if runs.total == 0:
        raise KeyError(f"No run with name '{name}' exists.")
    if runs.total > 1:
        raise RuntimeError(
            f"Multiple runs have been found for name '{name}' "
            f"({runs.total} matches)."
        )
    return PipelineRunView(runs.items[0])

get_unlisted_runs()

Fetches the post-execution views of the 50 most recent unlisted runs.

Unlisted runs are runs that are not associated with any pipeline.

Returns:

Type Description
List[PipelineRunView]

A list of post-execution run views.

Source code in zenml/post_execution/pipeline_run.py
def get_unlisted_runs() -> List["PipelineRunView"]:
    """Returns views of the 50 newest runs not associated with any pipeline.

    Unlisted runs are runs that are not associated with any pipeline. They
    are returned newest first.

    Returns:
        A list of post-execution run views.
    """
    client = Client()
    page = client.list_runs(
        workspace_id=client.active_workspace.id,
        unlisted=True,
        size=50,
        sort_by="desc:created",
    )
    return list(map(PipelineRunView, page.items))

step

Implementation of a post-execution step class.

StepView (BaseView)

Post-execution step class.

This can be used to query artifact information associated with a pipeline step.

Source code in zenml/post_execution/step.py
class StepView(BaseView):
    """Post-execution step class.

    This can be used to query artifact information associated with a
    pipeline step.
    """

    MODEL_CLASS: Type[BaseResponseModel] = StepRunResponseModel
    REPR_KEYS = ["id", "name", "entrypoint_name"]

    def __init__(self, model: StepRunResponseModel):
        """Initializes a post-execution step object.

        In most cases `StepView` objects should not be created manually
        but retrieved from a `PipelineRunView` object instead.

        Args:
            model: The model to initialize this object from.
        """
        super().__init__(model)
        # Artifact views keyed by input/output name; built lazily from the
        # step run model on first access.
        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

    @property
    def model(self) -> StepRunResponseModel:
        """Returns the underlying `StepRunResponseModel`.

        Returns:
            The underlying `StepRunResponseModel`.
        """
        return cast(StepRunResponseModel, self._model)

    @property
    def step_configuration(self) -> "StepConfiguration":
        """Returns the step configuration.

        Returns:
            The step configuration.
        """
        return self.model.step.config

    @property
    def entrypoint_name(self) -> str:
        """Returns the step entrypoint_name.

        This name is equal to the name argument passed to the @step decorator
        or the actual function name if no explicit name was given.

        Examples:
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)

        Returns:
            The step entrypoint_name.
        """
        return self.step_configuration.name

    @property
    def parameters(self) -> Dict[str, str]:
        """The parameters used to run this step.

        Returns:
            The parameters used to run this step.
        """
        return self.step_configuration.parameters

    @property
    def settings(self) -> Dict[str, "BaseSettings"]:
        """Returns the step settings.

        These are runtime settings passed down to stack components, which
        can be set at step level.

        Returns:
            The step settings.
        """
        return self.step_configuration.settings

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the extra dictionary.

        This dict is meant to be used to pass any configuration down to the
        step that the user makes use of.

        Returns:
            The extra dictionary.
        """
        return self.step_configuration.extra

    @property
    def enable_cache(self) -> Optional[bool]:
        """Returns whether caching is enabled for this step.

        Returns:
            Whether caching is enabled for this step.
        """
        return self.step_configuration.enable_cache

    @property
    def enable_artifact_metadata(self) -> Optional[bool]:
        """Returns whether artifact metadata is enabled for this step.

        Returns:
            Whether artifact metadata is enabled for this step.
        """
        return self.step_configuration.enable_artifact_metadata

    @property
    def enable_artifact_visualization(self) -> Optional[bool]:
        """Returns whether artifact visualization is enabled for this step.

        Returns:
            Whether artifact visualization is enabled for this step.
        """
        return self.step_configuration.enable_artifact_visualization

    @property
    def step_operator(self) -> Optional[str]:
        """Returns the name of the step operator of the step.

        Returns:
            The name of the step operator of the step.
        """
        return self.step_configuration.step_operator

    @property
    def experiment_tracker(self) -> Optional[str]:
        """Returns the name of the experiment tracker of the step.

        Returns:
            The name of the experiment tracker of the step.
        """
        return self.step_configuration.experiment_tracker

    @property
    def spec(self) -> "StepSpec":
        """Returns the step spec.

        The step spec defines the source path and upstream steps of a step and
        is used primarily to compare whether two steps are the same.

        Returns:
            The step spec.
        """
        return self.model.step.spec

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the step.

        Returns:
            The current status of the step.
        """
        # Query the step again since the status might have changed since this
        # object was created.
        return Client().zen_store.get_run_step(self.model.id).status

    @property
    def is_cached(self) -> bool:
        """Returns whether the step is cached or not.

        Returns:
            True if the step is cached, False otherwise.
        """
        return self.status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Returns whether the step is completed or not.

        Returns:
            True if the step is completed, False otherwise.
        """
        return self.status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """Returns all input artifacts that were used to run this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """Returns the input artifact that was used to run this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if len(self.inputs) != 1:
            raise ValueError(
                "Can't use the `StepView.input` property for steps with zero "
                "or multiple inputs, use `StepView.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """Returns all output artifacts that were written by this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """Returns the output artifact that was written by this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if len(self.outputs) != 1:
            raise ValueError(
                "Can't use the `StepView.output` property for steps with zero "
                "or multiple outputs, use `StepView.outputs` instead."
            )
        return next(iter(self.outputs.values()))

    def visualize(self) -> None:
        """Visualizes all output artifacts of the step, sorted by name."""
        output_artifacts = self.outputs.values()
        for artifact in sorted(output_artifacts, key=lambda a: a.model.name):
            title = f"{self.model.name} - {artifact.model.name}"
            artifact.visualize(title=title)

    def _ensure_inputs_fetched(self) -> None:
        """Builds the input artifact views from the step run model."""
        # An empty cache means "not built yet"; for steps without inputs the
        # (empty) dict is simply rebuilt from the model on each access.
        if self._inputs:
            # we already fetched inputs, no need to do anything
            return

        self._inputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self.model.input_artifacts.items()
        }

    def _ensure_outputs_fetched(self) -> None:
        """Builds the output artifact views from the step run model."""
        # An empty cache means "not built yet"; for steps without outputs the
        # (empty) dict is simply rebuilt from the model on each access.
        if self._outputs:
            # we already fetched outputs, no need to do anything
            return

        self._outputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self.model.output_artifacts.items()
        }
enable_artifact_metadata: Optional[bool] property readonly

Returns whether artifact metadata is enabled for this step.

Returns:

Type Description
Optional[bool]

Whether artifact metadata is enabled for this step.

enable_artifact_visualization: Optional[bool] property readonly

Returns whether artifact visualization is enabled for this step.

Returns:

Type Description
Optional[bool]

Whether artifact visualization is enabled for this step.

enable_cache: Optional[bool] property readonly

Returns whether caching is enabled for this step.

Returns:

Type Description
Optional[bool]

Whether caching is enabled for this step.

entrypoint_name: str property readonly

Returns the step entrypoint_name.

This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.

Examples:

The step entrypoint_name will be "my_step":

@step(name="my_step")
def my_step_function(...)

The step entrypoint_name will be "my_step_function":

@step
def my_step_function(...)

Returns:

Type Description
str

The step entrypoint_name.

experiment_tracker: Optional[str] property readonly

Returns the name of the experiment tracker of the step.

Returns:

Type Description
Optional[str]

The name of the experiment tracker of the step.

extra: Dict[str, Any] property readonly

Returns the extra dictionary.

This dict is meant to be used to pass any additional configuration the user needs down to the step.

Returns:

Type Description
Dict[str, Any]

The extra dictionary.

input: ArtifactView property readonly

Returns the input artifact that was used to run this step.

Returns:

Type Description
ArtifactView

The input artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple inputs to this step.

inputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all input artifacts that were used to run this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

is_cached: bool property readonly

Returns whether the step is cached or not.

Returns:

Type Description
bool

True if the step is cached, False otherwise.

is_completed: bool property readonly

Returns whether the step is completed or not.

Returns:

Type Description
bool

True if the step is completed, False otherwise.

model: StepRunResponseModel property readonly

Returns the underlying StepRunResponseModel.

Returns:

Type Description
StepRunResponseModel

The underlying StepRunResponseModel.

output: ArtifactView property readonly

Returns the output artifact that was written by this step.

Returns:

Type Description
ArtifactView

The output artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple step outputs.

outputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all output artifacts that were written by this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

parameters: Dict[str, str] property readonly

The parameters used to run this step.

Returns:

Type Description
Dict[str, str]

The parameters used to run this step.

settings: Dict[str, BaseSettings] property readonly

Returns the step settings.

These are runtime settings passed down to stack components, which can be set at step level.

Returns:

Type Description
Dict[str, BaseSettings]

The step settings.

spec: StepSpec property readonly

Returns the step spec.

The step spec defines the source path and upstream steps of a step and is used primarily to compare whether two steps are the same.

Returns:

Type Description
StepSpec

The step spec.

status: ExecutionStatus property readonly

Returns the current status of the step.

Returns:

Type Description
ExecutionStatus

The current status of the step.

step_configuration: StepConfiguration property readonly

Returns the step configuration.

Returns:

Type Description
StepConfiguration

The step configuration.

step_operator: Optional[str] property readonly

Returns the name of the step operator of the step.

Returns:

Type Description
Optional[str]

The name of the step operator of the step.

MODEL_CLASS (StepRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for step runs.

Source code in zenml/post_execution/step.py
class StepRunResponseModel(StepRunBaseModel, WorkspaceScopedResponseModel):
    """Response model for step runs.

    Combines the fields of `StepRunBaseModel` and
    `WorkspaceScopedResponseModel`, and adds the input/output artifacts and
    the run metadata associated with the step run.
    """

    # Artifacts consumed by the step run, keyed by name.
    # NOTE(review): the `= {}` literal relies on pydantic copying field
    # defaults per instance, so the dict is not shared between models.
    input_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    # Artifacts produced by the step run, keyed by name.
    output_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    # Run metadata attached to the step run; presumably keyed by metadata
    # key — TODO confirm against the code that populates it.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this step run.",
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__init__(self, model) special

Initializes a post-execution step object.

In most cases StepView objects should not be created manually but retrieved from a PipelineRunView object instead.

Parameters:

Name Type Description Default
model StepRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/step.py
def __init__(self, model: StepRunResponseModel):
    """Create a post-execution view of a single step run.

    `StepView` instances are normally obtained from a `PipelineRunView`
    rather than constructed by hand.

    Args:
        model: The step run response model this view wraps.
    """
    super().__init__(model)
    # Lazily-built caches of artifact views; they start empty and are
    # filled by `_ensure_outputs_fetched` / `_ensure_inputs_fetched`.
    self._outputs: Dict[str, ArtifactView] = {}
    self._inputs: Dict[str, ArtifactView] = {}
visualize(self)

Visualizes all output artifacts of the step.

Source code in zenml/post_execution/step.py
def visualize(self) -> None:
    """Render every output artifact of this step, ordered by artifact name.

    Each artifact is shown with a title of the form
    `"<step name> - <artifact name>"`.
    """
    step_name = self.model.name
    ordered_views = sorted(
        self.outputs.values(), key=lambda view: view.model.name
    )
    for view in ordered_views:
        view.visualize(title=f"{step_name} - {view.model.name}")