Skip to content

Post Execution

zenml.post_execution special

Initialization for the post-execution module.

After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.

artifact

Initialization for the post-execution artifact class.

ArtifactView

Post-execution artifact class.

This can be used to read artifact data that was created during a pipeline execution.

Source code in zenml/post_execution/artifact.py
class ArtifactView:
    """Post-execution artifact class.

    This can be used to read artifact data that was created during a pipeline
    execution.

    Two views compare equal when both their ID and their URI match, and they
    hash consistently with that equality, so they can be stored in sets and
    used as dictionary keys.
    """

    def __init__(self, model: "ArtifactResponseModel"):
        """Initializes a post-execution artifact object.

        In most cases `ArtifactView` objects should not be created manually but
        retrieved from a `StepView` via the `inputs` or `outputs` properties.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model

    @property
    def id(self) -> UUID:
        """Returns the artifact id.

        Returns:
            The artifact id.
        """
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the artifact (output name in the parent step).

        Returns:
            The name of the artifact.
        """
        return self._model.name

    @property
    def type(self) -> str:
        """Returns the artifact type.

        Returns:
            The artifact type.
        """
        return self._model.type

    @property
    def data_type(self) -> str:
        """Returns the data type of the artifact.

        Returns:
            The data type of the artifact.
        """
        return self._model.data_type

    @property
    def uri(self) -> str:
        """Returns the URI where the artifact data is stored.

        Returns:
            The URI where the artifact data is stored.
        """
        return self._model.uri

    @property
    def materializer(self) -> str:
        """Returns the materializer that was used to write this artifact.

        Returns:
            The materializer that was used to write this artifact.
        """
        return self._model.materializer

    @property
    def producer_step_id(self) -> Optional[UUID]:
        """Returns the ID of the original step that produced the artifact.

        Returns:
            The ID of the original step that produced the artifact.
        """
        return self._model.producer_step_run_id

    def read(
        self,
        output_data_type: Optional[Type[Any]] = None,
        materializer_class: Optional[Type["BaseMaterializer"]] = None,
    ) -> Any:
        """Materializes the data stored in this artifact.

        Both arguments are accepted only for backwards compatibility; passing
        either of them logs a deprecation warning and has no other effect.

        Args:
            output_data_type: Deprecated; will be ignored.
            materializer_class: Deprecated; will be ignored.

        Returns:
            The materialized data.
        """
        if output_data_type is not None:
            logger.warning(
                "The `output_data_type` argument is deprecated and will be "
                "removed in a future release."
            )
        if materializer_class is not None:
            logger.warning(
                "The `materializer_class` argument is deprecated and will be "
                "removed in a future release."
            )

        # Imported at call time rather than at module import time.
        from zenml.utils.materializer_utils import load_artifact

        return load_artifact(self._model)

    def __repr__(self) -> str:
        """Returns a string representation of this artifact.

        Returns:
            A string representation of this artifact.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"type='{self.type}', uri='{self.uri}', "
            f"materializer='{self.materializer}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same artifact.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same artifact, else
            False. `NotImplemented` is returned for objects that are not
            `ArtifactView` instances.
        """
        if isinstance(other, ArtifactView):
            return self.id == other.id and self.uri == other.uri
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` alone would make instances unhashable, so the hash
        is derived from the same fields that equality compares.

        Returns:
            The hash of the artifact's ID and URI.
        """
        return hash((self.id, self.uri))
data_type: str property readonly

Returns the data type of the artifact.

Returns:

Type Description
str

The data type of the artifact.

id: UUID property readonly

Returns the artifact id.

Returns:

Type Description
UUID

The artifact id.

materializer: str property readonly

Returns the materializer that was used to write this artifact.

Returns:

Type Description
str

The materializer that was used to write this artifact.

name: str property readonly

Returns the name of the artifact (output name in the parent step).

Returns:

Type Description
str

The name of the artifact.

producer_step_id: Optional[uuid.UUID] property readonly

Returns the ID of the original step that produced the artifact.

Returns:

Type Description
Optional[uuid.UUID]

The ID of the original step that produced the artifact.

type: str property readonly

Returns the artifact type.

Returns:

Type Description
str

The artifact type.

uri: str property readonly

Returns the URI where the artifact data is stored.

Returns:

Type Description
str

The URI where the artifact data is stored.

__eq__(self, other) special

Returns whether the other object is referring to the same artifact.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same artifact, else False.

Source code in zenml/post_execution/artifact.py
def __eq__(self, other: Any) -> bool:
    """Checks whether another object denotes the same artifact as this one.

    Two artifact views are treated as the same artifact when both their IDs
    and their URIs match.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is an `ArtifactView` with the same ID and URI, False
        if it is an `ArtifactView` that differs in either of them, and
        `NotImplemented` for objects of any other type.
    """
    if not isinstance(other, ArtifactView):
        return NotImplemented
    ids_match = self.id == other.id
    return ids_match and self.uri == other.uri
__init__(self, model) special

Initializes a post-execution artifact object.

In most cases ArtifactView objects should not be created manually but retrieved from a StepView via the inputs or outputs properties.

Parameters:

Name Type Description Default
model ArtifactResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/artifact.py
def __init__(self, model: "ArtifactResponseModel"):
    """Creates an artifact view backed by the given response model.

    `ArtifactView` objects are normally obtained from the `inputs` or
    `outputs` properties of a `StepView` rather than constructed by hand.

    Args:
        model: The artifact model this view wraps.
    """
    self._model = model
__repr__(self) special

Returns a string representation of this artifact.

Returns:

Type Description
str

A string representation of this artifact.

Source code in zenml/post_execution/artifact.py
def __repr__(self) -> str:
    """Builds a human-readable description of this artifact.

    Returns:
        A string containing the class name followed by the artifact's ID,
        type, URI and materializer.
    """
    class_name = self.__class__.__qualname__
    described_fields = (
        f"id={self.id}",
        f"type='{self.type}'",
        f"uri='{self.uri}'",
        f"materializer='{self.materializer}'",
    )
    return f"{class_name}({', '.join(described_fields)})"
read(self, output_data_type=None, materializer_class=None)

Materializes the data stored in this artifact.

Parameters:

Name Type Description Default
output_data_type Optional[Type[Any]]

Deprecated; will be ignored.

None
materializer_class Optional[Type[BaseMaterializer]]

Deprecated; will be ignored.

None

Returns:

Type Description
Any

The materialized data.

Source code in zenml/post_execution/artifact.py
def read(
    self,
    output_data_type: Optional[Type[Any]] = None,
    materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
    """Loads and returns the data stored in this artifact.

    Both arguments exist only for backwards compatibility: supplying either
    one logs a deprecation warning and does not influence the result.

    Args:
        output_data_type: Deprecated; will be ignored.
        materializer_class: Deprecated; will be ignored.

    Returns:
        The materialized data.
    """
    deprecated_arguments = (
        ("output_data_type", output_data_type),
        ("materializer_class", materializer_class),
    )
    for argument_name, argument_value in deprecated_arguments:
        if argument_value is not None:
            logger.warning(
                f"The `{argument_name}` argument is deprecated and will be "
                "removed in a future release."
            )

    # Imported at call time, after any deprecation warnings were logged.
    from zenml.utils.materializer_utils import load_artifact

    return load_artifact(self._model)

lineage special

Initialization of lineage generation module.

edge

Class for Edges in a lineage graph.

Edge (BaseModel) pydantic-model

A class that represents an edge in a lineage graph.

Source code in zenml/post_execution/lineage/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph.

    An edge connects two nodes of the graph, which are referenced by their
    node IDs.
    """

    # Identifier of the edge itself.
    id: str
    # ID of the node the edge starts from.
    source: str
    # ID of the node the edge points to.
    target: str

lineage_graph

Class for lineage graph generation.

LineageGraph (BaseModel) pydantic-model

A lineage graph representation of a PipelineRunView.

Source code in zenml/post_execution/lineage/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunView.

    The graph is built incrementally: every processed step contributes one
    step node, one artifact node per output artifact, and edges that link the
    step to its output and input artifacts.
    """

    # Step and artifact nodes collected so far.
    nodes: List[Union[StepNode, ArtifactNode]] = []
    # Edges between the collected nodes.
    edges: List[Edge] = []
    # Node ID of the first step that was added to the graph.
    root_step_id: Optional[str]

    def generate_step_nodes_and_edges(self, step: StepView) -> None:
        """Generates the step nodes and the edges between them.

        Adds a node for the step itself, a node for each of its output
        artifacts, an edge from the step to each output artifact and an edge
        from each input artifact to the step. The first step processed is
        recorded as the root step of the graph.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        if self.root_step_id is None:
            self.root_step_id = step_id
        step_config = step.step_configuration.dict()
        if step_config:
            # Inputs, outputs and parameters are reported as separate fields
            # of the node, and empty configuration values are dropped.
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ["inputs", "outputs", "parameters"] and value
            }
        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.entrypoint_name,  # redundant for consistency
                    parameters=step.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            # Keep a missing producer as `None` instead of turning it into
            # the literal string "None".
            producer_step_id = artifact.producer_step_id
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type,
                        parent_step_id=str(step.id),
                        producer_step_id=(
                            str(producer_step_id)
                            if producer_step_id is not None
                            else None
                        ),
                        uri=artifact.uri,
                    ),
                )
            )
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # Input artifacts only get an edge here; no artifact node is created
        # for them in this loop.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
        """Generates the run nodes and the edges between them.

        Args:
            run: The PipelineRunView to generate the lineage graph for.
        """
        for step in run.steps:
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)

Generates the run nodes and the edges between them.

Parameters:

Name Type Description Default
run PipelineRunView

The PipelineRunView to generate the lineage graph for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
    """Adds the nodes and edges of every step of a pipeline run to the graph.

    Each step of the run is handed, in iteration order, to
    `generate_step_nodes_and_edges`.

    Args:
        run: The PipelineRunView to generate the lineage graph for.
    """
    for run_step in run.steps:
        self.generate_step_nodes_and_edges(run_step)
generate_step_nodes_and_edges(self, step)

Generates the step nodes and the edges between them.

Parameters:

Name Type Description Default
step StepView

The step to generate the nodes and edges for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_step_nodes_and_edges(self, step: StepView) -> None:
    """Generates the step nodes and the edges between them.

    Adds a node for the step itself, a node for each of its output
    artifacts, an edge from the step to each output artifact and an edge
    from each input artifact to the step. The first step processed is
    recorded as the root step of the graph.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    if self.root_step_id is None:
        self.root_step_id = step_id
    step_config = step.step_configuration.dict()
    if step_config:
        # Inputs, outputs and parameters are reported as separate fields of
        # the node, and empty configuration values are dropped.
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ["inputs", "outputs", "parameters"] and value
        }
    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.entrypoint_name,  # redundant for consistency
                parameters=step.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        # Keep a missing producer as `None` instead of turning it into the
        # literal string "None".
        producer_step_id = artifact.producer_step_id
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type,
                    parent_step_id=str(step.id),
                    producer_step_id=(
                        str(producer_step_id)
                        if producer_step_id is not None
                        else None
                    ),
                    uri=artifact.uri,
                ),
            )
        )
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # Input artifacts only get an edge here; no artifact node is created for
    # them in this loop.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )

node special

Initialization of lineage nodes.

artifact_node

Class for all lineage artifact nodes.

ArtifactNode (BaseNode) pydantic-model

A class that represents an artifact node in a lineage graph.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    # Marks the node kind; defaults to "artifact" for this node class.
    type: str = "artifact"
    # Artifact-specific details attached to the node.
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails) pydantic-model

Captures all artifact details for the node.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node.

    Extends the common node details with artifact-specific fields.
    """

    # Whether the step this node was generated for has the cached status.
    is_cached: bool
    # Type of the artifact.
    artifact_type: str
    # Data type of the artifact.
    artifact_data_type: str
    # ID of the step this artifact node was generated for, as a string.
    parent_step_id: str
    # ID of the original step that produced the artifact, if available.
    producer_step_id: Optional[str]
    # URI where the artifact data is stored.
    uri: str
base_node

Base class for all lineage nodes.

BaseNode (BaseModel) pydantic-model

A class that represents a node in a lineage graph.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph.

    Concrete node classes narrow `data` to their own details model and give
    `type` a default value.
    """

    # Identifier of the node; edges refer to nodes by this value.
    id: str
    # Kind of node, e.g. "step" or "artifact".
    type: str
    # Details attached to the node.
    data: BaseNodeDetails
BaseNodeDetails (BaseModel) pydantic-model

Captures all details for the node.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node.

    These fields are shared by step nodes and artifact nodes.
    """

    # ID of the underlying step or artifact, as a string.
    execution_id: str
    # Name of the step or artifact.
    name: str
    # Execution status associated with the node.
    status: ExecutionStatus
step_node

Class for all lineage step nodes.

StepNode (BaseNode) pydantic-model

A class that represents a step node in a lineage graph.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    # Marks the node kind; defaults to "step" for this node class.
    type: str = "step"
    # Step-specific details attached to the node.
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails) pydantic-model

Captures all step details for the node.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node.

    Extends the common node details with step-specific fields.
    """

    # Entrypoint name of the step.
    entrypoint_name: str
    # Parameters of the step.
    parameters: Dict[str, Any]
    # Configuration of the step.
    configuration: Dict[str, Any]
    # Inputs of the step, keyed by input name.
    inputs: Dict[str, Any]
    # Outputs of the step, keyed by output name.
    outputs: Dict[str, Any]

pipeline

Implementation of the post-execution pipeline.

PipelineView

Post-execution pipeline class.

Source code in zenml/post_execution/pipeline.py
class PipelineView:
    """Post-execution pipeline class.

    Two views compare equal when they refer to the same pipeline ID, and
    they hash consistently with that equality, so they can be stored in sets
    and used as dictionary keys.
    """

    def __init__(self, model: PipelineResponseModel):
        """Initializes a post-execution pipeline object.

        In most cases `PipelineView` objects should not be created manually
        but retrieved using the `get_pipelines()` utility from
        `zenml.post_execution` instead.

        Args:
            model: The model to initialize this pipeline view from.
        """
        self._model = model

    @property
    def id(self) -> UUID:
        """Returns the ID of this pipeline.

        Returns:
            The ID of this pipeline.
        """
        assert self._model.id is not None
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self._model.name

    @property
    def docstring(self) -> Optional[str]:
        """Returns the docstring of the pipeline.

        Returns:
            The docstring of the pipeline.
        """
        return self._model.docstring

    @property
    def spec(self) -> "PipelineSpec":
        """Returns the spec of the pipeline.

        The pipeline spec contains the source paths of all steps, as well as
        each of their upstream step names. This is primarily used to compare
        whether two pipelines are the same.

        Returns:
            The spec of the pipeline.
        """
        return self._model.spec

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns all stored runs of this pipeline.

        The runs are returned in chronological order, so the latest
        run will be the last element in this list.

        Returns:
            A list of all stored runs of this pipeline.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle
        client = Client()
        active_project_id = client.active_project.id
        runs = client.zen_store.list_runs(
            project_name_or_id=active_project_id,
            pipeline_id=self._model.id,
        )
        return [PipelineRunView(run) for run in runs]

    def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
        """Ascertains which pipeline run produced the cached artifact of a given step.

        The runs are searched from the latest to the oldest, and the first
        run in which the step exists and is completed is returned.

        Args:
            step_name: Name of the step to look up.

        Returns:
            The most recent pipeline run in which the given step completed.

        Raises:
            LookupError: If no run is found that completed the given step
        """
        for run in reversed(self.runs):
            try:
                step = run.get_step(step_name)
            except KeyError:
                # This run does not contain the step; try an older run.
                continue
            if step.is_completed:
                return run

        raise LookupError(
            "No Pipeline Run could be found, that has"
            f" completed the provided step: [{step_name}]"
        )

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline.

        Returns:
            A string representation of this pipeline.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same pipeline,
            False otherwise. `NotImplemented` is returned for objects that
            are not `PipelineView` instances.
        """
        if isinstance(other, PipelineView):
            return self.id == other.id
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` alone would make instances unhashable, so the hash
        is derived from the same field that equality compares.

        Returns:
            The hash of the pipeline ID.
        """
        return hash(self.id)
docstring: Optional[str] property readonly

Returns the docstring of the pipeline.

Returns:

Type Description
Optional[str]

The docstring of the pipeline.

id: UUID property readonly

Returns the ID of this pipeline.

Returns:

Type Description
UUID

The ID of this pipeline.

name: str property readonly

Returns the name of the pipeline.

Returns:

Type Description
str

The name of the pipeline.

runs: List[PipelineRunView] property readonly

Returns all stored runs of this pipeline.

The runs are returned in chronological order, so the latest run will be the last element in this list.

Returns:

Type Description
List[PipelineRunView]

A list of all stored runs of this pipeline.

spec: PipelineSpec property readonly

Returns the spec of the pipeline.

The pipeline spec contains the source paths of all steps, as well as each of their upstream step names. This is primarily used to compare whether two pipelines are the same.

Returns:

Type Description
PipelineSpec

The spec of the pipeline.

__eq__(self, other) special

Returns whether the other object is referring to the same pipeline.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same pipeline, False otherwise.

Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
    """Checks whether another object denotes the same pipeline as this one.

    Two pipeline views are treated as the same pipeline when their IDs match.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is a `PipelineView` with the same ID, False if it is
        a `PipelineView` with a different ID, and `NotImplemented` for
        objects of any other type.
    """
    if not isinstance(other, PipelineView):
        return NotImplemented
    return self.id == other.id
__init__(self, model) special

Initializes a post-execution pipeline object.

In most cases PipelineView objects should not be created manually but retrieved using the get_pipelines() utility from zenml.post_execution instead.

Parameters:

Name Type Description Default
model PipelineResponseModel

The model to initialize this pipeline view from.

required
Source code in zenml/post_execution/pipeline.py
def __init__(self, model: PipelineResponseModel):
    """Creates a pipeline view backed by the given response model.

    `PipelineView` objects are normally obtained through the
    `get_pipelines()` utility of `zenml.post_execution` rather than
    constructed by hand.

    Args:
        model: The pipeline model this view wraps.
    """
    self._model = model
__repr__(self) special

Returns a string representation of this pipeline.

Returns:

Type Description
str

A string representation of this pipeline.

Source code in zenml/post_execution/pipeline.py
def __repr__(self) -> str:
    """Builds a human-readable description of this pipeline.

    Returns:
        A string containing the class name followed by the pipeline's ID
        and name.
    """
    class_name = self.__class__.__qualname__
    return f"{class_name}(id={self.id}, name='{self.name}')"
get_run_for_completed_step(self, step_name)

Ascertains which pipeline run produced the cached artifact of a given step.

Parameters:

Name Type Description Default
step_name str

Name of the step to look up.

required

Returns:

Type Description
PipelineRunView

The most recent pipeline run in which the given step completed. If no such run exists, a LookupError is raised instead of returning None.

Exceptions:

Type Description
LookupError

If no run is found that completed the given step

Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
    """Ascertains which pipeline run produced the cached artifact of a given step.

    The runs are searched from the latest to the oldest, and the first run
    in which the step exists and is completed is returned.

    Args:
        step_name: Name of the step to look up.

    Returns:
        The most recent pipeline run in which the given step completed.

    Raises:
        LookupError: If no run is found that completed the given step
    """
    for run in reversed(self.runs):
        try:
            step = run.get_step(step_name)
        except KeyError:
            # This run does not contain the step; try an older run.
            continue
        if step.is_completed:
            return run

    raise LookupError(
        "No Pipeline Run could be found, that has"
        f" completed the provided step: [{step_name}]"
    )

pipeline_run

Implementation of the post-execution pipeline run class.

PipelineRunView

Post-execution pipeline run class.

This can be used to query steps and artifact information associated with a pipeline execution.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView:
    """Post-execution pipeline run class.

    This can be used to query steps and artifact information associated with a
    pipeline execution.
    """

    def __init__(self, model: PipelineRunResponseModel):
        """Initializes a post-execution pipeline run object.

        In most cases `PipelineRunView` objects should not be created manually
        but retrieved from a `PipelineView` object instead.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model
        # Filled lazily by `_ensure_steps_fetched()`, keyed by step name.
        self._steps: Dict[str, StepView] = OrderedDict()

    @property
    def id(self) -> UUID:
        """Returns the ID of this pipeline run.

        Returns:
            The ID of this pipeline run.
        """
        assert self._model.id is not None
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the pipeline run.

        Returns:
            The name of the pipeline run.
        """
        return self._model.name

    @property
    def pipeline_configuration(self) -> Dict[str, Any]:
        """Returns the pipeline configuration.

        Returns:
            The pipeline configuration.
        """
        return self._model.pipeline_configuration

    @property
    def settings(self) -> Dict[str, Any]:
        """Returns the pipeline settings.

        These are runtime settings passed down to stack components, which
        can be set at pipeline level.

        Returns:
            The pipeline settings.
        """
        settings = self.pipeline_configuration["settings"]
        return cast(Dict[str, Any], settings)

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the pipeline extras.

        This dict is meant to be used to pass any configuration down to the
        pipeline or stack components that the user has use of.

        Returns:
            The pipeline extras.
        """
        extra = self.pipeline_configuration["extra"]
        return cast(Dict[str, Any], extra)

    @property
    def enable_cache(self) -> bool:
        """Returns whether caching is enabled for this pipeline run.

        Returns:
            True if caching is enabled for this pipeline run.
        """
        enable_cache = self.pipeline_configuration["enable_cache"]
        return cast(bool, enable_cache)

    @property
    def zenml_version(self) -> Optional[str]:
        """Version of ZenML that this pipeline run was performed with.

        Returns:
            The version of ZenML that this pipeline run was performed with.
        """
        return self._model.zenml_version

    @property
    def git_sha(self) -> Optional[str]:
        """Git commit SHA that this pipeline run was performed on.

        This will only be set if the pipeline code is in a git repository and
        there are no dirty files when running the pipeline.

        Returns:
            The git commit SHA that this pipeline run was performed on.
        """
        return self._model.git_sha

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the pipeline run.

        Returns:
            The current status of the pipeline run.
        """
        # Query the run again since the status might have changed since this
        # object was created.
        return Client().get_pipeline_run(self.id).status

    @property
    def created(self) -> datetime:
        """Returns the creation time of the pipeline run.

        Returns:
            The creation time of the pipeline run.
        """
        return self._model.created

    @property
    def steps(self) -> List[StepView]:
        """Returns all steps that were executed as part of this pipeline run.

        Returns:
            A list of all steps that were executed as part of this pipeline run.
        """
        self._ensure_steps_fetched()
        return list(self._steps.values())

    def get_step_names(self) -> List[str]:
        """Returns a list of all step names.

        Returns:
            A list of all step names.
        """
        self._ensure_steps_fetched()
        return list(self._steps.keys())

    def get_step(
        self,
        step: Optional[str] = None,
        **kwargs: Any,
    ) -> StepView:
        """Returns a step for the given name.

        The name refers to the name of the step in the pipeline definition, not
        the class name of the step-class.

        Use it like this:
        ```python
        # Get the step by name
        pipeline_run_view.get_step("first_step")
        ```

        Args:
            step: Name of the step as defined in the pipeline.
            **kwargs: The deprecated `name` is caught as a kwarg to
                specify the step instead of using the `step` argument. It is
                only used if `step` is empty or not given.

        Returns:
            A step for the given name.

        Raises:
            KeyError: If there is no step with the given name.
            RuntimeError: If no step has been specified at all.
        """
        self._ensure_steps_fetched()

        api_doc_link = get_apidocs_link(
            "core-post_execution",
            "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
        )
        step_name = kwargs.get("name")

        if not step:
            # Raise an error if neither `step` nor `name` args were provided.
            if not isinstance(step_name, str):
                raise RuntimeError(
                    "No step specified. Please specify a step using "
                    "pipeline_run_view.get_step(step=`step_name`). "
                    "Please refer to the API docs to learn more: "
                    f"{api_doc_link}"
                )

            # `name` was provided but not `step`: log a deprecation warning
            # and fall back to the deprecated argument.
            logger.warning(
                "Using 'name' to get a step from "
                "'PipelineRunView.get_step()' is deprecated and "
                "will be removed in the future. Instead please "
                "use 'step' to access a step from your past "
                "pipeline runs. Learn more in our API docs: %s",
                api_doc_link,
            )
            step = step_name

        # Raise an error if there is no such step in the given pipeline run.
        if step not in self._steps:
            raise KeyError(
                f"No step found for name `{step}`. This pipeline "
                f"run only has steps with the following "
                f"names: `{self.get_step_names()}`"
            )

        return self._steps[step]

    def _ensure_steps_fetched(self) -> None:
        """Fetches all steps for this pipeline run from the ZenStore.

        The steps are only fetched while the local step mapping is empty, so
        a run without any steps is queried again on every call.
        """
        if self._steps:
            # we already fetched the steps, no need to do anything
            return

        assert self._model.id is not None
        steps = Client().zen_store.list_run_steps(self._model.id)
        self._steps = {step.name: StepView(step) for step in steps}

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline run.

        Returns:
            A string representation of this pipeline run.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline run.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same pipeline run.
        """
        if isinstance(other, PipelineRunView):
            return self.id == other.id
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` without `__hash__` would make instances unhashable,
        so the hash is derived from the same ID that equality compares.

        Returns:
            The hash of the ID of this pipeline run.
        """
        return hash(self.id)
created: datetime property readonly

Returns the creation time of the pipeline run.

Returns:

Type Description
datetime

The creation time of the pipeline run.

enable_cache: bool property readonly

Returns whether caching is enabled for this pipeline run.

Returns:

Type Description
bool

True if caching is enabled for this pipeline run.

extra: Dict[str, Any] property readonly

Returns the pipeline extras.

This dict is meant to be used to pass any configuration down to the pipeline or stack components that the user has use of.

Returns:

Type Description
Dict[str, Any]

The pipeline extras.

git_sha: Optional[str] property readonly

Git commit SHA that this pipeline run was performed on.

This will only be set if the pipeline code is in a git repository and there are no dirty files when running the pipeline.

Returns:

Type Description
Optional[str]

The git commit SHA that this pipeline run was performed on.

id: UUID property readonly

Returns the ID of this pipeline run.

Returns:

Type Description
UUID

The ID of this pipeline run.

name: str property readonly

Returns the name of the pipeline run.

Returns:

Type Description
str

The name of the pipeline run.

pipeline_configuration: Dict[str, Any] property readonly

Returns the pipeline configuration.

Returns:

Type Description
Dict[str, Any]

The pipeline configuration.

settings: Dict[str, Any] property readonly

Returns the pipeline settings.

These are runtime settings passed down to stack components, which can be set at pipeline level.

Returns:

Type Description
Dict[str, Any]

The pipeline settings.

status: ExecutionStatus property readonly

Returns the current status of the pipeline run.

Returns:

Type Description
ExecutionStatus

The current status of the pipeline run.

steps: List[zenml.post_execution.step.StepView] property readonly

Returns all steps that were executed as part of this pipeline run.

Returns:

Type Description
List[zenml.post_execution.step.StepView]

A list of all steps that were executed as part of this pipeline run.

zenml_version: Optional[str] property readonly

Version of ZenML that this pipeline run was performed with.

Returns:

Type Description
Optional[str]

The version of ZenML that this pipeline run was performed with.

__eq__(self, other) special

Returns whether the other object is referring to the same pipeline run.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same pipeline run.

Source code in zenml/post_execution/pipeline_run.py
def __eq__(self, other: Any) -> bool:
    """Compares this pipeline run view with another object by run ID.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is a `PipelineRunView` with the same ID, False if it
        is one with a different ID. For any other type `NotImplemented` is
        returned.
    """
    if not isinstance(other, PipelineRunView):
        return NotImplemented
    return self.id == other.id
__init__(self, model) special

Initializes a post-execution pipeline run object.

In most cases PipelineRunView objects should not be created manually but retrieved from a PipelineView object instead.

Parameters:

Name Type Description Default
model PipelineRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/pipeline_run.py
def __init__(self, model: PipelineRunResponseModel):
    """Creates a post-execution view on top of a pipeline run model.

    `PipelineRunView` objects are usually obtained from a `PipelineView`
    object rather than being constructed by hand.

    Args:
        model: The model to initialize this object from.
    """
    # Step views keyed by name; starts out empty.
    self._steps: Dict[str, StepView] = OrderedDict()
    self._model = model
__repr__(self) special

Returns a string representation of this pipeline run.

Returns:

Type Description
str

A string representation of this pipeline run.

Source code in zenml/post_execution/pipeline_run.py
def __repr__(self) -> str:
    """Builds a short, human-readable description of this pipeline run.

    Returns:
        A string containing the class name, ID and name of this run.
    """
    class_name = self.__class__.__qualname__
    return f"{class_name}(id={self.id}, name='{self.name}')"
get_step(self, step=None, **kwargs)

Returns a step for the given name.

The name refers to the name of the step in the pipeline definition, not the class name of the step-class.

Use it like this:

# Get the step by name
pipeline_run_view.get_step("first_step")

Parameters:

Name Type Description Default
step Optional[str]

Name of the step as defined in the pipeline.

None
**kwargs Any

The deprecated name is caught as a kwarg to specify the step instead of using the step argument.

{}

Returns:

Type Description
StepView

A step for the given name.

Exceptions:

Type Description
KeyError

If there is no step with the given name.

RuntimeError

If no step has been specified at all.

Source code in zenml/post_execution/pipeline_run.py
def get_step(
    self,
    step: Optional[str] = None,
    **kwargs: Any,
) -> StepView:
    """Returns a step for the given name.

    The name refers to the name of the step in the pipeline definition, not
    the class name of the step-class.

    Use it like this:
    ```python
    # Get the step by name
    pipeline_run_view.get_step("first_step")
    ```

    Args:
        step: Name of the step as defined in the pipeline.
        **kwargs: The deprecated `name` is caught as a kwarg to
            specify the step instead of using the `step` argument. It is
            only used if `step` is empty or not given.

    Returns:
        A step for the given name.

    Raises:
        KeyError: If there is no step with the given name.
        RuntimeError: If no step has been specified at all.
    """
    self._ensure_steps_fetched()

    api_doc_link = get_apidocs_link(
        "core-post_execution",
        "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
    )
    step_name = kwargs.get("name")

    if not step:
        # Raise an error if neither `step` nor `name` args were provided.
        if not isinstance(step_name, str):
            raise RuntimeError(
                "No step specified. Please specify a step using "
                "pipeline_run_view.get_step(step=`step_name`). "
                "Please refer to the API docs to learn more: "
                f"{api_doc_link}"
            )

        # `name` was provided but not `step`: log a deprecation warning and
        # fall back to the deprecated argument.
        logger.warning(
            "Using 'name' to get a step from "
            "'PipelineRunView.get_step()' is deprecated and "
            "will be removed in the future. Instead please "
            "use 'step' to access a step from your past "
            "pipeline runs. Learn more in our API docs: %s",
            api_doc_link,
        )
        step = step_name

    # Raise an error if there is no such step in the given pipeline run.
    if step not in self._steps:
        raise KeyError(
            f"No step found for name `{step}`. This pipeline "
            f"run only has steps with the following "
            f"names: `{self.get_step_names()}`"
        )

    return self._steps[step]
get_step_names(self)

Returns a list of all step names.

Returns:

Type Description
List[str]

A list of all step names.

Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Lists the names of all steps of this pipeline run.

    Returns:
        The step names, in the order in which the steps are stored.
    """
    # Make sure the step mapping is populated before reading its keys.
    self._ensure_steps_fetched()
    step_names = list(self._steps)
    return step_names

get_run(name)

Fetches the post-execution view of a run with the given name.

Parameters:

Name Type Description Default
name str

The name of the run to fetch.

required

Returns:

Type Description
PipelineRunView

The post-execution view of the run with the given name.

Exceptions:

Type Description
KeyError

If no run with the given name exists.

RuntimeError

If multiple runs with the given name exist.

Source code in zenml/post_execution/pipeline_run.py
def get_run(name: str) -> "PipelineRunView":
    """Looks up a run by name and wraps it in a post-execution view.

    The lookup is restricted to the active project of the client.

    Args:
        name: The name of the run to fetch.

    Returns:
        The post-execution view of the run with the given name.

    Raises:
        KeyError: If no run with the given name exists.
        RuntimeError: If multiple runs with the given name exist.
    """
    client = Client()
    project_id = client.active_project.id
    matching_runs = client.zen_store.list_runs(
        name=name,
        project_name_or_id=project_id,
    )

    # TODO: [server] this error handling could be improved
    if not matching_runs:
        raise KeyError(f"No run with name '{name}' exists.")

    if len(matching_runs) > 1:
        raise RuntimeError(
            f"Multiple runs have been found for name  '{name}'.",
            matching_runs,
        )

    # Exactly one run matched the name.
    return PipelineRunView(matching_runs[0])

get_unlisted_runs()

Fetches post-execution views of all unlisted runs.

Unlisted runs are runs that are not associated with any pipeline.

Returns:

Type Description
List[PipelineRunView]

A list of post-execution run views.

Source code in zenml/post_execution/pipeline_run.py
def get_unlisted_runs() -> List["PipelineRunView"]:
    """Collects post-execution views of every unlisted run.

    Unlisted runs are runs that are not associated with any pipeline. Only
    runs of the active project of the client are queried.

    Returns:
        A list of post-execution run views.
    """
    client = Client()
    store = client.zen_store
    unlisted_models = store.list_runs(
        project_name_or_id=client.active_project.id,
        unlisted=True,
    )

    views = []
    for run_model in unlisted_models:
        views.append(PipelineRunView(run_model))
    return views

step

Implementation of a post-execution step class.

StepView

Post-execution step class.

This can be used to query artifact information associated with a pipeline step.

Source code in zenml/post_execution/step.py
class StepView:
    """Post-execution step class.

    This can be used to query artifact information associated with a pipeline step.
    """

    def __init__(self, model: StepRunResponseModel):
        """Initializes a post-execution step object.

        In most cases `StepView` objects should not be created manually
        but retrieved from a `PipelineRunView` object instead.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model
        # Artifact views are created lazily from the model on first access.
        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

    @property
    def id(self) -> UUID:
        """Returns the step id.

        Returns:
            The step id.
        """
        assert self._model.id is not None
        return self._model.id

    @property
    def parent_step_ids(self) -> List[UUID]:
        """Returns a list of IDs of all parents of this step.

        Returns:
            A list of IDs of all parents of this step. The list is empty if
            the model lists no parents.
        """
        # Only guard against a missing value: an empty list is a valid result
        # and must not trip the assertion.
        assert self._model.parent_step_ids is not None
        return self._model.parent_step_ids

    @property
    def entrypoint_name(self) -> str:
        """Returns the step entrypoint_name.

        This name is equal to the name argument passed to the @step decorator
        or the actual function name if no explicit name was given.

        Examples:
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)

        Returns:
            The step entrypoint_name.
        """
        return self.step_configuration.name

    @property
    def name(self) -> str:
        """Returns the name as it is defined in the pipeline.

        This name is equal to the name given to the step within the pipeline
        context

        Examples:
            @step()
            def my_step_function(...)

            @pipeline
            def my_pipeline_function(step_a)

            p = my_pipeline_function(
                    step_a = my_step_function()
                )

            The name will be `step_a`

        Returns:
            The name of this step.
        """
        return self._model.name

    @property
    def docstring(self) -> Optional[str]:
        """Docstring of the step function or class.

        Returns:
            The docstring of the step function or class.
        """
        return self.step_configuration.docstring

    @property
    def parameters(self) -> Dict[str, str]:
        """The parameters used to run this step.

        Returns:
            The parameters used to run this step.
        """
        return self.step_configuration.parameters

    @property
    def step_configuration(self) -> "StepConfiguration":
        """Returns the step configuration.

        Returns:
            The step configuration.
        """
        return self._model.step.config

    @property
    def settings(self) -> Dict[str, "BaseSettings"]:
        """Returns the step settings.

        These are runtime settings passed down to stack components, which
        can be set at step level.

        Returns:
            The step settings.
        """
        return self.step_configuration.settings

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the extra dictionary.

        This dict is meant to be used to pass any configuration down to the
        step that the user has use of.

        Returns:
            The extra dictionary.
        """
        return self.step_configuration.extra

    @property
    def enable_cache(self) -> bool:
        """Returns whether caching is enabled for this step.

        Returns:
            Whether caching is enabled for this step.
        """
        return self.step_configuration.enable_cache

    @property
    def step_operator(self) -> Optional[str]:
        """Returns the name of the step operator of the step.

        Returns:
            The name of the step operator of the step.
        """
        return self.step_configuration.step_operator

    @property
    def experiment_tracker(self) -> Optional[str]:
        """Returns the name of the experiment tracker of the step.

        Returns:
            The name of the experiment tracker of the step.
        """
        return self.step_configuration.experiment_tracker

    @property
    def spec(self) -> "StepSpec":
        """Returns the step spec.

        The step spec defines the source path and upstream steps of a step and
        is used primarily to compare whether two steps are the same.

        Returns:
            The step spec.
        """
        return self._model.step.spec

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the step.

        Returns:
            The current status of the step.
        """
        # Query the step again since the status might have changed since this
        # object was created.
        return Client().zen_store.get_run_step(self.id).status

    @property
    def is_cached(self) -> bool:
        """Returns whether the step is cached or not.

        Returns:
            True if the step is cached, False otherwise.
        """
        return self.status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Returns whether the step is completed or not.

        Returns:
            True if the step is completed, False otherwise.
        """
        return self.status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """Returns all input artifacts that were used to run this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """Returns the input artifact that was used to run this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if len(self.inputs) != 1:
            raise ValueError(
                "Can't use the `StepView.input` property for steps with zero "
                "or multiple inputs, use `StepView.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """Returns all output artifacts that were written by this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """Returns the output artifact that was written by this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if len(self.outputs) != 1:
            raise ValueError(
                "Can't use the `StepView.output` property for steps with zero "
                "or multiple outputs, use `StepView.outputs` instead."
            )
        return next(iter(self.outputs.values()))

    def _ensure_inputs_fetched(self) -> None:
        """Builds the input artifact views from the step model if needed."""
        if self._inputs:
            # we already fetched inputs, no need to do anything
            return

        self._inputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self._model.input_artifacts.items()
        }

    def _ensure_outputs_fetched(self) -> None:
        """Builds the output artifact views from the step model if needed."""
        if self._outputs:
            # we already fetched outputs, no need to do anything
            return

        self._outputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self._model.output_artifacts.items()
        }

    def __repr__(self) -> str:
        """Returns a string representation of this step.

        Returns:
            A string representation of this step.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}', entrypoint_name='{self.entrypoint_name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step, False
            otherwise.
        """
        if isinstance(other, StepView):
            return self.id == other.id
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` without `__hash__` would make instances unhashable,
        so the hash is derived from the same ID that equality compares.

        Returns:
            The hash of the step id.
        """
        return hash(self.id)
docstring: Optional[str] property readonly

Docstring of the step function or class.

Returns:

Type Description
Optional[str]

The docstring of the step function or class.

enable_cache: bool property readonly

Returns whether caching is enabled for this step.

Returns:

Type Description
bool

Whether caching is enabled for this step.

entrypoint_name: str property readonly

Returns the step entrypoint_name.

This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.

Examples:

# the step entrypoint_name will be "my_step"
@step(name="my_step")
def my_step_function(...)

# the step entrypoint_name will be "my_step_function"
@step
def my_step_function(...)

Returns:

Type Description
str

The step entrypoint_name.

experiment_tracker: Optional[str] property readonly

Returns the name of the experiment tracker of the step.

Returns:

Type Description
Optional[str]

The name of the experiment tracker of the step.

extra: Dict[str, Any] property readonly

Returns the extra dictionary.

This dict is meant to be used to pass any configuration down to the step that the user has use of.

Returns:

Type Description
Dict[str, Any]

The extra dictionary.

id: UUID property readonly

Returns the step id.

Returns:

Type Description
UUID

The step id.

input: ArtifactView property readonly

Returns the input artifact that was used to run this step.

Returns:

Type Description
ArtifactView

The input artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple inputs to this step.

inputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all input artifacts that were used to run this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

is_cached: bool property readonly

Returns whether the step is cached or not.

Returns:

Type Description
bool

True if the step is cached, False otherwise.

is_completed: bool property readonly

Returns whether the step is completed or not.

Returns:

Type Description
bool

True if the step is completed, False otherwise.

name: str property readonly

Returns the name as it is defined in the pipeline.

This name is equal to the name given to the step within the pipeline context

Examples:

@step()
def my_step_function(...)

@pipeline
def my_pipeline_function(step_a)

p = my_pipeline_function(
    step_a=my_step_function()
)

The name will be step_a

Returns:

Type Description
str

The name of this step.

output: ArtifactView property readonly

Returns the output artifact that was written by this step.

Returns:

Type Description
ArtifactView

The output artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple step outputs.

outputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all output artifacts that were written by this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

parameters: Dict[str, str] property readonly

The parameters used to run this step.

Returns:

Type Description
Dict[str, str]

The parameters used to run this step.

parent_step_ids: List[uuid.UUID] property readonly

Returns a list of IDs of all parents of this step.

Returns:

Type Description
List[uuid.UUID]

A list of IDs of all parents of this step.

settings: Dict[str, BaseSettings] property readonly

Returns the step settings.

These are runtime settings passed down to stack components, which can be set at step level.

Returns:

Type Description
Dict[str, BaseSettings]

The step settings.

spec: StepSpec property readonly

Returns the step spec.

The step spec defines the source path and upstream steps of a step and is used primarily to compare whether two steps are the same.

Returns:

Type Description
StepSpec

The step spec.

status: ExecutionStatus property readonly

Returns the current status of the step.

Returns:

Type Description
ExecutionStatus

The current status of the step.

step_configuration: StepConfiguration property readonly

Returns the step configuration.

Returns:

Type Description
StepConfiguration

The step configuration.

step_operator: Optional[str] property readonly

Returns the name of the step operator of the step.

Returns:

Type Description
Optional[str]

The name of the step operator of the step.

__eq__(self, other) special

Returns whether the other object is referring to the same step.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same step, False otherwise.

Source code in zenml/post_execution/step.py
def __eq__(self, other: Any) -> bool:
    """Compares this step view with another object by step ID.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is a `StepView` with the same ID, False if it is one
        with a different ID. For any other type `NotImplemented` is returned.
    """
    if not isinstance(other, StepView):
        return NotImplemented
    return self.id == other.id
__init__(self, model) special

Initializes a post-execution step object.

In most cases StepView objects should not be created manually but retrieved from a PipelineRunView object instead.

Parameters:

Name Type Description Default
model StepRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/step.py
def __init__(self, model: StepRunResponseModel):
    """Creates a post-execution view on top of a step run model.

    `StepView` objects are usually obtained from a `PipelineRunView` object
    rather than being constructed by hand.

    Args:
        model: The model to initialize this object from.
    """
    # Both artifact mappings start out empty.
    self._outputs: Dict[str, ArtifactView] = {}
    self._inputs: Dict[str, ArtifactView] = {}
    self._model = model
__repr__(self) special

Returns a string representation of this step.

Returns:

Type Description
str

A string representation of this step.

Source code in zenml/post_execution/step.py
def __repr__(self) -> str:
    """Returns a string representation of this step.

    Returns:
        A string containing the class name, ID, name and entrypoint name of
        this step.
    """
    # The closing parenthesis is part of the last fragment so that the
    # representation is balanced.
    return (
        f"{self.__class__.__qualname__}(id={self.id}, "
        f"name='{self.name}', entrypoint_name='{self.entrypoint_name}')"
    )