Post Execution
zenml.post_execution
special
Initialization for the post-execution module.
After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.
artifact
Initialization for the post-execution artifact class.
ArtifactView (BaseView)
Post-execution artifact class.
This can be used to read artifact data that was created during a pipeline execution.
Source code in zenml/post_execution/artifact.py
class ArtifactView(BaseView):
"""Post-execution artifact class.
This can be used to read artifact data that was created during a pipeline
execution.
"""
MODEL_CLASS = ArtifactResponseModel
REPR_KEYS = ["id", "name", "uri"]
@property
def model(self) -> ArtifactResponseModel:
"""Returns the underlying `ArtifactResponseModel`.
Returns:
The underlying `ArtifactResponseModel`.
"""
return cast(ArtifactResponseModel, self._model)
def read(
self,
output_data_type: Optional[Type[Any]] = None,
materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
"""Materializes the data stored in this artifact.
Args:
output_data_type: Deprecated; will be ignored.
materializer_class: Deprecated; will be ignored.
Returns:
The materialized data.
"""
if output_data_type is not None:
logger.warning(
"The `output_data_type` argument is deprecated and will be "
"removed in a future release."
)
if materializer_class is not None:
logger.warning(
"The `materializer_class` argument is deprecated and will be "
"removed in a future release."
)
from zenml.utils.materializer_utils import load_artifact
return load_artifact(self.model)
model: ArtifactResponseModel
property
readonly
Returns the underlying ArtifactResponseModel
.
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The underlying |
MODEL_CLASS (ArtifactBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for artifacts.
Source code in zenml/post_execution/artifact.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
"""Response model for artifacts."""
producer_step_run_id: Optional[UUID]
metadata: Dict[str, "RunMetadataResponseModel"] = Field(
default={}, title="Metadata of the artifact."
)
read(self, output_data_type=None, materializer_class=None)
Materializes the data stored in this artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_data_type |
Optional[Type[Any]] |
Deprecated; will be ignored. |
None |
materializer_class |
Optional[Type[BaseMaterializer]] |
Deprecated; will be ignored. |
None |
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Source code in zenml/post_execution/artifact.py
def read(
self,
output_data_type: Optional[Type[Any]] = None,
materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
"""Materializes the data stored in this artifact.
Args:
output_data_type: Deprecated; will be ignored.
materializer_class: Deprecated; will be ignored.
Returns:
The materialized data.
"""
if output_data_type is not None:
logger.warning(
"The `output_data_type` argument is deprecated and will be "
"removed in a future release."
)
if materializer_class is not None:
logger.warning(
"The `materializer_class` argument is deprecated and will be "
"removed in a future release."
)
from zenml.utils.materializer_utils import load_artifact
return load_artifact(self.model)
base_view
Base classes for post-execution views.
BaseView (ABC)
Base class for post-execution views.
Subclasses should override the following attributes/methods:
- MODEL_CLASS
should be set to the model class the subclass is wrapping.
- The model
property should return self._model
cast to the correct type.
- (Optionally) REPR_KEYS
can be set to include a list of custom attributes
in the __repr__
method.
Source code in zenml/post_execution/base_view.py
class BaseView(ABC):
"""Base class for post-execution views.
Subclasses should override the following attributes/methods:
- `MODEL_CLASS` should be set to the model class the subclass is wrapping.
- The `model` property should return `self._model` cast to the correct type.
- (Optionally) `REPR_KEYS` can be set to include a list of custom attributes
in the `__repr__` method.
"""
MODEL_CLASS = BaseResponseModel # The model class to wrap.
REPR_KEYS = ["id"] # Keys to include in the `__repr__` method.
def __init__(self, model: BaseResponseModel):
"""Initializes a view for the given model.
Args:
model: The model to create a view for.
Raises:
TypeError: If the model is not of the correct type.
ValueError: If any of the `REPR_KEYS` are not valid.
"""
# Check that the model is of the correct type.
if not isinstance(model, self.MODEL_CLASS):
raise TypeError(
f"Model of {self.__class__.__name__} must be of type "
f"{self.MODEL_CLASS.__name__} but is {type(model).__name__}."
)
# Validate that all `REPR_KEYS` are valid.
for key in self.REPR_KEYS:
if (
key not in model.__fields__
and key not in self._custom_view_properties
):
raise ValueError(
f"Key {key} in {self.__class__.__name__}.REPR_KEYS is "
f"neither a field of {self.MODEL_CLASS.__name__} nor a "
f"custom property of {self.__class__.__name__}."
)
self._model = model
@property
def _custom_view_properties(self) -> List[str]:
"""Returns a list of custom view properties.
Returns:
A list of custom view properties.
"""
return [attr for attr in dir(self) if not attr.startswith("__")]
@property
@abstractmethod
def model(self) -> BaseResponseModel:
"""Returns the underlying model.
Subclasses should override this property to return `self._model` with
the correct model type.
E.g. `return cast(ArtifactResponseModel, self._model)`
Returns:
The underlying model.
"""
def __getattribute__(self, __name: str) -> Any:
"""Returns the attribute with the given name.
This method is overridden so we can access the model fields as if they
were attributes of this class.
Args:
__name: The name of the attribute to return.
Returns:
The attribute with the given name.
"""
# Handle special cases that are required by the `dir` call below
if __name in {"__dict__", "__class__"}:
return super().__getattribute__(__name)
# Check the custom view properties first in case of overwrites
if __name in {attr for attr in dir(self) if not attr.startswith("__")}:
return super().__getattribute__(__name)
# Then check if the attribute is a field in the model
if __name in self._model.__fields__:
return getattr(self._model, __name)
# Otherwise, fall back to the default behaviour
return super().__getattribute__(__name)
def __repr__(self) -> str:
"""Returns a string representation of this artifact.
The string representation is of the form
`__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
are the ones specified in `REPR_KEYS`.
Returns:
A string representation of this artifact.
"""
repr = self.__class__.__qualname__
repr_key_strs = [
f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
]
details = ", ".join(repr_key_strs)
if details:
repr += f"({details})"
return repr
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same model.
Args:
other: The other object to compare to.
Returns:
True if the other object is referring to the same model, else False.
"""
if not isinstance(other, self.__class__):
return False
return self._model == other._model # Use the model's `__eq__` method.
model: BaseResponseModel
property
readonly
Returns the underlying model.
Subclasses should override this property to return self._model
with
the correct model type.
E.g. return cast(ArtifactResponseModel, self._model)
Returns:
Type | Description |
---|---|
BaseResponseModel |
The underlying model. |
MODEL_CLASS (AnalyticsTrackedModelMixin)
pydantic-model
Base domain model.
Used as a base class for all domain models that have the following common characteristics:
- are uniquely identified by a UUID
- have a creation timestamp and a last modified timestamp
Source code in zenml/post_execution/base_view.py
class BaseResponseModel(AnalyticsTrackedModelMixin):
"""Base domain model.
Used as a base class for all domain models that have the following common
characteristics:
* are uniquely identified by a UUID
* have a creation timestamp and a last modified timestamp
"""
id: UUID = Field(title="The unique resource id.")
created: datetime = Field(title="Time when this resource was created.")
updated: datetime = Field(
title="Time when this resource was last updated."
)
class Config:
"""Allow extras on Response Models."""
extra = "allow"
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, BaseResponseModel):
return self.id == other.id
else:
return False
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
Config
Allow extras on Response Models.
Source code in zenml/post_execution/base_view.py
class Config:
"""Allow extras on Response Models."""
extra = "allow"
__eq__(self, other)
special
Implementation of equality magic method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is of the same type and has the same UUID. |
Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
"""Implementation of equality magic method.
Args:
other: The other object to compare to.
Returns:
True if the other object is of the same type and has the same UUID.
"""
if isinstance(other, BaseResponseModel):
return self.id == other.id
else:
return False
__hash__(self)
special
Implementation of hash magic method.
Returns:
Type | Description |
---|---|
int |
Hash of the UUID. |
Source code in zenml/post_execution/base_view.py
def __hash__(self) -> int:
"""Implementation of hash magic method.
Returns:
Hash of the UUID.
"""
return hash((type(self),) + tuple([self.id]))
get_analytics_metadata(self)
Fetches the analytics metadata for base response models.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The analytics metadata. |
Source code in zenml/post_execution/base_view.py
def get_analytics_metadata(self) -> Dict[str, Any]:
"""Fetches the analytics metadata for base response models.
Returns:
The analytics metadata.
"""
metadata = super().get_analytics_metadata()
metadata["entity_id"] = self.id
return metadata
__eq__(self, other)
special
Returns whether the other object is referring to the same model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same model, else False. |
Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same model.
Args:
other: The other object to compare to.
Returns:
True if the other object is referring to the same model, else False.
"""
if not isinstance(other, self.__class__):
return False
return self._model == other._model # Use the model's `__eq__` method.
__getattribute__(self, _BaseView__name)
special
Returns the attribute with the given name.
This method is overridden so we can access the model fields as if they were attributes of this class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
__name |
The name of the attribute to return. |
required |
Returns:
Type | Description |
---|---|
Any |
The attribute with the given name. |
Source code in zenml/post_execution/base_view.py
def __getattribute__(self, __name: str) -> Any:
"""Returns the attribute with the given name.
This method is overridden so we can access the model fields as if they
were attributes of this class.
Args:
__name: The name of the attribute to return.
Returns:
The attribute with the given name.
"""
# Handle special cases that are required by the `dir` call below
if __name in {"__dict__", "__class__"}:
return super().__getattribute__(__name)
# Check the custom view properties first in case of overwrites
if __name in {attr for attr in dir(self) if not attr.startswith("__")}:
return super().__getattribute__(__name)
# Then check if the attribute is a field in the model
if __name in self._model.__fields__:
return getattr(self._model, __name)
# Otherwise, fall back to the default behaviour
return super().__getattribute__(__name)
__init__(self, model)
special
Initializes a view for the given model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
BaseResponseModel |
The model to create a view for. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the model is not of the correct type. |
ValueError |
If any of the |
Source code in zenml/post_execution/base_view.py
def __init__(self, model: BaseResponseModel):
"""Initializes a view for the given model.
Args:
model: The model to create a view for.
Raises:
TypeError: If the model is not of the correct type.
ValueError: If any of the `REPR_KEYS` are not valid.
"""
# Check that the model is of the correct type.
if not isinstance(model, self.MODEL_CLASS):
raise TypeError(
f"Model of {self.__class__.__name__} must be of type "
f"{self.MODEL_CLASS.__name__} but is {type(model).__name__}."
)
# Validate that all `REPR_KEYS` are valid.
for key in self.REPR_KEYS:
if (
key not in model.__fields__
and key not in self._custom_view_properties
):
raise ValueError(
f"Key {key} in {self.__class__.__name__}.REPR_KEYS is "
f"neither a field of {self.MODEL_CLASS.__name__} nor a "
f"custom property of {self.__class__.__name__}."
)
self._model = model
__repr__(self)
special
Returns a string representation of this artifact.
The string representation is of the form
__qualname__(<key1>=<value1>, <key2>=<value2>, ...)
where the keys
are the ones specified in REPR_KEYS
.
Returns:
Type | Description |
---|---|
str |
A string representation of this artifact. |
Source code in zenml/post_execution/base_view.py
def __repr__(self) -> str:
"""Returns a string representation of this artifact.
The string representation is of the form
`__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
are the ones specified in `REPR_KEYS`.
Returns:
A string representation of this artifact.
"""
repr = self.__class__.__qualname__
repr_key_strs = [
f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
]
details = ", ".join(repr_key_strs)
if details:
repr += f"({details})"
return repr
lineage
special
Initialization of lineage generation module.
edge
Class for Edges in a lineage graph.
Edge (BaseModel)
pydantic-model
A class that represents an edge in a lineage graph.
Source code in zenml/post_execution/lineage/edge.py
class Edge(BaseModel):
"""A class that represents an edge in a lineage graph."""
id: str
source: str
target: str
lineage_graph
Class for lineage graph generation.
LineageGraph (BaseModel)
pydantic-model
A lineage graph representation of a PipelineRunView.
Source code in zenml/post_execution/lineage/lineage_graph.py
class LineageGraph(BaseModel):
"""A lineage graph representation of a PipelineRunView."""
nodes: List[Union[StepNode, ArtifactNode]] = []
edges: List[Edge] = []
root_step_id: Optional[str]
run_metadata: List[Tuple[str, str, str]] = []
def generate_step_nodes_and_edges(self, step: StepView) -> None:
"""Generates the step nodes and the edges between them.
Args:
step: The step to generate the nodes and edges for.
"""
step_id = STEP_PREFIX + str(step.id)
if self.root_step_id is None:
self.root_step_id = step_id
step_config = step.step_configuration.dict()
if step_config:
step_config = {
key: value
for key, value in step_config.items()
if key not in ["inputs", "outputs", "parameters"] and value
}
self.nodes.append(
StepNode(
id=step_id,
data=StepNodeDetails(
execution_id=str(step.id),
name=step.name, # redundant for consistency
status=step.status,
entrypoint_name=step.entrypoint_name, # redundant for consistency
parameters=step.parameters,
configuration=step_config,
inputs={k: v.uri for k, v in step.inputs.items()},
outputs={k: v.uri for k, v in step.outputs.items()},
metadata=[
(m.key, str(m.value), str(m.type))
for m in step.metadata.values()
],
),
)
)
for artifact_name, artifact in step.outputs.items():
artifact_id = ARTIFACT_PREFIX + str(artifact.id)
self.nodes.append(
ArtifactNode(
id=artifact_id,
data=ArtifactNodeDetails(
execution_id=str(artifact.id),
name=artifact_name,
status=step.status,
is_cached=step.status == ExecutionStatus.CACHED,
artifact_type=artifact.type,
artifact_data_type=artifact.data_type,
parent_step_id=str(step.id),
producer_step_id=str(artifact.producer_step_run_id),
uri=artifact.uri,
metadata=[
(m.key, str(m.value), str(m.type))
for m in artifact.metadata.values()
],
),
)
)
self.edges.append(
Edge(
id=step_id + "_" + artifact_id,
source=step_id,
target=artifact_id,
)
)
for artifact_name, artifact in step.inputs.items():
artifact_id = ARTIFACT_PREFIX + str(artifact.id)
self.edges.append(
Edge(
id=step_id + "_" + artifact_id,
source=artifact_id,
target=step_id,
)
)
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
"""Generates the run nodes and the edges between them.
Args:
run: The PipelineRunView to generate the lineage graph for.
"""
self.run_metadata = [
(m.key, str(m.value), str(m.type)) for m in run.metadata.values()
]
for step in run.steps:
self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)
Generates the run nodes and the edges between them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run |
PipelineRunView |
The PipelineRunView to generate the lineage graph for. |
required |
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
"""Generates the run nodes and the edges between them.
Args:
run: The PipelineRunView to generate the lineage graph for.
"""
self.run_metadata = [
(m.key, str(m.value), str(m.type)) for m in run.metadata.values()
]
for step in run.steps:
self.generate_step_nodes_and_edges(step)
generate_step_nodes_and_edges(self, step)
Generates the step nodes and the edges between them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
StepView |
The step to generate the nodes and edges for. |
required |
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_step_nodes_and_edges(self, step: StepView) -> None:
"""Generates the step nodes and the edges between them.
Args:
step: The step to generate the nodes and edges for.
"""
step_id = STEP_PREFIX + str(step.id)
if self.root_step_id is None:
self.root_step_id = step_id
step_config = step.step_configuration.dict()
if step_config:
step_config = {
key: value
for key, value in step_config.items()
if key not in ["inputs", "outputs", "parameters"] and value
}
self.nodes.append(
StepNode(
id=step_id,
data=StepNodeDetails(
execution_id=str(step.id),
name=step.name, # redundant for consistency
status=step.status,
entrypoint_name=step.entrypoint_name, # redundant for consistency
parameters=step.parameters,
configuration=step_config,
inputs={k: v.uri for k, v in step.inputs.items()},
outputs={k: v.uri for k, v in step.outputs.items()},
metadata=[
(m.key, str(m.value), str(m.type))
for m in step.metadata.values()
],
),
)
)
for artifact_name, artifact in step.outputs.items():
artifact_id = ARTIFACT_PREFIX + str(artifact.id)
self.nodes.append(
ArtifactNode(
id=artifact_id,
data=ArtifactNodeDetails(
execution_id=str(artifact.id),
name=artifact_name,
status=step.status,
is_cached=step.status == ExecutionStatus.CACHED,
artifact_type=artifact.type,
artifact_data_type=artifact.data_type,
parent_step_id=str(step.id),
producer_step_id=str(artifact.producer_step_run_id),
uri=artifact.uri,
metadata=[
(m.key, str(m.value), str(m.type))
for m in artifact.metadata.values()
],
),
)
)
self.edges.append(
Edge(
id=step_id + "_" + artifact_id,
source=step_id,
target=artifact_id,
)
)
for artifact_name, artifact in step.inputs.items():
artifact_id = ARTIFACT_PREFIX + str(artifact.id)
self.edges.append(
Edge(
id=step_id + "_" + artifact_id,
source=artifact_id,
target=step_id,
)
)
node
special
Initialization of lineage nodes.
artifact_node
Class for all lineage artifact nodes.
ArtifactNode (BaseNode)
pydantic-model
A class that represents an artifact node in a lineage graph.
Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNode(BaseNode):
"""A class that represents an artifact node in a lineage graph."""
type: str = "artifact"
data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails)
pydantic-model
Captures all artifact details for the node.
Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
"""Captures all artifact details for the node."""
is_cached: bool
artifact_type: str
artifact_data_type: str
parent_step_id: str
producer_step_id: Optional[str]
uri: str
metadata: List[Tuple[str, str, str]] # (key, value, type)
base_node
Base class for all lineage nodes.
BaseNode (BaseModel)
pydantic-model
A class that represents a node in a lineage graph.
Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNode(BaseModel):
"""A class that represents a node in a lineage graph."""
id: str
type: str
data: BaseNodeDetails
BaseNodeDetails (BaseModel)
pydantic-model
Captures all details for the node.
Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNodeDetails(BaseModel):
"""Captures all details for the node."""
execution_id: str
name: str
status: ExecutionStatus
step_node
Class for all lineage step nodes.
StepNode (BaseNode)
pydantic-model
A class that represents a step node in a lineage graph.
Source code in zenml/post_execution/lineage/node/step_node.py
class StepNode(BaseNode):
"""A class that represents a step node in a lineage graph."""
type: str = "step"
data: StepNodeDetails
StepNodeDetails (BaseNodeDetails)
pydantic-model
Captures all artifact details for the node.
Source code in zenml/post_execution/lineage/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
"""Captures all artifact details for the node."""
entrypoint_name: str
parameters: Dict[str, Any]
configuration: Dict[str, Any]
inputs: Dict[str, Any]
outputs: Dict[str, Any]
metadata: List[Tuple[str, str, str]] # (key, value, type)
pipeline
Implementation of the post-execution pipeline.
PipelineView (BaseView)
Post-execution pipeline class.
Source code in zenml/post_execution/pipeline.py
class PipelineView(BaseView):
"""Post-execution pipeline class."""
MODEL_CLASS = PipelineResponseModel
REPR_KEYS = ["id", "name"]
@property
def model(self) -> PipelineResponseModel:
"""Returns the underlying `PipelineResponseModel`.
Returns:
The underlying `PipelineResponseModel`.
"""
return cast(PipelineResponseModel, self._model)
@property
def num_runs(self) -> int:
"""Returns the number of runs of this pipeline.
Returns:
The number of runs of this pipeline.
"""
active_workspace_id = Client().active_workspace.id
return (
Client()
.zen_store.list_runs(
PipelineRunFilterModel(
workspace_id=active_workspace_id,
pipeline_id=self._model.id,
)
)
.total
)
@property
def runs(self) -> List["PipelineRunView"]:
"""Returns the last 50 stored runs of this pipeline.
The runs are returned in chronological order, so the latest
run will be the last element in this list.
Returns:
A list of all stored runs of this pipeline.
"""
# Do not cache runs as new runs might appear during this objects
# lifecycle
active_workspace_id = Client().active_workspace.id
runs = Client().list_runs(
workspace_id=active_workspace_id,
pipeline_id=self.model.id,
size=50,
)
return [PipelineRunView(run) for run in runs.items]
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
"""Ascertains which pipeline run produced the cached artifact of a given step.
Args:
step_name: Name of step at hand
Returns:
None if no run is found that completed the given step,
else the original pipeline_run.
Raises:
LookupError: If no run is found that completed the given step
"""
orig_pipeline_run = None
for run in reversed(self.runs):
try:
step = run.get_step(step_name)
if step.is_completed:
orig_pipeline_run = run
break
except KeyError:
pass
if not orig_pipeline_run:
raise LookupError(
"No Pipeline Run could be found, that has"
f" completed the provided step: [{step_name}]"
)
return orig_pipeline_run
model: PipelineResponseModel
property
readonly
Returns the underlying PipelineResponseModel
.
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The underlying |
num_runs: int
property
readonly
Returns the number of runs of this pipeline.
Returns:
Type | Description |
---|---|
int |
The number of runs of this pipeline. |
runs: List[PipelineRunView]
property
readonly
Returns the last 50 stored runs of this pipeline.
The runs are returned in chronological order, so the latest run will be the last element in this list.
Returns:
Type | Description |
---|---|
List[PipelineRunView] |
A list of all stored runs of this pipeline. |
MODEL_CLASS (PipelineBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline response model user, workspace, runs, and status hydrated.
Source code in zenml/post_execution/pipeline.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
"""Pipeline response model user, workspace, runs, and status hydrated."""
runs: Optional[List["PipelineRunResponseModel"]] = Field(
title="A list of the last x Pipeline Runs."
)
status: Optional[List[ExecutionStatus]] = Field(
title="The status of the last x Pipeline Runs."
)
get_run_for_completed_step(self, step_name)
Ascertains which pipeline run produced the cached artifact of a given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_name |
str |
Name of step at hand |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
None if no run is found that completed the given step, else the original pipeline_run. |
Exceptions:
Type | Description |
---|---|
LookupError |
If no run is found that completed the given step |
Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
"""Ascertains which pipeline run produced the cached artifact of a given step.
Args:
step_name: Name of step at hand
Returns:
None if no run is found that completed the given step,
else the original pipeline_run.
Raises:
LookupError: If no run is found that completed the given step
"""
orig_pipeline_run = None
for run in reversed(self.runs):
try:
step = run.get_step(step_name)
if step.is_completed:
orig_pipeline_run = run
break
except KeyError:
pass
if not orig_pipeline_run:
raise LookupError(
"No Pipeline Run could be found, that has"
f" completed the provided step: [{step_name}]"
)
return orig_pipeline_run
pipeline_run
Implementation of the post-execution pipeline run class.
PipelineRunView (BaseView)
Post-execution pipeline run class.
This can be used to query steps and artifact information associated with a pipeline execution.
Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView(BaseView):
"""Post-execution pipeline run class.
This can be used to query steps and artifact information associated with a
pipeline execution.
"""
MODEL_CLASS = PipelineRunResponseModel
REPR_KEYS = ["id", "name"]
def __init__(self, model: PipelineRunResponseModel):
"""Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually
but retrieved from a `PipelineView` object instead.
Args:
model: The model to initialize this object from.
"""
super().__init__(model)
self._steps: Dict[str, StepView] = OrderedDict()
@property
def model(self) -> PipelineRunResponseModel:
"""Returns the underlying `PipelineRunResponseModel`.
Returns:
The underlying `PipelineRunResponseModel`.
"""
return cast(PipelineRunResponseModel, self._model)
@property
def settings(self) -> Dict[str, Any]:
"""Returns the pipeline settings.
These are runtime settings passed down to stack components, which
can be set at pipeline level.
Returns:
The pipeline settings.
"""
settings = self.model.pipeline_configuration["settings"]
return cast(Dict[str, Any], settings)
@property
def extra(self) -> Dict[str, Any]:
"""Returns the pipeline extras.
This dict is meant to be used to pass any configuration down to the
pipeline or stack components that the user has use of.
Returns:
The pipeline extras.
"""
extra = self.model.pipeline_configuration["extra"]
return cast(Dict[str, Any], extra)
@property
def enable_cache(self) -> Optional[bool]:
"""Returns whether caching is enabled for this pipeline run.
Returns:
True if caching is enabled for this pipeline run.
"""
from zenml.pipelines.base_pipeline import PARAM_ENABLE_CACHE
return self.model.pipeline_configuration.get(PARAM_ENABLE_CACHE)
@property
def enable_artifact_metadata(self) -> Optional[bool]:
"""Returns whether artifact metadata is enabled for this pipeline run.
Returns:
True if artifact metadata is enabled for this pipeline run.
"""
from zenml.pipelines.base_pipeline import (
PARAM_ENABLE_ARTIFACT_METADATA,
)
return self.model.pipeline_configuration.get(
PARAM_ENABLE_ARTIFACT_METADATA
)
@property
def status(self) -> ExecutionStatus:
"""Returns the current status of the pipeline run.
Returns:
The current status of the pipeline run.
"""
# Query the run again since the status might have changed since this
# object was created.
return Client().get_pipeline_run(self.model.id).status
@property
def steps(self) -> List[StepView]:
"""Returns all steps that were executed as part of this pipeline run.
Returns:
A list of all steps that were executed as part of this pipeline run.
"""
self._ensure_steps_fetched()
return list(self._steps.values())
def get_step_names(self) -> List[str]:
"""Returns a list of all step names.
Returns:
A list of all step names.
"""
self._ensure_steps_fetched()
return list(self._steps.keys())
def get_step(
self,
step: Optional[str] = None,
**kwargs: Any,
) -> StepView:
"""Returns a step for the given name.
The name refers to the name of the step in the pipeline definition, not
the class name of the step-class.
Use it like this:
```python
# Get the step by name
pipeline_run_view.get_step("first_step")
```
Args:
step: Class or class instance of the step
**kwargs: The deprecated `name` is caught as a kwarg to
specify the step instead of using the `step` argument.
Returns:
A step for the given name.
Raises:
KeyError: If there is no step with the given name.
RuntimeError: If no step has been specified at all.
"""
self._ensure_steps_fetched()
api_doc_link = get_apidocs_link(
"core-post_execution",
"zenml.post_execution.pipeline_run.PipelineRunView" ".get_step",
)
step_name = kwargs.get("name", None)
# Raise an error if neither `step` nor `name` args were provided.
if not step and not isinstance(step_name, str):
raise RuntimeError(
"No step specified. Please specify a step using "
"pipeline_run_view.get_step(step=`step_name`). "
f"Please refer to the API docs to learn more: "
f"{api_doc_link}"
)
# If `name` was provided but not `step`, print a depreciation warning.
if not step:
logger.warning(
"Using 'name' to get a step from "
"'PipelineRunView.get_step()' is deprecated and "
"will be removed in the future. Instead please "
"use 'step' to access a step from your past "
"pipeline runs. Learn more in our API docs: %s",
api_doc_link,
)
step = step_name
# Raise an error if there is no such step in the given pipeline run.
if step not in self._steps:
raise KeyError(
f"No step found for name `{step}`. This pipeline "
f"run only has steps with the following "
f"names: `{self.get_step_names()}`"
)
return self._steps[step]
def _ensure_steps_fetched(self) -> None:
"""Fetches all steps for this pipeline run from the metadata store."""
if self._steps:
# we already fetched the steps, no need to do anything
return
client = Client()
steps = client.depaginate(
partial(client.list_run_steps, pipeline_run_id=self.model.id)
)
self._steps = {step.name: StepView(step) for step in steps}
enable_artifact_metadata: Optional[bool]
property
readonly
Returns whether artifact metadata is enabled for this pipeline run.
Returns:
Type | Description |
---|---|
Optional[bool] |
True if artifact metadata is enabled for this pipeline run. |
enable_cache: Optional[bool]
property
readonly
Returns whether caching is enabled for this pipeline run.
Returns:
Type | Description |
---|---|
Optional[bool] |
True if caching is enabled for this pipeline run. |
extra: Dict[str, Any]
property
readonly
Returns the pipeline extras.
This dict is meant to be used to pass any configuration down to the pipeline or stack components that the user has use of.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline extras. |
model: PipelineRunResponseModel
property
readonly
Returns the underlying PipelineRunResponseModel
.
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The underlying |
settings: Dict[str, Any]
property
readonly
Returns the pipeline settings.
These are runtime settings passed down to stack components, which can be set at pipeline level.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline settings. |
status: ExecutionStatus
property
readonly
Returns the current status of the pipeline run.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the pipeline run. |
steps: List[zenml.post_execution.step.StepView]
property
readonly
Returns all steps that were executed as part of this pipeline run.
Returns:
Type | Description |
---|---|
List[zenml.post_execution.step.StepView] |
A list of all steps that were executed as part of this pipeline run. |
MODEL_CLASS (PipelineRunBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Pipeline run model with user, workspace, pipeline, and stack hydrated.
Source code in zenml/post_execution/pipeline_run.py
class PipelineRunResponseModel(
PipelineRunBaseModel, WorkspaceScopedResponseModel
):
"""Pipeline run model with user, workspace, pipeline, and stack hydrated."""
pipeline: Optional["PipelineResponseModel"] = Field(
title="The pipeline this run belongs to."
)
stack: Optional["StackResponseModel"] = Field(
title="The stack that was used for this run."
)
metadata: Dict[str, "RunMetadataResponseModel"] = Field(
default={},
title="Metadata associated with this pipeline run.",
)
__init__(self, model)
special
Initializes a post-execution pipeline run object.
In most cases PipelineRunView
objects should not be created manually
but retrieved from a PipelineView
object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineRunResponseModel |
The model to initialize this object from. |
required |
Source code in zenml/post_execution/pipeline_run.py
def __init__(self, model: PipelineRunResponseModel):
"""Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually
but retrieved from a `PipelineView` object instead.
Args:
model: The model to initialize this object from.
"""
super().__init__(model)
self._steps: Dict[str, StepView] = OrderedDict()
get_step(self, step=None, **kwargs)
Returns a step for the given name.
The name refers to the name of the step in the pipeline definition, not the class name of the step-class.
Use it like this:
# Get the step by name
pipeline_run_view.get_step("first_step")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
Optional[str] |
Class or class instance of the step |
None |
**kwargs |
Any |
The deprecated |
{} |
Returns:
Type | Description |
---|---|
StepView |
A step for the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no step with the given name. |
RuntimeError |
If no step has been specified at all. |
Source code in zenml/post_execution/pipeline_run.py
def get_step(
self,
step: Optional[str] = None,
**kwargs: Any,
) -> StepView:
"""Returns a step for the given name.
The name refers to the name of the step in the pipeline definition, not
the class name of the step-class.
Use it like this:
```python
# Get the step by name
pipeline_run_view.get_step("first_step")
```
Args:
step: Class or class instance of the step
**kwargs: The deprecated `name` is caught as a kwarg to
specify the step instead of using the `step` argument.
Returns:
A step for the given name.
Raises:
KeyError: If there is no step with the given name.
RuntimeError: If no step has been specified at all.
"""
self._ensure_steps_fetched()
api_doc_link = get_apidocs_link(
"core-post_execution",
"zenml.post_execution.pipeline_run.PipelineRunView" ".get_step",
)
step_name = kwargs.get("name", None)
# Raise an error if neither `step` nor `name` args were provided.
if not step and not isinstance(step_name, str):
raise RuntimeError(
"No step specified. Please specify a step using "
"pipeline_run_view.get_step(step=`step_name`). "
f"Please refer to the API docs to learn more: "
f"{api_doc_link}"
)
# If `name` was provided but not `step`, print a depreciation warning.
if not step:
logger.warning(
"Using 'name' to get a step from "
"'PipelineRunView.get_step()' is deprecated and "
"will be removed in the future. Instead please "
"use 'step' to access a step from your past "
"pipeline runs. Learn more in our API docs: %s",
api_doc_link,
)
step = step_name
# Raise an error if there is no such step in the given pipeline run.
if step not in self._steps:
raise KeyError(
f"No step found for name `{step}`. This pipeline "
f"run only has steps with the following "
f"names: `{self.get_step_names()}`"
)
return self._steps[step]
get_step_names(self)
Returns a list of all step names.
Returns:
Type | Description |
---|---|
List[str] |
A list of all step names. |
Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
"""Returns a list of all step names.
Returns:
A list of all step names.
"""
self._ensure_steps_fetched()
return list(self._steps.keys())
get_run(name)
Fetches the post-execution view of a run with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the run to fetch. |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
The post-execution view of the run with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If no run with the given name exists. |
RuntimeError |
If multiple runs with the given name exist. |
Source code in zenml/post_execution/pipeline_run.py
def get_run(name: str) -> "PipelineRunView":
"""Fetches the post-execution view of a run with the given name.
Args:
name: The name of the run to fetch.
Returns:
The post-execution view of the run with the given name.
Raises:
KeyError: If no run with the given name exists.
RuntimeError: If multiple runs with the given name exist.
"""
client = Client()
active_workspace_id = client.active_workspace.id
runs = client.list_runs(
name=name,
workspace_id=active_workspace_id,
)
# TODO: [server] this error handling could be improved
if not runs:
raise KeyError(f"No run with name '{name}' exists.")
elif runs.total > 1:
raise RuntimeError(
f"Multiple runs have been found for name '{name}'.", runs
)
return PipelineRunView(runs.items[0])
get_unlisted_runs()
Fetches the post-execution views of the 50 most recent unlisted runs.
Unlisted runs are runs that are not associated with any pipeline.
Returns:
Type | Description |
---|---|
List[PipelineRunView] |
A list of post-execution run views. |
Source code in zenml/post_execution/pipeline_run.py
def get_unlisted_runs() -> List["PipelineRunView"]:
"""Fetches the post-execution views of the 50 most recent unlisted runs.
Unlisted runs are runs that are not associated with any pipeline.
Returns:
A list of post-execution run views.
"""
client = Client()
runs = client.list_runs(
workspace_id=client.active_workspace.id, unlisted=True, size=50
)
return [PipelineRunView(model) for model in runs.items]
step
Implementation of a post-execution step class.
StepView (BaseView)
Post-execution step class.
This can be used to query artifact information associated with a pipeline step.
Source code in zenml/post_execution/step.py
class StepView(BaseView):
"""Post-execution step class.
This can be used to query artifact information associated with a pipeline step.
"""
MODEL_CLASS = StepRunResponseModel
REPR_KEYS = ["id", "name", "entrypoint_name"]
def __init__(self, model: StepRunResponseModel):
"""Initializes a post-execution step object.
In most cases `StepView` objects should not be created manually
but retrieved from a `PipelineRunView` object instead.
Args:
model: The model to initialize this object from.
"""
super().__init__(model)
self._inputs: Dict[str, ArtifactView] = {}
self._outputs: Dict[str, ArtifactView] = {}
@property
def model(self) -> StepRunResponseModel:
"""Returns the underlying `StepRunResponseModel`.
Returns:
The underlying `StepRunResponseModel`.
"""
return cast(StepRunResponseModel, self._model)
@property
def step_configuration(self) -> "StepConfiguration":
"""Returns the step configuration.
Returns:
The step configuration.
"""
return self.model.step.config
@property
def entrypoint_name(self) -> str:
"""Returns the step entrypoint_name.
This name is equal to the name argument passed to the @step decorator
or the actual function name if no explicit name was given.
Examples:
# the step entrypoint_name will be "my_step"
@step(step="my_step")
def my_step_function(...)
# the step entrypoint_name will be "my_step_function"
@step
def my_step_function(...)
Returns:
The step entrypoint_name.
"""
return self.step_configuration.name
@property
def parameters(self) -> Dict[str, str]:
"""The parameters used to run this step.
Returns:
The parameters used to run this step.
"""
return self.step_configuration.parameters
@property
def settings(self) -> Dict[str, "BaseSettings"]:
"""Returns the step settings.
These are runtime settings passed down to stack components, which
can be set at step level.
Returns:
The step settings.
"""
return self.step_configuration.settings
@property
def extra(self) -> Dict[str, Any]:
"""Returns the extra dictionary.
This dict is meant to be used to pass any configuration down to the
step that the user has use of.
Returns:
The extra dictionary.
"""
return self.step_configuration.extra
@property
def enable_cache(self) -> Optional[bool]:
"""Returns whether caching is enabled for this step.
Returns:
Whether caching is enabled for this step.
"""
return self.step_configuration.enable_cache
@property
def enable_artifact_metadata(self) -> Optional[bool]:
"""Returns whether artifact metadata is enabled for this step.
Returns:
Whether artifact metadata is enabled for this step.
"""
return self.step_configuration.enable_artifact_metadata
@property
def step_operator(self) -> Optional[str]:
"""Returns the name of the step operator of the step.
Returns:
The name of the step operator of the step.
"""
return self.step_configuration.step_operator
@property
def experiment_tracker(self) -> Optional[str]:
"""Returns the name of the experiment tracker of the step.
Returns:
The name of the experiment tracker of the step.
"""
return self.step_configuration.experiment_tracker
@property
def spec(self) -> "StepSpec":
"""Returns the step spec.
The step spec defines the source path and upstream steps of a step and
is used primarily to compare whether two steps are the same.
Returns:
The step spec.
"""
return self.model.step.spec
@property
def status(self) -> ExecutionStatus:
"""Returns the current status of the step.
Returns:
The current status of the step.
"""
# Query the step again since the status might have changed since this
# object was created.
return Client().zen_store.get_run_step(self.model.id).status
@property
def is_cached(self) -> bool:
"""Returns whether the step is cached or not.
Returns:
True if the step is cached, False otherwise.
"""
return self.status == ExecutionStatus.CACHED
@property
def is_completed(self) -> bool:
"""Returns whether the step is cached or not.
Returns:
True if the step is completed, False otherwise.
"""
return self.status == ExecutionStatus.COMPLETED
@property
def inputs(self) -> Dict[str, ArtifactView]:
"""Returns all input artifacts that were used to run this step.
Returns:
A dictionary of artifact names to artifact views.
"""
self._ensure_inputs_fetched()
return self._inputs
@property
def input(self) -> ArtifactView:
"""Returns the input artifact that was used to run this step.
Returns:
The input artifact.
Raises:
ValueError: If there were zero or multiple inputs to this step.
"""
if len(self.inputs) != 1:
raise ValueError(
"Can't use the `StepView.input` property for steps with zero "
"or multiple inputs, use `StepView.inputs` instead."
)
return next(iter(self.inputs.values()))
@property
def outputs(self) -> Dict[str, ArtifactView]:
"""Returns all output artifacts that were written by this step.
Returns:
A dictionary of artifact names to artifact views.
"""
self._ensure_outputs_fetched()
return self._outputs
@property
def output(self) -> ArtifactView:
"""Returns the output artifact that was written by this step.
Returns:
The output artifact.
Raises:
ValueError: If there were zero or multiple step outputs.
"""
if len(self.outputs) != 1:
raise ValueError(
"Can't use the `StepView.output` property for steps with zero "
"or multiple outputs, use `StepView.outputs` instead."
)
return next(iter(self.outputs.values()))
def _ensure_inputs_fetched(self) -> None:
"""Fetches all step inputs from the ZenStore."""
if self._inputs:
# we already fetched inputs, no need to do anything
return
self._inputs = {
name: ArtifactView(artifact_model)
for name, artifact_model in self.model.input_artifacts.items()
}
def _ensure_outputs_fetched(self) -> None:
"""Fetches all step outputs from the ZenStore."""
if self._outputs:
# we already fetched outputs, no need to do anything
return
self._outputs = {
name: ArtifactView(artifact_model)
for name, artifact_model in self.model.output_artifacts.items()
}
enable_artifact_metadata: Optional[bool]
property
readonly
Returns whether artifact metadata is enabled for this step.
Returns:
Type | Description |
---|---|
Optional[bool] |
Whether artifact metadata is enabled for this step. |
enable_cache: Optional[bool]
property
readonly
Returns whether caching is enabled for this step.
Returns:
Type | Description |
---|---|
Optional[bool] |
Whether caching is enabled for this step. |
entrypoint_name: str
property
readonly
Returns the step entrypoint_name.
This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.
Examples:
the step entrypoint_name will be "my_step"
@step(step="my_step") def my_step_function(...)
the step entrypoint_name will be "my_step_function"
@step def my_step_function(...)
Returns:
Type | Description |
---|---|
str |
The step entrypoint_name. |
experiment_tracker: Optional[str]
property
readonly
Returns the name of the experiment tracker of the step.
Returns:
Type | Description |
---|---|
Optional[str] |
The name of the experiment tracker of the step. |
extra: Dict[str, Any]
property
readonly
Returns the extra dictionary.
This dict is meant to be used to pass any configuration down to the step that the user has use of.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The extra dictionary. |
input: ArtifactView
property
readonly
Returns the input artifact that was used to run this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The input artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple inputs to this step. |
inputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all input artifacts that were used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
is_cached: bool
property
readonly
Returns whether the step is cached or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is cached, False otherwise. |
is_completed: bool
property
readonly
Returns whether the step is cached or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is completed, False otherwise. |
model: StepRunResponseModel
property
readonly
Returns the underlying StepRunResponseModel
.
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The underlying |
output: ArtifactView
property
readonly
Returns the output artifact that was written by this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The output artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple step outputs. |
outputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all output artifacts that were written by this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
parameters: Dict[str, str]
property
readonly
The parameters used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The parameters used to run this step. |
settings: Dict[str, BaseSettings]
property
readonly
Returns the step settings.
These are runtime settings passed down to stack components, which can be set at step level.
Returns:
Type | Description |
---|---|
Dict[str, BaseSettings] |
The step settings. |
spec: StepSpec
property
readonly
Returns the step spec.
The step spec defines the source path and upstream steps of a step and is used primarily to compare whether two steps are the same.
Returns:
Type | Description |
---|---|
StepSpec |
The step spec. |
status: ExecutionStatus
property
readonly
Returns the current status of the step.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the step. |
step_configuration: StepConfiguration
property
readonly
Returns the step configuration.
Returns:
Type | Description |
---|---|
StepConfiguration |
The step configuration. |
step_operator: Optional[str]
property
readonly
Returns the name of the step operator of the step.
Returns:
Type | Description |
---|---|
Optional[str] |
The name of the step operator of the step. |
MODEL_CLASS (StepRunBaseModel, WorkspaceScopedResponseModel)
pydantic-model
Response model for step runs.
Source code in zenml/post_execution/step.py
class StepRunResponseModel(StepRunBaseModel, WorkspaceScopedResponseModel):
"""Response model for step runs."""
input_artifacts: Dict[str, "ArtifactResponseModel"] = {}
output_artifacts: Dict[str, "ArtifactResponseModel"] = {}
metadata: Dict[str, "RunMetadataResponseModel"] = Field(
default={},
title="Metadata associated with this step run.",
)
__init__(self, model)
special
Initializes a post-execution step object.
In most cases StepView
objects should not be created manually
but retrieved from a PipelineRunView
object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
StepRunResponseModel |
The model to initialize this object from. |
required |
Source code in zenml/post_execution/step.py
def __init__(self, model: StepRunResponseModel):
"""Initializes a post-execution step object.
In most cases `StepView` objects should not be created manually
but retrieved from a `PipelineRunView` object instead.
Args:
model: The model to initialize this object from.
"""
super().__init__(model)
self._inputs: Dict[str, ArtifactView] = {}
self._outputs: Dict[str, ArtifactView] = {}