Post Execution
zenml.post_execution
special
Initialization for the post-execution module.
After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks on it. The `post_execution` submodule provides a set of interfaces through which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.
artifact
Initialization for the post-execution artifact class.
ArtifactView
Post-execution artifact class.
This can be used to read artifact data that was created during a pipeline execution.
Source code in zenml/post_execution/artifact.py
class ArtifactView:
    """Post-execution artifact class.

    This can be used to read artifact data that was created during a pipeline
    execution.
    """

    def __init__(
        self,
        id_: int,
        type_: str,
        uri: str,
        materializer: str,
        data_type: str,
        metadata_store: "BaseMetadataStore",
        parent_step_id: int,
    ):
        """Initializes a post-execution artifact object.

        In most cases `ArtifactView` objects should not be created manually but
        retrieved from a `StepView` via the `inputs` or `outputs` properties.

        Args:
            id_: The artifact id.
            type_: The type of this artifact.
            uri: Specifies where the artifact data is stored.
            materializer: Source path of the materializer class that was used
                to write this artifact; `read()` imports it when no
                `materializer_class` is passed.
            data_type: Source path of the type of data that was passed to the
                materializer when writing that artifact. Will be used as a
                default type to read the artifact.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline.
            parent_step_id: The ID of the parent step.
        """
        self._id = id_
        self._type = type_
        self._uri = uri
        self._materializer = materializer
        self._data_type = data_type
        self._metadata_store = metadata_store
        self._parent_step_id = parent_step_id

    @property
    def id(self) -> int:
        """Returns the artifact id.

        Returns:
            The artifact id.
        """
        return self._id

    @property
    def type(self) -> str:
        """Returns the artifact type.

        Returns:
            The artifact type.
        """
        return self._type

    @property
    def data_type(self) -> str:
        """Returns the data type of the artifact.

        Returns:
            The data type of the artifact.
        """
        return self._data_type

    @property
    def uri(self) -> str:
        """Returns the URI where the artifact data is stored.

        Returns:
            The URI where the artifact data is stored.
        """
        return self._uri

    @property
    def parent_step_id(self) -> int:
        """Returns the ID of the parent step.

        This need not be equivalent to the ID of the producer step.

        Returns:
            The ID of the parent step.
        """
        return self._parent_step_id

    @property
    def producer_step(self) -> "StepView":
        """Returns the original StepView that produced the artifact.

        Returns:
            The original StepView that produced the artifact.
        """
        # TODO [ENG-174]: Replace with artifact.id instead of passing self if
        # required.
        return self._metadata_store.get_producer_step_from_artifact(self)

    @property
    def is_cached(self) -> bool:
        """Returns True if artifact was cached in a previous run, else False.

        An artifact counts as cached when the step that originally produced
        it is not the step it is attached to (its parent step).

        Returns:
            True if artifact was cached in a previous run, else False.
        """
        return self.producer_step.id != self.parent_step_id

    def read(
        self,
        output_data_type: Optional[Type[Any]] = None,
        materializer_class: Optional[Type["BaseMaterializer"]] = None,
    ) -> Any:
        """Materializes the data stored in this artifact.

        Args:
            output_data_type: The datatype to which the materializer should
                read, will be passed to the materializers `handle_input` method.
            materializer_class: The class of the materializer that should be
                used to read the artifact data. If no materializer class is
                given, we use the materializer that was used to write the
                artifact during execution of the pipeline.

        Returns:
            The materialized data.

        Raises:
            ModuleNotFoundError: If the materializer class or the recorded
                data type could not be imported.
        """
        if not materializer_class:
            try:
                materializer_class = source_utils.load_source_path_class(
                    self._materializer
                )
            except (ModuleNotFoundError, AttributeError) as e:
                logger.error(
                    f"ZenML can not locate and import the materializer module "
                    f"{self._materializer} which was used to write this "
                    f"artifact. If you want to read from it, please provide "
                    f"a 'materializer_class'."
                )
                raise ModuleNotFoundError(e) from e

        if not output_data_type:
            try:
                output_data_type = source_utils.load_source_path_class(
                    self._data_type
                )
            except (ModuleNotFoundError, AttributeError) as e:
                logger.error(
                    f"ZenML can not locate and import the data type of this "
                    f"artifact {self._data_type}. If you want to read "
                    f"from it, please provide a 'output_data_type'."
                )
                raise ModuleNotFoundError(e) from e

        logger.debug(
            "Using '%s' to read '%s' (uri: %s).",
            materializer_class.__qualname__,
            self._type,
            self._uri,
        )
        # TODO [ENG-162]: passing in `self` to initialize the materializer only
        # works because materializers only require a `.uri` property at the
        # moment.
        materializer = materializer_class(self)  # type: ignore[arg-type]
        return materializer.handle_input(output_data_type)

    def __repr__(self) -> str:
        """Returns a string representation of this artifact.

        Returns:
            A string representation of this artifact.
        """
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"type='{self._type}', uri='{self._uri}', "
            f"materializer='{self._materializer}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same artifact.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same artifact, else
            False.
        """
        if isinstance(other, ArtifactView):
            return self._id == other._id and self._uri == other._uri
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash consistent with `__eq__`.

        Defining `__eq__` alone sets `__hash__` to None, which would make
        artifact views unusable in sets or as dict keys.

        Returns:
            A hash of the artifact id and URI (the fields `__eq__` compares).
        """
        return hash((self._id, self._uri))
data_type: str
property
readonly
Returns the data type of the artifact.
Returns:
Type | Description |
---|---|
str |
The data type of the artifact. |
id: int
property
readonly
Returns the artifact id.
Returns:
Type | Description |
---|---|
int |
The artifact id. |
is_cached: bool
property
readonly
Returns True if artifact was cached in a previous run, else False.
Returns:
Type | Description |
---|---|
bool |
True if artifact was cached in a previous run, else False. |
parent_step_id: int
property
readonly
Returns the ID of the parent step.
This need not be equivalent to the ID of the producer step.
Returns:
Type | Description |
---|---|
int |
The ID of the parent step. |
producer_step: StepView
property
readonly
Returns the original StepView that produced the artifact.
Returns:
Type | Description |
---|---|
StepView |
The original StepView that produced the artifact. |
type: str
property
readonly
Returns the artifact type.
Returns:
Type | Description |
---|---|
str |
The artifact type. |
uri: str
property
readonly
Returns the URI where the artifact data is stored.
Returns:
Type | Description |
---|---|
str |
The URI where the artifact data is stored. |
__eq__(self, other)
special
Returns whether the other object is referring to the same artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same artifact, else False. |
Source code in zenml/post_execution/artifact.py
def __eq__(self, other: Any) -> bool:
    """Checks whether `other` refers to the same artifact as this view.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` is an `ArtifactView` with the same id and URI,
        otherwise False. For objects that are not `ArtifactView` instances,
        `NotImplemented` is returned so Python can try the reflected
        comparison.
    """
    if not isinstance(other, ArtifactView):
        return NotImplemented
    same_id = self._id == other._id
    return same_id and self._uri == other._uri
__init__(self, id_, type_, uri, materializer, data_type, metadata_store, parent_step_id)
special
Initializes a post-execution artifact object.
In most cases `ArtifactView` objects should not be created manually but retrieved from a `StepView` via the `inputs` or `outputs` properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The artifact id. |
required |
type_ |
str |
The type of this artifact. |
required |
uri |
str |
Specifies where the artifact data is stored. |
required |
materializer |
str |
Information needed to restore the materializer that was used to write this artifact. |
required |
data_type |
str |
The type of data that was passed to the materializer when writing that artifact. Will be used as a default type to read the artifact. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline. |
required |
parent_step_id |
int |
The ID of the parent step. |
required |
Source code in zenml/post_execution/artifact.py
def __init__(
    self,
    id_: int,
    type_: str,
    uri: str,
    materializer: str,
    data_type: str,
    metadata_store: "BaseMetadataStore",
    parent_step_id: int,
):
    """Creates a view onto a single stored artifact.

    Users normally obtain `ArtifactView` instances from the `inputs` or
    `outputs` properties of a `StepView` rather than building them by hand.

    Args:
        id_: Id of the artifact.
        type_: Type of the artifact.
        uri: Location where the artifact data is stored.
        materializer: Information needed to restore the materializer that
            wrote this artifact.
        data_type: Type of the data that was handed to the materializer when
            the artifact was written; used as the default type when reading.
        metadata_store: Metadata store used to look up further information
            related to this pipeline.
        parent_step_id: Id of the parent step.
    """
    self._id, self._type, self._uri = id_, type_, uri
    self._materializer, self._data_type = materializer, data_type
    self._metadata_store = metadata_store
    self._parent_step_id = parent_step_id
__repr__(self)
special
Returns a string representation of this artifact.
Returns:
Type | Description |
---|---|
str |
A string representation of this artifact. |
Source code in zenml/post_execution/artifact.py
def __repr__(self) -> str:
    """Builds a developer-facing description of this artifact.

    Returns:
        A string containing the class name, id, type, URI and materializer.
    """
    cls_name = self.__class__.__qualname__
    return (
        f"{cls_name}(id={self._id}, type='{self._type}', "
        f"uri='{self._uri}', materializer='{self._materializer}')"
    )
read(self, output_data_type=None, materializer_class=None)
Materializes the data stored in this artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_data_type |
Optional[Type[Any]] |
The datatype to which the materializer should read; it will be passed to the materializer's `handle_input` method. |
None |
materializer_class |
Optional[Type[BaseMaterializer]] |
The class of the materializer that should be used to read the artifact data. If no materializer class is given, we use the materializer that was used to write the artifact during execution of the pipeline. |
None |
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Exceptions:
Type | Description |
---|---|
ModuleNotFoundError |
If the materializer class could not be found. |
Source code in zenml/post_execution/artifact.py
def read(
    self,
    output_data_type: Optional[Type[Any]] = None,
    materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
    """Loads the data stored in this artifact back into memory.

    Args:
        output_data_type: Type the materializer should read into; it is
            forwarded to the materializer's `handle_input` method. When
            omitted, the data type recorded for this artifact is imported
            and used.
        materializer_class: Materializer class used to read the artifact
            data. When omitted, the materializer that wrote the artifact
            during pipeline execution is imported and used.

    Returns:
        The materialized data.

    Raises:
        ModuleNotFoundError: If the materializer class (or the recorded data
            type) could not be imported.
    """

    def _import_or_raise(source_path: str, error_message: str) -> Any:
        # Import a class from its source path; on failure, log the
        # user-facing hint and surface a ModuleNotFoundError.
        try:
            return source_utils.load_source_path_class(source_path)
        except (ModuleNotFoundError, AttributeError) as err:
            logger.error(error_message)
            raise ModuleNotFoundError(err) from err

    if not materializer_class:
        materializer_class = _import_or_raise(
            self._materializer,
            f"ZenML can not locate and import the materializer module "
            f"{self._materializer} which was used to write this "
            f"artifact. If you want to read from it, please provide "
            f"a 'materializer_class'.",
        )

    if not output_data_type:
        output_data_type = _import_or_raise(
            self._data_type,
            f"ZenML can not locate and import the data type of this "
            f"artifact {self._data_type}. If you want to read "
            f"from it, please provide a 'output_data_type'.",
        )

    logger.debug(
        "Using '%s' to read '%s' (uri: %s).",
        materializer_class.__qualname__,
        self._type,
        self._uri,
    )
    # TODO [ENG-162]: passing in `self` to initialize the materializer only
    # works because materializers only require a `.uri` property at the
    # moment.
    reader = materializer_class(self)  # type: ignore[arg-type]
    return reader.handle_input(output_data_type)
pipeline
Implementation of the post-execution pipeline.
PipelineView
Post-execution pipeline class.
This can be used to query pipeline-related information from the metadata store.
Source code in zenml/post_execution/pipeline.py
class PipelineView:
    """Post-execution pipeline class.

    This can be used to query pipeline-related information from the metadata
    store.
    """

    def __init__(
        self, id_: int, name: str, metadata_store: "BaseMetadataStore"
    ):
        """Initializes a post-execution pipeline object.

        In most cases `PipelineView` objects should not be created manually
        but retrieved using the `get_pipelines()` method of a
        `zenml.repository.Repository` instead.

        Args:
            id_: The context id of this pipeline.
            name: The name of this pipeline.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline.
        """
        self._id = id_
        self._name = name
        self._metadata_store = metadata_store

    @property
    def name(self) -> str:
        """Returns the name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self._name

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns all stored runs of this pipeline.

        The runs are returned in chronological order, so the latest
        run will be the last element in this list.

        Returns:
            A list of all stored runs of this pipeline.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle.
        runs = list(self._metadata_store.get_pipeline_runs(self).values())
        for run in runs:
            # Attach the matching ZenStore record (None if there is none).
            run._run_wrapper = self._get_zenstore_run(run_name=run.name)
        return runs

    def get_run_names(self) -> List[str]:
        """Returns a list of all run names.

        Returns:
            A list of all run names.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle.
        runs = self._metadata_store.get_pipeline_runs(self)
        return list(runs.keys())

    def get_run(self, name: str) -> "PipelineRunView":
        """Returns a run for the given name.

        Args:
            name: The name of the run to return.

        Returns:
            The run with the given name.

        Raises:
            KeyError: If there is no run with the given name.
        """
        run = self._metadata_store.get_pipeline_run(self, name)
        if not run:
            raise KeyError(
                f"No run found for name `{name}`. This pipeline "
                f"only has runs with the following "
                f"names: `{self.get_run_names()}`"
            )
        run._run_wrapper = self._get_zenstore_run(run_name=name)
        return run

    def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
        """Finds the most recent pipeline run in which the given step completed.

        Runs are searched from newest to oldest (`runs` is chronological), so
        this identifies the run that produced the cached artifacts of the
        step. Runs that do not contain the step are skipped.

        Args:
            step_name: Name of the step to look for.

        Returns:
            The most recent pipeline run in which the step completed.

        Raises:
            LookupError: If no run is found that completed the given step.
        """
        for run in reversed(self.runs):
            try:
                step = run.get_step(step_name)
            except KeyError:
                # This run does not contain the step at all.
                continue
            if step.is_completed:
                return run
        raise LookupError(
            "No Pipeline Run could be found, that has"
            f" completed the provided step: [{step_name}]"
        )

    def _get_zenstore_run(self, run_name: str) -> Optional[PipelineRunWrapper]:
        """Gets a ZenStore run for the given run name.

        This will filter all ZenStore runs by the pipeline name of this
        pipeline view, the run name passed in as an argument and the metadata
        store that this pipeline run is associated with.

        Args:
            run_name: The name of the run to get.

        Returns:
            The ZenStore run with the given name, if found.
        """
        from zenml.repository import Repository

        repo = Repository(skip_repository_check=True)  # type: ignore[call-arg]
        try:
            run_wrapper = repo.zen_store.get_pipeline_run(
                pipeline_name=self.name, run_name=run_name
            )
            metadata_store_wrapper = run_wrapper.stack.get_component_wrapper(
                StackComponentType.METADATA_STORE
            )
            # Only accept the ZenStore run if it was recorded against the
            # same metadata store this view reads from.
            if metadata_store_wrapper and (
                metadata_store_wrapper.uuid == self._metadata_store.uuid
            ):
                return run_wrapper
        except KeyError:
            pass

        return None

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline.

        Returns:
            A string representation of this pipeline.
        """
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self._name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same pipeline,
            False otherwise.
        """
        if isinstance(other, PipelineView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash consistent with `__eq__`.

        Defining `__eq__` alone sets `__hash__` to None, which would make
        pipeline views unusable in sets or as dict keys.

        Returns:
            A hash of the pipeline id and the metadata store UUID.
        """
        return hash((self._id, self._metadata_store.uuid))
name: str
property
readonly
Returns the name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
runs: List[PipelineRunView]
property
readonly
Returns all stored runs of this pipeline.
The runs are returned in chronological order, so the latest run will be the last element in this list.
Returns:
Type | Description |
---|---|
List[PipelineRunView] |
A list of all stored runs of this pipeline. |
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same pipeline, False otherwise. |
Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
    """Checks whether `other` refers to the same pipeline as this view.

    Two pipeline views are equal when they have the same context id and
    their metadata stores have the same UUID.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` refers to the same pipeline, False otherwise.
        For objects that are not a `PipelineView`, `NotImplemented` is
        returned.
    """
    if not isinstance(other, PipelineView):
        return NotImplemented
    if self._id != other._id:
        return False
    return self._metadata_store.uuid == other._metadata_store.uuid
__init__(self, id_, name, metadata_store)
special
Initializes a post-execution pipeline object.
In most cases `PipelineView` objects should not be created manually but retrieved using the `get_pipelines()` method of a `zenml.repository.Repository` instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The context id of this pipeline. |
required |
name |
str |
The name of this pipeline. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline. |
required |
Source code in zenml/post_execution/pipeline.py
def __init__(
    self, id_: int, name: str, metadata_store: "BaseMetadataStore"
):
    """Creates a view onto a pipeline recorded in the metadata store.

    Users normally obtain `PipelineView` instances through the
    `get_pipelines()` method of a `zenml.repository.Repository` rather than
    constructing them directly.

    Args:
        id_: Context id of the pipeline.
        name: Name of the pipeline.
        metadata_store: Metadata store used to look up further information
            about this pipeline.
    """
    self._id, self._name = id_, name
    self._metadata_store = metadata_store
__repr__(self)
special
Returns a string representation of this pipeline.
Returns:
Type | Description |
---|---|
str |
A string representation of this pipeline. |
Source code in zenml/post_execution/pipeline.py
def __repr__(self) -> str:
    """Builds a developer-facing description of this pipeline.

    Returns:
        A string with the class name, id and name of the pipeline.
    """
    cls_name = self.__class__.__qualname__
    return f"{cls_name}(id={self._id}, name='{self._name}')"
get_run(self, name)
Returns a run for the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the run to return. |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
The run with the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no run with the given name. |
Source code in zenml/post_execution/pipeline.py
def get_run(self, name: str) -> "PipelineRunView":
    """Looks up a single run of this pipeline by name.

    Args:
        name: Name of the run to look up.

    Returns:
        The matching run, with its ZenStore record (or None) attached.

    Raises:
        KeyError: If this pipeline has no run with the given name.
    """
    run = self._metadata_store.get_pipeline_run(self, name)
    if run:
        run._run_wrapper = self._get_zenstore_run(run_name=name)
        return run
    raise KeyError(
        f"No run found for name `{name}`. This pipeline "
        f"only has runs with the following "
        f"names: `{self.get_run_names()}`"
    )
get_run_for_completed_step(self, step_name)
Ascertains which pipeline run produced the cached artifact of a given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_name |
str |
Name of step at hand |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
The most recent pipeline run in which the given step completed. |
Exceptions:
Type | Description |
---|---|
LookupError |
If no run is found that completed the given step |
Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
    """Finds the most recent pipeline run in which the given step completed.

    Runs are searched from newest to oldest (`runs` is chronological), so
    this identifies the run that produced the cached artifacts of the step.
    Runs that do not contain the step are skipped.

    Args:
        step_name: Name of the step to look for.

    Returns:
        The most recent pipeline run in which the step completed.

    Raises:
        LookupError: If no run is found that completed the given step.
    """
    for run in reversed(self.runs):
        try:
            step = run.get_step(step_name)
        except KeyError:
            # This run does not contain the step at all.
            continue
        if step.is_completed:
            return run
    raise LookupError(
        "No Pipeline Run could be found, that has"
        f" completed the provided step: [{step_name}]"
    )
get_run_names(self)
Returns a list of all run names.
Returns:
Type | Description |
---|---|
List[str] |
A list of all run names. |
Source code in zenml/post_execution/pipeline.py
def get_run_names(self) -> List[str]:
    """Lists the names of all runs stored for this pipeline.

    Returns:
        The run names, in the order the metadata store returns them.
    """
    # Query the store on every call: new runs can appear while this view
    # is alive, so nothing is cached here.
    return list(self._metadata_store.get_pipeline_runs(self).keys())
pipeline_run
Implementation of the post-execution pipeline run class.
PipelineRunView
Post-execution pipeline run class.
This can be used to query steps and artifact information associated with a pipeline execution.
Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView:
    """Post-execution pipeline run class.

    This can be used to query steps and artifact information associated with a
    pipeline execution.
    """

    def __init__(
        self,
        id_: int,
        name: str,
        executions: List[proto.Execution],
        metadata_store: "BaseMetadataStore",
    ):
        """Initializes a post-execution pipeline run object.

        In most cases `PipelineRunView` objects should not be created manually
        but retrieved from a `PipelineView` object instead.

        Args:
            id_: The context id of this pipeline run.
            name: The name of this pipeline run.
            executions: All executions associated with this pipeline run.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline run.
        """
        self._id = id_
        self._name = name
        self._metadata_store = metadata_store
        self._executions = executions
        self._steps: Dict[str, StepView] = OrderedDict()

        # This might be set from the parent pipeline view in case this run
        # is also tracked in the ZenStore
        self._run_wrapper: Optional[PipelineRunWrapper] = None

    @property
    def name(self) -> str:
        """Returns the name of the pipeline run.

        Returns:
            The name of the pipeline run.
        """
        return self._name

    @property
    def zenml_version(self) -> Optional[str]:
        """Version of ZenML that this pipeline run was performed with.

        Returns:
            The version of ZenML that this pipeline run was performed with,
            or None if no ZenStore record is attached to this run.
        """
        if self._run_wrapper:
            return self._run_wrapper.zenml_version
        return None

    @property
    def git_sha(self) -> Optional[str]:
        """Git commit SHA that this pipeline run was performed on.

        This will only be set if the pipeline code is in a git repository and
        there are no dirty files when running the pipeline.

        Returns:
            The git commit SHA that this pipeline run was performed on.
        """
        if self._run_wrapper:
            return self._run_wrapper.git_sha
        return None

    @property
    def runtime_configuration(self) -> Optional["RuntimeConfiguration"]:
        """Runtime configuration that was used for this pipeline run.

        This will only be set if the pipeline run was tracked in a ZenStore.

        Returns:
            The runtime configuration that was used for this pipeline run.
        """
        if self._run_wrapper:
            return RuntimeConfiguration(
                **self._run_wrapper.runtime_configuration
            )
        return None

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the pipeline run.

        The run is FAILED if any step failed, COMPLETED if every step is
        either completed or cached, and RUNNING otherwise.

        Returns:
            The current status of the pipeline run.
        """
        # Materialize the statuses: both checks below iterate over them. With
        # a generator, `any()` would exhaust it and `all()` would then be
        # vacuously True, reporting still-running runs as COMPLETED.
        step_statuses = [step.status for step in self.steps]

        if any(status == ExecutionStatus.FAILED for status in step_statuses):
            return ExecutionStatus.FAILED
        elif all(
            status == ExecutionStatus.COMPLETED
            or status == ExecutionStatus.CACHED
            for status in step_statuses
        ):
            return ExecutionStatus.COMPLETED
        else:
            return ExecutionStatus.RUNNING

    @property
    def steps(self) -> List[StepView]:
        """Returns all steps that were executed as part of this pipeline run.

        Returns:
            A list of all steps that were executed as part of this pipeline run.
        """
        self._ensure_steps_fetched()
        return list(self._steps.values())

    def get_step_names(self) -> List[str]:
        """Returns a list of all step names.

        Returns:
            A list of all step names.
        """
        self._ensure_steps_fetched()
        return list(self._steps.keys())

    def get_step(
        self,
        step: Optional[Union["BaseStep", Type["BaseStep"], str]] = None,
        **kwargs: Any,
    ) -> StepView:
        """Returns a step for the given name.

        Use it in one of these ways:
        ```python
        # Get the step by name
        pipeline_run_view.get_step("first_step")

        # Get the step by supplying the original step class
        pipeline_run_view.get_step(first_step)

        # Get the step by supplying an instance of the original step class
        pipeline_run_view.get_step(first_step())
        ```

        Args:
            step: The step's name, class, or class instance.
            **kwargs: The deprecated `name` is caught as a kwarg to
                specify the step instead of using the `step` argument.

        Returns:
            A step for the given name.

        Raises:
            KeyError: If there is no step with the given name.
            RuntimeError: If no step is specified either through the `step`
                or the `name` argument.
        """
        self._ensure_steps_fetched()

        if isinstance(step, str):
            step_name = step
        elif isinstance(step, zenml.steps.base_step.BaseStep):
            step_name = step.name
        elif isinstance(step, type) and issubclass(
            step, zenml.steps.base_step.BaseStep
        ):
            step_name = step.__name__
        elif "name" in kwargs and isinstance(kwargs.get("name"), str):
            logger.warning(
                "Using 'name' to get a step from "
                "'PipelineRunView.get_step()' is deprecated and "
                "will be removed in the future. Instead please "
                "use 'step' to access a step from your past "
                "pipeline runs. Learn more in our API docs: %s",
                get_apidocs_link(
                    "post_execution",
                    "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
                ),
            )
            step_name = kwargs.pop("name")
        else:
            # Exceptions do not apply %-formatting like logger calls do, so
            # the link has to be interpolated into the message explicitly.
            docs_link = get_apidocs_link(
                "post_execution",
                "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
            )
            raise RuntimeError(
                "No step specified to get from "
                "`PipelineRunView`. Please set a `step` "
                "within the `get_step()` method. Learn more in"
                f" our API docs: {docs_link}"
            )

        try:
            return self._steps[step_name]
        except KeyError:
            raise KeyError(
                f"No step found for name `{step_name}`. This pipeline "
                f"run only has steps with the following "
                f"names: `{self.get_step_names()}`"
            ) from None

    def _ensure_steps_fetched(self) -> None:
        """Fetches all steps for this pipeline run from the metadata store."""
        if self._steps:
            # we already fetched the steps, no need to do anything
            return

        # NOTE: an empty result is indistinguishable from "not fetched yet",
        # so a run without any steps is re-queried on every access.
        self._steps = self._metadata_store.get_pipeline_run_steps(self)

        if self._run_wrapper:
            # If we have the run wrapper from the ZenStore, pass on the step
            # wrapper so users can access additional information about the step.
            for step_wrapper in self._run_wrapper.pipeline.steps:
                if step_wrapper.name in self._steps:
                    self._steps[step_wrapper.name]._step_wrapper = step_wrapper

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline run.

        Returns:
            A string representation of this pipeline run.
        """
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self._name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline run.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same pipeline run.
        """
        if isinstance(other, PipelineRunView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash consistent with `__eq__`.

        Defining `__eq__` alone sets `__hash__` to None, which would make
        pipeline run views unusable in sets or as dict keys.

        Returns:
            A hash of the run id and the metadata store UUID.
        """
        return hash((self._id, self._metadata_store.uuid))
git_sha: Optional[str]
property
readonly
Git commit SHA that this pipeline run was performed on.
This will only be set if the pipeline code is in a git repository and there are no dirty files when running the pipeline.
Returns:
Type | Description |
---|---|
Optional[str] |
The git commit SHA that this pipeline run was performed on. |
name: str
property
readonly
Returns the name of the pipeline run.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline run. |
runtime_configuration: Optional[RuntimeConfiguration]
property
readonly
Runtime configuration that was used for this pipeline run.
This will only be set if the pipeline run was tracked in a ZenStore.
Returns:
Type | Description |
---|---|
Optional[RuntimeConfiguration] |
The runtime configuration that was used for this pipeline run. |
status: ExecutionStatus
property
readonly
Returns the current status of the pipeline run.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the pipeline run. |
steps: List[zenml.post_execution.step.StepView]
property
readonly
Returns all steps that were executed as part of this pipeline run.
Returns:
Type | Description |
---|---|
List[zenml.post_execution.step.StepView] |
A list of all steps that were executed as part of this pipeline run. |
zenml_version: Optional[str]
property
readonly
Version of ZenML that this pipeline run was performed with.
Returns:
Type | Description |
---|---|
Optional[str] |
The version of ZenML that this pipeline run was performed with. |
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same pipeline run. |
Source code in zenml/post_execution/pipeline_run.py
def __eq__(self, other: Any) -> bool:
    """Checks whether `other` refers to the same pipeline run as this view.

    Two run views are equal when they have the same context id and their
    metadata stores have the same UUID.

    Args:
        other: The object to compare against.

    Returns:
        True if `other` refers to the same pipeline run. For objects that are
        not a `PipelineRunView`, `NotImplemented` is returned.
    """
    if not isinstance(other, PipelineRunView):
        return NotImplemented
    if self._id != other._id:
        return False
    return self._metadata_store.uuid == other._metadata_store.uuid
__init__(self, id_, name, executions, metadata_store)
special
Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually but retrieved from a `PipelineView` object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The context id of this pipeline run. |
required |
name |
str |
The name of this pipeline run. |
required |
executions |
List[ml_metadata.proto.metadata_store_pb2.Execution] |
All executions associated with this pipeline run. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline run. |
required |
Source code in zenml/post_execution/pipeline_run.py
def __init__(
    self,
    id_: int,
    name: str,
    executions: List[proto.Execution],
    metadata_store: "BaseMetadataStore",
):
    """Creates a view onto a single, already executed pipeline run.

    `PipelineRunView` objects are normally obtained from a `PipelineView`
    object rather than being constructed by hand.

    Args:
        id_: The context id of this pipeline run.
        name: The name of this pipeline run.
        executions: All executions associated with this pipeline run.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this pipeline run.
    """
    # Identity of the run and where to look up further information.
    self._id, self._name = id_, name
    self._metadata_store = metadata_store
    self._executions = executions

    # Starts empty; `get_step`/`get_step_names` call
    # `_ensure_steps_fetched()` before reading it.
    self._steps: Dict[str, StepView] = OrderedDict()

    # Optionally filled in by the parent pipeline view when this run is
    # also tracked in the ZenStore.
    self._run_wrapper: Optional[PipelineRunWrapper] = None
__repr__(self)
special
Returns a string representation of this pipeline run.
Returns:
Type | Description |
---|---|
str |
A string representation of this pipeline run. |
Source code in zenml/post_execution/pipeline_run.py
def __repr__(self) -> str:
    """Builds a debug-friendly representation of this pipeline run.

    Returns:
        A string of the form `ClassName(id=<id>, name='<name>')`.
    """
    class_name = self.__class__.__qualname__
    return f"{class_name}(id={self._id}, name='{self._name}')"
get_step(self, step=None, **kwargs)
Returns a step for the given name.
Use it in one of these ways:
# Get the step by name
pipeline_run_view.get_step("first_step")
# Get the step by supplying the original step class
pipeline_run_view.get_step(first_step)
# Get the step by supplying an instance of the original step class
pipeline_run_view.get_step(first_step())
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
Union[BaseStep, Type[BaseStep], str] |
The name of the step, or the class or class instance of the step. |
None |
**kwargs |
Any |
The deprecated `name` is caught as a kwarg to specify the step instead of using the `step` argument. |
{} |
Returns:
Type | Description |
---|---|
StepView |
A step for the given name. |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no step with the given name. |
RuntimeError |
If no step is specified either through the `step` or the `name` argument. |
Source code in zenml/post_execution/pipeline_run.py
def get_step(
    self,
    step: Optional[Union["BaseStep", Type["BaseStep"], str]] = None,
    **kwargs: Any,
) -> StepView:
    """Returns a step for the given name.

    Use it in one of these ways:
    ```python
    # Get the step by name
    pipeline_run_view.get_step("first_step")

    # Get the step by supplying the original step class
    pipeline_run_view.get_step(first_step)

    # Get the step by supplying an instance of the original step class
    pipeline_run_view.get_step(first_step())
    ```

    Args:
        step: The name of the step, or the class or class instance of the
            step.
        **kwargs: The deprecated `name` is caught as a kwarg to
            specify the step instead of using the `step` argument.

    Returns:
        A step for the given name.

    Raises:
        KeyError: If there is no step with the given name.
        RuntimeError: If no step is specified either through the `step` or
            the `name` argument.
    """
    self._ensure_steps_fetched()

    if isinstance(step, str):
        step_name = step
    elif isinstance(step, zenml.steps.base_step.BaseStep):
        step_name = step.name
    elif isinstance(step, type) and issubclass(
        step, zenml.steps.base_step.BaseStep
    ):
        step_name = step.__name__
    elif isinstance(kwargs.get("name"), str):
        # Logger calls format lazily, so %-style args are correct here.
        logger.warning(
            "Using 'name' to get a step from "
            "'PipelineRunView.get_step()' is deprecated and "
            "will be removed in the future. Instead please "
            "use 'step' to access a step from your past "
            "pipeline runs. Learn more in our API docs: %s",
            get_apidocs_link(
                "post_execution",
                "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
            ),
        )
        step_name = kwargs.pop("name")
    else:
        # Exceptions do not apply %-formatting to their arguments (the
        # original passed the link as a second arg, yielding a tuple
        # message with a literal `%s`), so interpolate the link explicitly.
        docs_link = get_apidocs_link(
            "post_execution",
            "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
        )
        raise RuntimeError(
            "No step specified to get from "
            "`PipelineRunView`. Please set a `step` "
            "within the `get_step()` method. Learn more in"
            f" our API docs: {docs_link}"
        )

    try:
        return self._steps[step_name]
    except KeyError:
        # The new error already names the missing step and lists the valid
        # ones, so the original lookup error adds nothing.
        raise KeyError(
            f"No step found for name `{step_name}`. This pipeline "
            f"run only has steps with the following "
            f"names: `{self.get_step_names()}`"
        ) from None
get_step_names(self)
Returns a list of all step names.
Returns:
Type | Description |
---|---|
List[str] |
A list of all step names. |
Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Lists the names of all steps executed in this pipeline run.

    Returns:
        The step names, in the iteration order of the internal step mapping.
    """
    self._ensure_steps_fetched()
    return [step_name for step_name in self._steps]
step
Implementation of a post-execution step class.
StepView
Post-execution step class.
This can be used to query artifact information associated with a pipeline step.
Source code in zenml/post_execution/step.py
class StepView:
    """Post-execution step class.

    This can be used to query artifact information associated with a pipeline
    step.
    """

    def __init__(
        self,
        id_: int,
        parents_step_ids: List[int],
        entrypoint_name: str,
        name: str,
        parameters: Dict[str, Any],
        metadata_store: "BaseMetadataStore",
    ):
        """Initializes a post-execution step object.

        In most cases `StepView` objects should not be created manually
        but retrieved from a `PipelineRunView` object instead.

        Args:
            id_: The execution id of this step.
            parents_step_ids: The execution ids of the parents of this step.
            entrypoint_name: The entrypoint name of this step (the name
                passed to the `@step` decorator, or the function name if no
                explicit name was given).
            name: The name of this step within the pipeline.
            parameters: Parameters that were used to run this step.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this step.
        """
        self._id = id_
        self._parents_step_ids = parents_step_ids
        self._entrypoint_name = entrypoint_name
        self._name = name
        self._parameters = parameters
        self._metadata_store = metadata_store

        # Filled in lazily by `_ensure_inputs_outputs_fetched`.
        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

        # This might be set from the parent pipeline run view in case the run
        # is also tracked in the ZenStore
        self._step_wrapper: Optional[StepWrapper] = None

    @property
    def id(self) -> int:
        """Returns the step id.

        Returns:
            The step id.
        """
        return self._id

    @property
    def parents_step_ids(self) -> List[int]:
        """Returns a list of IDs of all parents of this step.

        Returns:
            A list of IDs of all parents of this step.
        """
        return self._parents_step_ids

    @property
    def parent_steps(self) -> List["StepView"]:
        """Returns a list of all parent steps of this step.

        Each parent is looked up in the metadata store by its execution id
        on every access.

        Returns:
            A list of all parent steps of this step.
        """
        return [
            self._metadata_store.get_step_by_id(parent_id)
            for parent_id in self.parents_step_ids
        ]

    @property
    def entrypoint_name(self) -> str:
        """Returns the step entrypoint_name.

        This name is equal to the name argument passed to the `@step`
        decorator, or the actual function name if no explicit name was given.

        Examples:
            ```python
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)
            ```

        Returns:
            The step entrypoint_name.
        """
        return self._entrypoint_name

    @property
    def name(self) -> str:
        """Returns the name as it is defined in the pipeline.

        This name is equal to the name given to the step within the pipeline
        context.

        Examples:
            ```python
            @step()
            def my_step_function(...)

            @pipeline
            def my_pipeline_function(step_a)

            p = my_pipeline_function(
                step_a = my_step_function()
            )
            ```

            The name will be `step_a`.

        Returns:
            The name of this step.
        """
        return self._name

    @property
    def docstring(self) -> Optional[str]:
        """Docstring of the step function or class.

        Only available when a step wrapper has been attached to this view.

        Returns:
            The docstring of the step function or class, or None if no step
            wrapper is set.
        """
        if self._step_wrapper:
            return self._step_wrapper.docstring
        return None

    @property
    def parameters(self) -> Dict[str, Any]:
        """The parameters used to run this step.

        Returns:
            The parameters used to run this step.
        """
        return self._parameters

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the step.

        The status is queried from the metadata store on every access.

        Returns:
            The current status of the step.
        """
        return self._metadata_store.get_step_status(self)

    @property
    def is_cached(self) -> bool:
        """Returns whether the step is cached or not.

        Returns:
            True if the step is cached, False otherwise.
        """
        return self.status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Returns whether the step is completed or not.

        Returns:
            True if the step is completed, False otherwise.
        """
        return self.status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """Returns all input artifacts that were used to run this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_outputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """Returns the input artifact that was used to run this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if len(self.inputs) != 1:
            raise ValueError(
                "Can't use the `StepView.input` property for steps with zero "
                "or multiple inputs, use `StepView.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """Returns all output artifacts that were written by this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """Returns the output artifact that was written by this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if len(self.outputs) != 1:
            raise ValueError(
                "Can't use the `StepView.output` property for steps with zero "
                "or multiple outputs, use `StepView.outputs` instead."
            )
        return next(iter(self.outputs.values()))

    def _ensure_inputs_outputs_fetched(self) -> None:
        """Fetches all step inputs and outputs from the metadata store."""
        # Non-empty dicts mean a previous fetch succeeded. Note that a step
        # with neither inputs nor outputs is re-queried on every access, since
        # empty dicts look the same as "not fetched yet".
        if self._inputs or self._outputs:
            return

        self._inputs, self._outputs = self._metadata_store.get_step_artifacts(
            self
        )

    def __repr__(self) -> str:
        """Returns a string representation of this step.

        Returns:
            A string representation of this step.
        """
        # The ", " before `parameters` was previously missing, which glued
        # the two fields together in the output.
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self.name}', entrypoint_name='{self.entrypoint_name}', "
            f"parameters={self._parameters})"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step, False
            otherwise.
        """
        if isinstance(other, StepView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented

    def __hash__(self) -> int:
        """Returns a hash consistent with `__eq__`.

        Defining `__eq__` alone makes Python set `__hash__` to None, which
        made step views unusable in sets and as dict keys.

        Returns:
            A hash of the step id and the metadata store uuid.
        """
        return hash((self._id, self._metadata_store.uuid))
docstring: Optional[str]
property
readonly
Docstring of the step function or class.
Returns:
Type | Description |
---|---|
Optional[str] |
The docstring of the step function or class. |
entrypoint_name: str
property
readonly
Returns the step entrypoint_name.
This name is equal to the name argument passed to the `@step` decorator, or the actual function name if no explicit name was given.
Examples:
# the step entrypoint_name will be "my_step"
@step(name="my_step")
def my_step_function(...)
# the step entrypoint_name will be "my_step_function"
@step
def my_step_function(...)
Returns:
Type | Description |
---|---|
str |
The step entrypoint_name. |
id: int
property
readonly
Returns the step id.
Returns:
Type | Description |
---|---|
int |
The step id. |
input: ArtifactView
property
readonly
Returns the input artifact that was used to run this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The input artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple inputs to this step. |
inputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all input artifacts that were used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
is_cached: bool
property
readonly
Returns whether the step is cached or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is cached, False otherwise. |
is_completed: bool
property
readonly
Returns whether the step is completed or not.
Returns:
Type | Description |
---|---|
bool |
True if the step is completed, False otherwise. |
name: str
property
readonly
Returns the name as it is defined in the pipeline.
This name is equal to the name given to the step within the pipeline context.
Examples:
@step()
def my_step_function(...)
@pipeline
def my_pipeline_function(step_a)
p = my_pipeline_function(
    step_a = my_step_function()
)
The name will be `step_a`.
Returns:
Type | Description |
---|---|
str |
The name of this step. |
output: ArtifactView
property
readonly
Returns the output artifact that was written by this step.
Returns:
Type | Description |
---|---|
ArtifactView |
The output artifact. |
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple step outputs. |
outputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all output artifacts that were written by this step.
Returns:
Type | Description |
---|---|
Dict[str, zenml.post_execution.artifact.ArtifactView] |
A dictionary of artifact names to artifact views. |
parameters: Dict[str, Any]
property
readonly
The parameters used to run this step.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The parameters used to run this step. |
parent_steps: List[StepView]
property
readonly
Returns a list of all parent steps of this step.
Returns:
Type | Description |
---|---|
List[StepView] |
A list of all parent steps of this step. |
parents_step_ids: List[int]
property
readonly
Returns a list of IDs of all parents of this step.
Returns:
Type | Description |
---|---|
List[int] |
A list of IDs of all parents of this step. |
status: ExecutionStatus
property
readonly
Returns the current status of the step.
Returns:
Type | Description |
---|---|
ExecutionStatus |
The current status of the step. |
__eq__(self, other)
special
Returns whether the other object is referring to the same step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same step, False otherwise. |
Source code in zenml/post_execution/step.py
def __eq__(self, other: Any) -> bool:
    """Checks whether `other` refers to the same step.

    Two step views are considered equal when they share the same execution
    id and were loaded from metadata stores with the same uuid.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is referring to the same step, False
        otherwise. `NotImplemented` is returned for objects that are not
        `StepView` instances.
    """
    if not isinstance(other, StepView):
        return NotImplemented
    same_execution = self._id == other._id
    return same_execution and self._metadata_store.uuid == other._metadata_store.uuid
__init__(self, id_, parents_step_ids, entrypoint_name, name, parameters, metadata_store)
special
Initializes a post-execution step object.
In most cases `StepView` objects should not be created manually but retrieved from a `PipelineRunView` object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The execution id of this step. |
required |
parents_step_ids |
List[int] |
The execution ids of the parents of this step. |
required |
entrypoint_name |
str |
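The entrypoint name of this step (the name passed to the `@step` decorator, or the function name if no explicit name was given). |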
required |
name |
str |
The name of this step within the pipeline |
required |
parameters |
Dict[str, Any] |
Parameters that were used to run this step. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this step. |
required |
Source code in zenml/post_execution/step.py
def __init__(
    self,
    id_: int,
    parents_step_ids: List[int],
    entrypoint_name: str,
    name: str,
    parameters: Dict[str, Any],
    metadata_store: "BaseMetadataStore",
):
    """Creates a view onto a single executed pipeline step.

    `StepView` objects are normally obtained from a `PipelineRunView`
    object rather than being constructed by hand.

    Args:
        id_: The execution id of this step.
        parents_step_ids: The execution ids of the parents of this step.
        entrypoint_name: The entrypoint name of this step (the `@step`
            decorator name, or the function name if none was given).
        name: The name of this step within the pipeline.
        parameters: Parameters that were used to run this step.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this step.
    """
    # Identity and lineage of the step execution.
    self._id, self._parents_step_ids = id_, parents_step_ids

    # Naming and configuration.
    self._entrypoint_name, self._name = entrypoint_name, name
    self._parameters = parameters

    self._metadata_store = metadata_store

    # Input/output artifacts start empty and are fetched on demand.
    self._inputs: Dict[str, ArtifactView] = {}
    self._outputs: Dict[str, ArtifactView] = {}

    # Optionally filled in by the parent pipeline run view when the run is
    # also tracked in the ZenStore.
    self._step_wrapper: Optional[StepWrapper] = None
__repr__(self)
special
Returns a string representation of this step.
Returns:
Type | Description |
---|---|
str |
A string representation of this step. |
Source code in zenml/post_execution/step.py
def __repr__(self) -> str:
    """Returns a string representation of this step.

    Returns:
        A string of the form
        `ClassName(id=..., name='...', entrypoint_name='...', parameters=...)`.
    """
    # The ", " before `parameters` was previously missing, which glued the
    # entrypoint name and the parameters together in the output.
    return (
        f"{self.__class__.__qualname__}(id={self._id}, "
        f"name='{self.name}', entrypoint_name='{self.entrypoint_name}', "
        f"parameters={self._parameters})"
    )