Post Execution
zenml.post_execution
special
Initialization for the post-execution module.
After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.
artifact
Initialization for the post-execution artifact class.
ArtifactView
Post-execution artifact class.
This can be used to read artifact data that was created during a pipeline execution.
Source code in zenml/post_execution/artifact.py
class ArtifactView:
    """Post-execution artifact class.

    This can be used to read artifact data that was created during a pipeline
    execution. All metadata is exposed through read-only properties that
    forward to the wrapped response model.
    """

    def __init__(self, model: "ArtifactResponseModel"):
        """Initializes a post-execution artifact object.

        In most cases `ArtifactView` objects should not be created manually but
        retrieved from a `StepView` via the `inputs` or `outputs` properties.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model

    @property
    def id(self) -> UUID:
        """Returns the artifact id.

        Returns:
            The artifact id.
        """
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the artifact (output name in the parent step).

        Returns:
            The name of the artifact.
        """
        return self._model.name

    @property
    def type(self) -> str:
        """Returns the artifact type.

        Returns:
            The artifact type.
        """
        return self._model.type

    @property
    def data_type(self) -> str:
        """Returns the data type of the artifact.

        Returns:
            The data type of the artifact.
        """
        return self._model.data_type

    @property
    def uri(self) -> str:
        """Returns the URI where the artifact data is stored.

        Returns:
            The URI where the artifact data is stored.
        """
        return self._model.uri

    @property
    def materializer(self) -> str:
        """Returns the materializer that was used to write this artifact.

        Returns:
            The materializer that was used to write this artifact.
        """
        return self._model.materializer

    @property
    def producer_step_id(self) -> Optional[UUID]:
        """Returns the ID of the original step that produced the artifact.

        Returns:
            The ID of the original step that produced the artifact, read from
            the model's `producer_step_run_id` field (may be `None`).
        """
        return self._model.producer_step_run_id

    def read(
        self,
        output_data_type: Optional[Type[Any]] = None,
        materializer_class: Optional[Type["BaseMaterializer"]] = None,
    ) -> Any:
        """Materializes the data stored in this artifact.

        Args:
            output_data_type: Deprecated; will be ignored.
            materializer_class: Deprecated; will be ignored.

        Returns:
            The materialized data.
        """
        # Both arguments are accepted only for backwards compatibility: a
        # warning is logged and the value is otherwise unused.
        if output_data_type is not None:
            logger.warning(
                "The `output_data_type` argument is deprecated and will be "
                "removed in a future release."
            )
        if materializer_class is not None:
            logger.warning(
                "The `materializer_class` argument is deprecated and will be "
                "removed in a future release."
            )

        # Function-scope import kept from the original implementation
        # (presumably to avoid a circular import -- TODO confirm).
        from zenml.utils.materializer_utils import load_artifact

        return load_artifact(self._model)

    def __repr__(self) -> str:
        """Returns a string representation of this artifact.

        Returns:
            A string representation of this artifact.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"type='{self.type}', uri='{self.uri}', "
            f"materializer='{self.materializer}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same artifact.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is an `ArtifactView` with the same id
            and URI, False if it is an `ArtifactView` that differs, and
            `NotImplemented` for objects of any other type.
        """
        if isinstance(other, ArtifactView):
            return self.id == other.id and self.uri == other.uri
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` alone makes a class unhashable; hashing the same
        fields that equality compares keeps views usable in sets and as
        dictionary keys.

        Returns:
            The hash of the `(id, uri)` pair.
        """
        return hash((self.id, self.uri))
data_type: str
property
readonly
Returns the data type of the artifact.
Returns:
Type | Description |
---|---|
str |
The data type of the artifact. |
id: UUID
property
readonly
Returns the artifact id.
Returns:
Type | Description |
---|---|
UUID |
The artifact id. |
materializer: str
property
readonly
Returns the materializer that was used to write this artifact.
Returns:
Type | Description |
---|---|
str |
The materializer that was used to write this artifact. |
name: str
property
readonly
Returns the name of the artifact (output name in the parent step).
Returns:
Type | Description |
---|---|
str |
The name of the artifact. |
producer_step_id: Optional[uuid.UUID]
property
readonly
Returns the ID of the original step that produced the artifact.
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] |
The ID of the original step that produced the artifact. |
type: str
property
readonly
Returns the artifact type.
Returns:
Type | Description |
---|---|
str |
The artifact type. |
uri: str
property
readonly
Returns the URI where the artifact data is stored.
Returns:
Type | Description |
---|---|
str |
The URI where the artifact data is stored. |
__eq__(self, other)
special
Returns whether the other object is referring to the same artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same artifact, else False. |
Source code in zenml/post_execution/artifact.py
def __eq__(self, other: Any) -> bool:
    """Compares this artifact view with another object.

    Args:
        other: The other object to compare to.

    Returns:
        True if `other` is an `ArtifactView` with the same id and URI,
        False if it is an `ArtifactView` that differs in either, and
        `NotImplemented` for objects of any other type.
    """
    # Guard clause: let Python try the reflected comparison for other types.
    if not isinstance(other, ArtifactView):
        return NotImplemented

    ids_match = self.id == other.id
    return ids_match and self.uri == other.uri
__init__(self, model)
special
Initializes a post-execution artifact object.
In most cases `ArtifactView` objects should not be created manually but retrieved from a `StepView` via the `inputs` or `outputs` properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
ArtifactResponseModel |
The model to initialize this object from. |
required |
Source code in zenml/post_execution/artifact.py
def __init__(self, model: "ArtifactResponseModel"):
    """Wraps an artifact response model in a post-execution view.

    `ArtifactView` objects are normally not created by hand; they are
    obtained from a `StepView` through its `inputs` or `outputs` properties.

    Args:
        model: The model to initialize this object from.
    """
    # The model is the single source for every value this view exposes.
    self._model = model
__repr__(self)
special
Returns a string representation of this artifact.
Returns:
Type | Description |
---|---|
str |
A string representation of this artifact. |
Source code in zenml/post_execution/artifact.py
def __repr__(self) -> str:
    """Builds a string representation of this artifact.

    Returns:
        A string containing the class name together with the artifact's id,
        type, URI and materializer.
    """
    class_name = self.__class__.__qualname__
    return (
        f"{class_name}(id={self.id}, type='{self.type}', "
        f"uri='{self.uri}', materializer='{self.materializer}')"
    )
read(self, output_data_type=None, materializer_class=None)
Materializes the data stored in this artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_data_type |
Optional[Type[Any]] |
Deprecated; will be ignored. |
None |
materializer_class |
Optional[Type[BaseMaterializer]] |
Deprecated; will be ignored. |
None |
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Source code in zenml/post_execution/artifact.py
def read(
    self,
    output_data_type: Optional[Type[Any]] = None,
    materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
    """Loads the data stored in this artifact.

    Args:
        output_data_type: Deprecated; will be ignored.
        materializer_class: Deprecated; will be ignored.

    Returns:
        The materialized data.
    """
    # Each deprecated argument only triggers a warning when it is supplied;
    # its value is never used.
    deprecation_notices = (
        (
            output_data_type,
            "The `output_data_type` argument is deprecated and will be "
            "removed in a future release.",
        ),
        (
            materializer_class,
            "The `materializer_class` argument is deprecated and will be "
            "removed in a future release.",
        ),
    )
    for supplied_value, notice in deprecation_notices:
        if supplied_value is not None:
            logger.warning(notice)

    from zenml.utils.materializer_utils import load_artifact

    return load_artifact(self._model)
lineage
special
Initialization of lineage generation module.
edge
Class for Edges in a lineage graph.
Edge (BaseModel)
pydantic-model
A class that represents an edge in a lineage graph.
Source code in zenml/post_execution/lineage/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph.

    An edge is directed: it connects the node identified by `source` to the
    node identified by `target`.
    """

    # Identifier of the edge itself.
    id: str
    # Identifier of the node the edge starts at.
    source: str
    # Identifier of the node the edge ends at.
    target: str
lineage_graph
Class for lineage graph generation.
LineageGraph (BaseModel)
pydantic-model
A lineage graph representation of a PipelineRunView.
Source code in zenml/post_execution/lineage/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunView.

    The graph holds one node per step and one node per step output artifact,
    plus directed edges step -> output artifact and input artifact -> step.
    """

    # All step and artifact nodes collected so far.
    nodes: List[Union[StepNode, ArtifactNode]] = []
    # All directed edges collected so far.
    edges: List[Edge] = []
    # Node id of the first step that was added to the graph.
    root_step_id: Optional[str]

    def generate_step_nodes_and_edges(self, step: StepView) -> None:
        """Generates the step nodes and the edges between them.

        Adds one node for the step, one node per output artifact with an
        edge from the step to it, and one edge from every input artifact to
        the step. Input artifacts get no node here: only outputs create
        artifact nodes.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        # The first step processed becomes the root of the graph.
        if self.root_step_id is None:
            self.root_step_id = step_id

        step_config = step.step_configuration.dict()
        if step_config:
            # Drop empty values and the keys that are reported separately
            # on the node (inputs, outputs, parameters).
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ["inputs", "outputs", "parameters"] and value
            }

        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.entrypoint_name,  # redundant for consistency
                    parameters=step.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            # `producer_step_id` is optional; keep a missing value as `None`
            # instead of turning it into the literal string "None".
            producer_id = artifact.producer_step_id
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type,
                        parent_step_id=str(step.id),
                        producer_step_id=(
                            None if producer_id is None else str(producer_id)
                        ),
                        uri=artifact.uri,
                    ),
                )
            )
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # Only the artifact objects are needed for the input edges.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
        """Generates the run nodes and the edges between them.

        Args:
            run: The PipelineRunView to generate the lineage graph for.
        """
        for step in run.steps:
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)
Generates the run nodes and the edges between them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run |
PipelineRunView |
The PipelineRunView to generate the lineage graph for. |
required |
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
    """Adds the nodes and edges of every step of a run to this graph.

    Args:
        run: The PipelineRunView to generate the lineage graph for.
    """
    # Steps are processed in the order in which `run.steps` yields them.
    for run_step in run.steps:
        self.generate_step_nodes_and_edges(run_step)
generate_step_nodes_and_edges(self, step)
Generates the step nodes and the edges between them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
StepView |
The step to generate the nodes and edges for. |
required |
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_step_nodes_and_edges(self, step: StepView) -> None:
    """Generates the step nodes and the edges between them.

    Adds one node for the step, one node per output artifact with an edge
    from the step to it, and one edge from every input artifact to the step.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    # The first step processed becomes the root of the graph.
    if self.root_step_id is None:
        self.root_step_id = step_id

    step_config = step.step_configuration.dict()
    if step_config:
        # Drop empty values and the keys that are reported separately on
        # the node (inputs, outputs, parameters).
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ["inputs", "outputs", "parameters"] and value
        }

    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.entrypoint_name,  # redundant for consistency
                parameters=step.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        # `producer_step_id` is optional; keep a missing value as `None`
        # instead of turning it into the literal string "None".
        producer_id = artifact.producer_step_id
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type,
                    parent_step_id=str(step.id),
                    producer_step_id=(
                        None if producer_id is None else str(producer_id)
                    ),
                    uri=artifact.uri,
                ),
            )
        )
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # Only the artifact objects are needed for the input edges.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )
node
special
Initialization of lineage nodes.
artifact_node
Class for all lineage artifact nodes.
ArtifactNode (BaseNode)
pydantic-model
A class that represents an artifact node in a lineage graph.
Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    # Node kind marker; defaults to "artifact" for this node class.
    type: str = "artifact"
    # Artifact-specific details carried by the node.
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails)
pydantic-model
Captures all artifact details for the node.
Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node."""

    # Whether the artifact came from a cached step.
    is_cached: bool
    # Type of the artifact.
    artifact_type: str
    # Data type of the artifact.
    artifact_data_type: str
    # ID of the step this artifact node is attached to.
    parent_step_id: str
    # ID of the step that originally produced the artifact, if any.
    producer_step_id: Optional[str]
    # URI where the artifact data is stored.
    uri: str
base_node
Base class for all lineage nodes.
BaseNode (BaseModel)
pydantic-model
A class that represents a node in a lineage graph.
Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph.

    `StepNode` and `ArtifactNode` subclass this model.
    """

    # Identifier of the node.
    id: str
    # Node kind; the subclasses default this to "step" or "artifact".
    type: str
    # Details attached to the node.
    data: BaseNodeDetails
BaseNodeDetails (BaseModel)
pydantic-model
Captures all details for the node.
Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node.

    These fields are shared by step and artifact node details.
    """

    # ID of the step or artifact the node stands for, as a string.
    execution_id: str
    # Name of the step or artifact.
    name: str
    # Execution status associated with the node.
    status: ExecutionStatus
step_node
Class for all lineage step nodes.
StepNode (BaseNode)
pydantic-model
A class that represents a step node in a lineage graph.
Source code in zenml/post_execution/lineage/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    # Node kind marker; defaults to "step" for this node class.
    type: str = "step"
    # Step-specific details carried by the node.
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails)
pydantic-model
Captures all step details for the node.
Source code in zenml/post_execution/lineage/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node."""

    # Entrypoint name of the step.
    entrypoint_name: str
    # Parameters of the step.
    parameters: Dict[str, Any]
    # Step configuration values.
    configuration: Dict[str, Any]
    # Input artifacts of the step, keyed by input name.
    inputs: Dict[str, Any]
    # Output artifacts of the step, keyed by output name.
    outputs: Dict[str, Any]
pipeline
Implementation of the post-execution pipeline.
PipelineView
Post-execution pipeline class.
Source code in zenml/post_execution/pipeline.py
class PipelineView:
    """Post-execution pipeline class.

    Wraps a pipeline response model, exposes its fields as read-only
    properties and gives access to the stored runs of the pipeline.
    """

    def __init__(self, model: PipelineResponseModel):
        """Initializes a post-execution pipeline object.

        In most cases `PipelineView` objects should not be created manually
        but retrieved using the `get_pipelines()` utility from
        `zenml.post_execution` instead.

        Args:
            model: The model to initialize this pipeline view from.
        """
        self._model = model

    @property
    def id(self) -> UUID:
        """Returns the ID of this pipeline.

        Returns:
            The ID of this pipeline.
        """
        assert self._model.id is not None
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self._model.name

    @property
    def docstring(self) -> Optional[str]:
        """Returns the docstring of the pipeline.

        Returns:
            The docstring of the pipeline.
        """
        return self._model.docstring

    @property
    def spec(self) -> "PipelineSpec":
        """Returns the spec of the pipeline.

        The pipeline spec contains the source paths of all steps, as well as
        each of their upstream step names. This is primarily used to compare
        whether two pipelines are the same.

        Returns:
            The spec of the pipeline.
        """
        return self._model.spec

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns all stored runs of this pipeline.

        The runs are returned in chronological order, so the latest
        run will be the last element in this list.

        Returns:
            A list of all stored runs of this pipeline.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle.
        active_project_id = Client().active_project.id
        runs = Client().zen_store.list_runs(
            project_name_or_id=active_project_id,
            pipeline_id=self._model.id,
        )
        return [PipelineRunView(run) for run in runs]

    def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
        """Finds the latest pipeline run in which a given step completed.

        The runs are searched from newest to oldest; runs that do not
        contain the step are skipped.

        Args:
            step_name: Name of the step to look for.

        Returns:
            The most recent pipeline run in which the step has completed.

        Raises:
            LookupError: If no run is found that completed the given step.
        """
        for run in reversed(self.runs):
            try:
                if run.get_step(step_name).is_completed:
                    return run
            except KeyError:
                # This run has no step with that name; try an older run.
                continue

        raise LookupError(
            "No Pipeline Run could be found, that has"
            f" completed the provided step: [{step_name}]"
        )

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline.

        Returns:
            A string representation of this pipeline.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is a `PipelineView` with the same id,
            False if it is a `PipelineView` with a different id, and
            `NotImplemented` for objects of any other type.
        """
        if isinstance(other, PipelineView):
            return self.id == other.id
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` alone makes a class unhashable; hashing the id
        keeps views usable in sets and as dictionary keys.

        Returns:
            The hash of the pipeline id.
        """
        return hash(self.id)
docstring: Optional[str]
property
readonly
Returns the docstring of the pipeline.
Returns:
Type | Description |
---|---|
Optional[str] |
The docstring of the pipeline. |
id: UUID
property
readonly
Returns the ID of this pipeline.
Returns:
Type | Description |
---|---|
UUID |
The ID of this pipeline. |
name: str
property
readonly
Returns the name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
runs: List[PipelineRunView]
property
readonly
Returns all stored runs of this pipeline.
The runs are returned in chronological order, so the latest run will be the last element in this list.
Returns:
Type | Description |
---|---|
List[PipelineRunView] |
A list of all stored runs of this pipeline. |
spec: PipelineSpec
property
readonly
Returns the spec of the pipeline.
The pipeline spec contains the source paths of all steps, as well as each of their upstream step names. This is primarily used to compare whether two pipelines are the same.
Returns:
Type | Description |
---|---|
PipelineSpec |
The spec of the pipeline. |
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same pipeline, False otherwise. |
Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
    """Compares this pipeline view with another object.

    Args:
        other: The other object to compare to.

    Returns:
        True if `other` is a `PipelineView` with the same id, False if it
        is a `PipelineView` with a different id, and `NotImplemented` for
        objects of any other type.
    """
    # Guard clause: let Python try the reflected comparison for other types.
    if not isinstance(other, PipelineView):
        return NotImplemented
    return self.id == other.id
__init__(self, model)
special
Initializes a post-execution pipeline object.
In most cases `PipelineView` objects should not be created manually but retrieved using the `get_pipelines()` utility from `zenml.post_execution` instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The model to initialize this pipeline view from. |
required |
Source code in zenml/post_execution/pipeline.py
def __init__(self, model: PipelineResponseModel):
    """Wraps a pipeline response model in a post-execution view.

    `PipelineView` objects are normally not created by hand; use the
    `get_pipelines()` utility from `zenml.post_execution` instead.

    Args:
        model: The model to initialize this pipeline view from.
    """
    # The model is the single source for every value this view exposes.
    self._model = model
__repr__(self)
special
Returns a string representation of this pipeline.
Returns:
Type | Description |
---|---|
str |
A string representation of this pipeline. |
Source code in zenml/post_execution/pipeline.py
def __repr__(self) -> str:
    """Builds a string representation of this pipeline.

    Returns:
        A string containing the class name, the pipeline id and its name.
    """
    class_name = self.__class__.__qualname__
    return f"{class_name}(id={self.id}, name='{self.name}')"
get_run_for_completed_step(self, step_name)
Ascertains which pipeline run produced the cached artifact of a given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_name |
str |
Name of step at hand |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
The most recent pipeline run in which the given step has completed. If no such run exists, a LookupError is raised instead of returning None. |
Exceptions:
Type | Description |
---|---|
LookupError |
If no run is found that completed the given step |
Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
    """Finds the latest pipeline run in which a given step completed.

    The runs are searched from newest to oldest; runs that do not contain
    the step are skipped.

    Args:
        step_name: Name of the step to look for.

    Returns:
        The most recent pipeline run in which the step has completed.

    Raises:
        LookupError: If no run is found that completed the given step.
    """
    for run in reversed(self.runs):
        try:
            if run.get_step(step_name).is_completed:
                return run
        except KeyError:
            # This run has no step with that name; try an older run.
            continue

    raise LookupError(
        "No Pipeline Run could be found, that has"
        f" completed the provided step: [{step_name}]"
    )
pipeline_run
Implementation of the post-execution pipeline run class.
PipelineRunView
Post-execution pipeline run class.
This can be used to query steps and artifact information associated with a pipeline execution.
Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView:
    """Post-execution pipeline run class.

    This can be used to query steps and artifact information associated with a
    pipeline execution.
    """

    def __init__(self, model: PipelineRunResponseModel):
        """Initializes a post-execution pipeline run object.

        In most cases `PipelineRunView` objects should not be created manually
        but retrieved from a `PipelineView` object instead.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model
        # Filled lazily by `_ensure_steps_fetched()`, keyed by step name.
        self._steps: Dict[str, StepView] = OrderedDict()

    @property
    def id(self) -> UUID:
        """Returns the ID of this pipeline run.

        Returns:
            The ID of this pipeline run.
        """
        assert self._model.id is not None
        return self._model.id

    @property
    def name(self) -> str:
        """Returns the name of the pipeline run.

        Returns:
            The name of the pipeline run.
        """
        return self._model.name

    @property
    def pipeline_configuration(self) -> Dict[str, Any]:
        """Returns the pipeline configuration.

        Returns:
            The pipeline configuration.
        """
        return self._model.pipeline_configuration

    @property
    def settings(self) -> Dict[str, Any]:
        """Returns the pipeline settings.

        These are runtime settings passed down to stack components, which
        can be set at pipeline level.

        Returns:
            The pipeline settings, read from the `settings` key of the
            pipeline configuration.
        """
        settings = self.pipeline_configuration["settings"]
        return cast(Dict[str, Any], settings)

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the pipeline extras.

        This dict is meant to be used to pass any configuration down to the
        pipeline or stack components that the user has use of.

        Returns:
            The pipeline extras, read from the `extra` key of the pipeline
            configuration.
        """
        extra = self.pipeline_configuration["extra"]
        return cast(Dict[str, Any], extra)

    @property
    def enable_cache(self) -> bool:
        """Returns whether caching is enabled for this pipeline run.

        Returns:
            True if caching is enabled for this pipeline run.
        """
        enable_cache = self.pipeline_configuration["enable_cache"]
        return cast(bool, enable_cache)

    @property
    def zenml_version(self) -> Optional[str]:
        """Version of ZenML that this pipeline run was performed with.

        Returns:
            The version of ZenML that this pipeline run was performed with.
        """
        return self._model.zenml_version

    @property
    def git_sha(self) -> Optional[str]:
        """Git commit SHA that this pipeline run was performed on.

        This will only be set if the pipeline code is in a git repository and
        there are no dirty files when running the pipeline.

        Returns:
            The git commit SHA that this pipeline run was performed on.
        """
        return self._model.git_sha

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the pipeline run.

        Returns:
            The current status of the pipeline run.
        """
        # Query the run again since the status might have changed since this
        # object was created.
        return Client().get_pipeline_run(self.id).status

    @property
    def created(self) -> datetime:
        """Returns the creation time of the pipeline run.

        Returns:
            The creation time of the pipeline run.
        """
        return self._model.created

    @property
    def steps(self) -> List[StepView]:
        """Returns all steps that were executed as part of this pipeline run.

        Returns:
            A list of all steps that were executed as part of this pipeline run.
        """
        self._ensure_steps_fetched()
        return list(self._steps.values())

    def get_step_names(self) -> List[str]:
        """Returns a list of all step names.

        Returns:
            A list of all step names.
        """
        self._ensure_steps_fetched()
        return list(self._steps.keys())

    def get_step(
        self,
        step: Optional[str] = None,
        **kwargs: Any,
    ) -> StepView:
        """Returns a step for the given name.

        The name refers to the name of the step in the pipeline definition, not
        the class name of the step-class.

        Use it like this:
        ```python
        # Get the step by name
        pipeline_run_view.get_step("first_step")
        ```

        Args:
            step: Name of the step in the pipeline definition.
            **kwargs: The deprecated `name` is caught as a kwarg to
                specify the step instead of using the `step` argument.

        Returns:
            A step for the given name.

        Raises:
            KeyError: If there is no step with the given name.
            RuntimeError: If no step has been specified at all.
        """
        self._ensure_steps_fetched()

        api_doc_link = get_apidocs_link(
            "core-post_execution",
            "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
        )
        step_name = kwargs.get("name", None)

        # Raise an error if neither `step` nor `name` args were provided.
        if not step and not isinstance(step_name, str):
            raise RuntimeError(
                "No step specified. Please specify a step using "
                "pipeline_run_view.get_step(step=`step_name`). "
                "Please refer to the API docs to learn more: "
                f"{api_doc_link}"
            )

        # If `name` was provided but not `step`, print a deprecation warning.
        if not step:
            logger.warning(
                "Using 'name' to get a step from "
                "'PipelineRunView.get_step()' is deprecated and "
                "will be removed in the future. Instead please "
                "use 'step' to access a step from your past "
                "pipeline runs. Learn more in our API docs: %s",
                api_doc_link,
            )
            step = step_name

        # Raise an error if there is no such step in the given pipeline run.
        if step not in self._steps:
            raise KeyError(
                f"No step found for name `{step}`. This pipeline "
                f"run only has steps with the following "
                f"names: `{self.get_step_names()}`"
            )

        return self._steps[step]

    def _ensure_steps_fetched(self) -> None:
        """Fetches all steps for this pipeline run from the metadata store."""
        if self._steps:
            # we already fetched the steps, no need to do anything
            return

        assert self._model.id is not None
        steps = Client().zen_store.list_run_steps(self._model.id)
        self._steps = {step.name: StepView(step) for step in steps}

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline run.

        Returns:
            A string representation of this pipeline run.
        """
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline run.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is a `PipelineRunView` with the same id,
            False if it is a `PipelineRunView` with a different id, and
            `NotImplemented` for objects of any other type.
        """
        if isinstance(other, PipelineRunView):
            return self.id == other.id
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash that is consistent with `__eq__`.

        Defining `__eq__` alone makes a class unhashable; hashing the id
        keeps views usable in sets and as dictionary keys.

        Returns:
            The hash of the pipeline run id.
        """
        return hash(self.id)
created: datetime
property
readonly
Returns the creation time of the pipeline run.
Returns:
Type | Description |
---|---|
datetime |
The creation time of the pipeline run. |
enable_cache: bool
property
readonly
Returns whether caching is enabled for this pipeline run.
Returns:
Type | Description |
---|---|
bool |
True if caching is enabled for this pipeline run. |
extra: Dict[str, Any]
property
readonly
Returns the pipeline extras.
This dict is meant to be used to pass any configuration down to the pipeline or stack components that the user has use of.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline extras. |
git_sha: Optional[str]
property
readonly
Git commit SHA that this pipeline run was performed on.
This will only be set if the pipeline code is in a git repository and there are no dirty files when running the pipeline.
Returns:
Type | Description |
---|---|
Optional[str] |
The git commit SHA that this pipeline run was performed on. |
id: UUID
property
readonly
Returns the ID of this pipeline run.
Returns:
Type | Description |
---|---|
UUID |
The ID of this pipeline run. |
name: str
property
readonly
Returns the name of the pipeline run.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline run. |
pipeline_configuration: Dict[str, Any]
property
readonly
Returns the pipeline configuration.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline configuration. |
settings: Dict[str, Any]
property
readonly
Returns the pipeline settings.
These are runtime settings passed down to stack components, which can be set at pipeline level.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline settings. |
status: ExecutionStatus
property
readonly
Returns the current status of the pipeline run.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the pipeline run. |
steps: List[zenml.post_execution.step.StepView]
property
readonly
Returns all steps that were executed as part of this pipeline run.
Returns:
Type | Description |
---|---|
List[zenml.post_execution.step.StepView] |
A list of all steps that were executed as part of this pipeline run. |
zenml_version: Optional[str]
property
readonly
Version of ZenML that this pipeline run was performed with.
Returns:
Type | Description |
---|---|
Optional[str] |
The version of ZenML that this pipeline run was performed with. |
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same pipeline run. |
Source code in zenml/post_execution/pipeline_run.py
def __eq__(self, other: Any) -> bool:
    """Compares this pipeline run view with another object.

    Args:
        other: The other object to compare to.

    Returns:
        True if `other` is a `PipelineRunView` with the same id, False if
        it is a `PipelineRunView` with a different id, and `NotImplemented`
        for objects of any other type.
    """
    # Guard clause: let Python try the reflected comparison for other types.
    if not isinstance(other, PipelineRunView):
        return NotImplemented
    return self.id == other.id
__init__(self, model)
special
Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually but retrieved from a `PipelineView` object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineRunResponseModel |
The model to initialize this object from. |
required |
Source code in zenml/post_execution/pipeline_run.py
def __init__(self, model: PipelineRunResponseModel):
    """Creates a post-execution view of a pipeline run.

    `PipelineRunView` objects are usually obtained from a `PipelineView`
    rather than being instantiated directly.

    Args:
        model: The response model backing this view.
    """
    # Ordered mapping of step views; it starts out empty.
    self._steps: Dict[str, StepView] = OrderedDict()
    self._model = model
__repr__(self)
special
Returns a string representation of this pipeline run.
Returns:
Type | Description |
---|---|
str |
A string representation of this pipeline run. |
Source code in zenml/post_execution/pipeline_run.py
def __repr__(self) -> str:
    """Builds a short, human-readable description of this pipeline run.

    Returns:
        A string containing the class name, the run ID and the run name.
    """
    class_name = self.__class__.__qualname__
    return f"{class_name}(id={self.id}, name='{self.name}')"
get_step(self, step=None, **kwargs)
Returns a step for the given name.
The name refers to the name of the step in the pipeline definition, not the class name of the step-class.
Use it like this:
# Get the step by name
pipeline_run_view.get_step("first_step")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
Optional[str] |
Name of the step as defined in the pipeline. |
None |
**kwargs |
Any |
The deprecated `name` is caught as a kwarg to specify the step instead of using the `step` argument. |
{} |
Returns:
Type | Description |
---|---|
StepView |
A step for the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no step with the given name. |
RuntimeError |
If no step has been specified at all. |
Source code in zenml/post_execution/pipeline_run.py
def get_step(
    self,
    step: Optional[str] = None,
    **kwargs: Any,
) -> StepView:
    """Looks up a single step of this run by its name.

    The name is the one the step has in the pipeline definition, not the
    class name of the step-class.

    Use it like this:
    ```python
    # Get the step by name
    pipeline_run_view.get_step("first_step")
    ```

    Args:
        step: Name of the step to return.
        **kwargs: Only the deprecated `name` keyword is read. It is used
            as the step name when `step` is not given and triggers a
            deprecation warning.

    Returns:
        The step view registered under the given name.

    Raises:
        KeyError: If this run has no step with the given name.
        RuntimeError: If neither `step` nor a string `name` was passed.
    """
    self._ensure_steps_fetched()

    docs_url = get_apidocs_link(
        "core-post_execution",
        "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
    )
    legacy_name = kwargs.get("name", None)

    if not step:
        # Without `step`, the only accepted fallback is a string passed
        # through the deprecated `name` keyword.
        if not isinstance(legacy_name, str):
            raise RuntimeError(
                "No step specified. Please specify a step using "
                "pipeline_run_view.get_step(step=`step_name`). "
                "Please refer to the API docs to learn more: "
                f"{docs_url}"
            )

        # `name` still works, but callers are told to migrate to `step`.
        logger.warning(
            "Using 'name' to get a step from "
            "'PipelineRunView.get_step()' is deprecated and "
            "will be removed in the future. Instead please "
            "use 'step' to access a step from your past "
            "pipeline runs. Learn more in our API docs: %s",
            docs_url,
        )
        step = legacy_name

    if step in self._steps:
        return self._steps[step]

    # Unknown name: list the available step names in the error.
    raise KeyError(
        f"No step found for name `{step}`. This pipeline "
        f"run only has steps with the following "
        f"names: `{self.get_step_names()}`"
    )
get_step_names(self)
Returns a list of all step names.
Returns:
Type | Description |
---|---|
List[str] |
A list of all step names. |
Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Lists the names of all steps of this pipeline run.

    Returns:
        The step names, in the order in which the steps are stored.
    """
    self._ensure_steps_fetched()
    return [*self._steps]
get_run(name)
Fetches the post-execution view of a run with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the run to fetch. |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
The post-execution view of the run with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If no run with the given name exists. |
RuntimeError |
If multiple runs with the given name exist. |
Source code in zenml/post_execution/pipeline_run.py
def get_run(name: str) -> "PipelineRunView":
    """Retrieves the post-execution view of the run with the given name.

    The lookup is limited to the active project of the client.

    Args:
        name: Name of the run to look up.

    Returns:
        A view wrapping the single run that matches the name.

    Raises:
        KeyError: If the store returns no run for this name.
        RuntimeError: If the store returns more than one run for this name.
    """
    zen_client = Client()
    project_id = zen_client.active_project.id
    matches = zen_client.zen_store.list_runs(
        name=name,
        project_name_or_id=project_id,
    )

    # TODO: [server] this error handling could be improved
    if not matches:
        raise KeyError(f"No run with name '{name}' exists.")
    if len(matches) > 1:
        raise RuntimeError(
            f"Multiple runs have been found for name '{name}'.", matches
        )
    return PipelineRunView(matches[0])
get_unlisted_runs()
Fetches post-execution views of all unlisted runs.
Unlisted runs are runs that are not associated with any pipeline.
Returns:
Type | Description |
---|---|
List[PipelineRunView] |
A list of post-execution run views. |
Source code in zenml/post_execution/pipeline_run.py
def get_unlisted_runs() -> List["PipelineRunView"]:
    """Retrieves post-execution views of all unlisted runs.

    A run is unlisted when it is not associated with any pipeline. Only
    runs of the client's active project are queried.

    Returns:
        One `PipelineRunView` per unlisted run returned by the store.
    """
    zen_client = Client()
    unlisted_models = zen_client.zen_store.list_runs(
        project_name_or_id=zen_client.active_project.id,
        unlisted=True,
    )
    return list(map(PipelineRunView, unlisted_models))
step
Implementation of a post-execution step class.
StepView
Post-execution step class.
This can be used to query artifact information associated with a pipeline step.
Source code in zenml/post_execution/step.py
class StepView:
    """Post-execution step class.

    This can be used to query artifact information associated with a pipeline
    step.
    """

    def __init__(self, model: StepRunResponseModel):
        """Initializes a post-execution step object.

        In most cases `StepView` objects should not be created manually
        but retrieved from a `PipelineRunView` object instead.

        Args:
            model: The model to initialize this object from.
        """
        self._model = model
        # Artifact views are only built when `inputs` / `outputs` are
        # accessed for the first time.
        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

    @property
    def id(self) -> UUID:
        """Returns the step id.

        Returns:
            The step id.
        """
        assert self._model.id
        return self._model.id

    @property
    def parent_step_ids(self) -> List[UUID]:
        """Returns a list of IDs of all parents of this step.

        Returns:
            A list of IDs of all parents of this step. The list is empty for
            steps without parents.
        """
        # Only a missing value is rejected here. An empty list is a valid
        # result for a step without parents and must not trip the assert.
        assert self._model.parent_step_ids is not None
        return self._model.parent_step_ids

    @property
    def entrypoint_name(self) -> str:
        """Returns the step entrypoint_name.

        This name is equal to the name argument passed to the @step decorator
        or the actual function name if no explicit name was given.

        Examples:
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)

        Returns:
            The step entrypoint_name.
        """
        return self.step_configuration.name

    @property
    def name(self) -> str:
        """Returns the name as it is defined in the pipeline.

        This name is equal to the name given to the step within the pipeline
        context.

        Examples:
            @step()
            def my_step_function(...)

            @pipeline
            def my_pipeline_function(step_a)

            p = my_pipeline_function(
                    step_a = my_step_function()
                )

            The name will be `step_a`

        Returns:
            The name of this step.
        """
        return self._model.name

    @property
    def docstring(self) -> Optional[str]:
        """Docstring of the step function or class.

        Returns:
            The docstring of the step function or class.
        """
        return self.step_configuration.docstring

    @property
    def parameters(self) -> Dict[str, str]:
        """The parameters used to run this step.

        Returns:
            The parameters used to run this step.
        """
        return self.step_configuration.parameters

    @property
    def step_configuration(self) -> "StepConfiguration":
        """Returns the step configuration.

        Returns:
            The step configuration.
        """
        return self._model.step.config

    @property
    def settings(self) -> Dict[str, "BaseSettings"]:
        """Returns the step settings.

        These are runtime settings passed down to stack components, which
        can be set at step level.

        Returns:
            The step settings.
        """
        return self.step_configuration.settings

    @property
    def extra(self) -> Dict[str, Any]:
        """Returns the extra dictionary.

        This dict is meant to be used to pass any configuration down to the
        step that the user has use of.

        Returns:
            The extra dictionary.
        """
        return self.step_configuration.extra

    @property
    def enable_cache(self) -> bool:
        """Returns whether caching is enabled for this step.

        Returns:
            Whether caching is enabled for this step.
        """
        return self.step_configuration.enable_cache

    @property
    def step_operator(self) -> Optional[str]:
        """Returns the name of the step operator of the step.

        Returns:
            The name of the step operator of the step.
        """
        return self.step_configuration.step_operator

    @property
    def experiment_tracker(self) -> Optional[str]:
        """Returns the name of the experiment tracker of the step.

        Returns:
            The name of the experiment tracker of the step.
        """
        return self.step_configuration.experiment_tracker

    @property
    def spec(self) -> "StepSpec":
        """Returns the step spec.

        The step spec defines the source path and upstream steps of a step and
        is used primarily to compare whether two steps are the same.

        Returns:
            The step spec.
        """
        return self._model.step.spec

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the step.

        Returns:
            The current status of the step.
        """
        # Query the step again since the status might have changed since this
        # object was created.
        return Client().zen_store.get_run_step(self.id).status

    @property
    def is_cached(self) -> bool:
        """Returns whether the step is cached or not.

        Returns:
            True if the step is cached, False otherwise.
        """
        return self.status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Returns whether the step is completed or not.

        Returns:
            True if the step is completed, False otherwise.
        """
        return self.status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """Returns all input artifacts that were used to run this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """Returns the input artifact that was used to run this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if len(self.inputs) != 1:
            raise ValueError(
                "Can't use the `StepView.input` property for steps with zero "
                "or multiple inputs, use `StepView.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """Returns all output artifacts that were written by this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """Returns the output artifact that was written by this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if len(self.outputs) != 1:
            raise ValueError(
                "Can't use the `StepView.output` property for steps with zero "
                "or multiple outputs, use `StepView.outputs` instead."
            )
        return next(iter(self.outputs.values()))

    def _ensure_inputs_fetched(self) -> None:
        """Builds the input artifact views from the step model if needed."""
        if self._inputs:
            # we already fetched inputs, no need to do anything
            return

        self._inputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self._model.input_artifacts.items()
        }

    def _ensure_outputs_fetched(self) -> None:
        """Builds the output artifact views from the step model if needed."""
        if self._outputs:
            # we already fetched outputs, no need to do anything
            return

        self._outputs = {
            name: ArtifactView(artifact_model)
            for name, artifact_model in self._model.output_artifacts.items()
        }

    def __repr__(self) -> str:
        """Returns a string representation of this step.

        Returns:
            A string representation of this step.
        """
        # The trailing ")" closes the "ClassName(" opened in the first part.
        return (
            f"{self.__class__.__qualname__}(id={self.id}, "
            f"name='{self.name}', entrypoint_name='{self.entrypoint_name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step, False
            otherwise.
        """
        if isinstance(other, StepView):
            return self.id == other.id
        return NotImplemented
docstring: Optional[str]
property
readonly
Docstring of the step function or class.
Returns:
Type | Description |
---|---|
Optional[str] |
The docstring of the step function or class. |
enable_cache: bool
property
readonly
Returns whether caching is enabled for this step.
Returns:
Type | Description |
---|---|
bool |
Whether caching is enabled for this step. |
entrypoint_name: str
property
readonly
Returns the step entrypoint_name.
This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.
Examples:
the step entrypoint_name will be "my_step"
@step(name="my_step") def my_step_function(...)
the step entrypoint_name will be "my_step_function"
@step def my_step_function(...)
Returns:
Type | Description |
---|---|
str |
The step entrypoint_name. |
experiment_tracker: Optional[str]
property
readonly
Returns the name of the experiment tracker of the step.
Returns:
Type | Description |
---|---|
Optional[str] |
The name of the experiment tracker of the step. |
extra: Dict[str, Any]
property
readonly
Returns the extra dictionary.
This dict is meant to be used to pass any configuration down to the step that the user has use of.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The extra dictionary. |
id: UUID
property
readonly
Returns the step id.
Returns:
Type | Description |
---|---|
UUID |
The step id. |
input: ArtifactView
property
readonly
Returns the input artifact that was used to run this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The input artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple inputs to this step. |
inputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all input artifacts that were used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
is_cached: bool
property
readonly
Returns whether the step is cached or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is cached, False otherwise. |
is_completed: bool
property
readonly
Returns whether the step is completed or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is completed, False otherwise. |
name: str
property
readonly
Returns the name as it is defined in the pipeline.
This name is equal to the name given to the step within the pipeline context
Examples:
@step() def my_step_function(...)
@pipeline def my_pipeline_function(step_a)
p = my_pipeline_function( step_a = my_step_function() )
The name will be step_a
Returns:
Type | Description |
---|---|
str |
The name of this step. |
output: ArtifactView
property
readonly
Returns the output artifact that was written by this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The output artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple step outputs. |
outputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all output artifacts that were written by this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
parameters: Dict[str, str]
property
readonly
The parameters used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The parameters used to run this step. |
parent_step_ids: List[uuid.UUID]
property
readonly
Returns a list of IDs of all parents of this step.
Returns:
Type | Description |
---|---|
List[uuid.UUID] |
A list of IDs of all parents of this step. |
settings: Dict[str, BaseSettings]
property
readonly
Returns the step settings.
These are runtime settings passed down to stack components, which can be set at step level.
Returns:
Type | Description |
---|---|
Dict[str, BaseSettings] |
The step settings. |
spec: StepSpec
property
readonly
Returns the step spec.
The step spec defines the source path and upstream steps of a step and is used primarily to compare whether two steps are the same.
Returns:
Type | Description |
---|---|
StepSpec |
The step spec. |
status: ExecutionStatus
property
readonly
Returns the current status of the step.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the step. |
step_configuration: StepConfiguration
property
readonly
Returns the step configuration.
Returns:
Type | Description |
---|---|
StepConfiguration |
The step configuration. |
step_operator: Optional[str]
property
readonly
Returns the name of the step operator of the step.
Returns:
Type | Description |
---|---|
Optional[str] |
The name of the step operator of the step. |
__eq__(self, other)
special
Returns whether the other object is referring to the same step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same step, False otherwise. |
Source code in zenml/post_execution/step.py
def __eq__(self, other: Any) -> bool:
    """Checks whether another object represents the same step.

    Two step views are considered equal when their step IDs match.

    Args:
        other: The object to compare this view against.

    Returns:
        True if `other` is a `StepView` with the same ID, False if it is a
        `StepView` with a different ID. For any other type `NotImplemented`
        is returned so that Python can fall back to its default comparison
        handling.
    """
    if not isinstance(other, StepView):
        return NotImplemented
    return self.id == other.id
__init__(self, model)
special
Initializes a post-execution step object.
In most cases StepView
objects should not be created manually
but retrieved from a PipelineRunView
object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
StepRunResponseModel |
The model to initialize this object from. |
required |
Source code in zenml/post_execution/step.py
def __init__(self, model: StepRunResponseModel):
    """Creates a post-execution view of a step run.

    `StepView` objects are usually obtained from a `PipelineRunView`
    rather than being instantiated directly.

    Args:
        model: The response model backing this view.
    """
    # Both artifact mappings start out empty.
    self._inputs: Dict[str, ArtifactView] = {}
    self._outputs: Dict[str, ArtifactView] = {}
    self._model = model
__repr__(self)
special
Returns a string representation of this step.
Returns:
Type | Description |
---|---|
str |
A string representation of this step. |
Source code in zenml/post_execution/step.py
def __repr__(self) -> str:
    """Returns a string representation of this step.

    Returns:
        A string of the form
        `ClassName(id=..., name='...', entrypoint_name='...')`.
    """
    # The trailing ")" closes the "ClassName(" opened in the first part;
    # without it the representation is left unbalanced.
    return (
        f"{self.__class__.__qualname__}(id={self.id}, "
        f"name='{self.name}', entrypoint_name='{self.entrypoint_name}')"
    )