Lineage Graph
zenml.lineage_graph
special
Initialization of lineage generation module.
edge
Class for Edges in a lineage graph.
Edge (BaseModel)
pydantic-model
A class that represents an edge in a lineage graph.
Source code in zenml/lineage_graph/edge.py
class Edge(BaseModel):
    """A directed connection between two nodes of a lineage graph.

    Attributes:
        id: Identifier of the edge.
        source: ID of the node the edge starts from.
        target: ID of the node the edge points to.
    """

    id: str
    source: str
    target: str
lineage_graph
Class for lineage graph generation.
LineageGraph (BaseModel)
pydantic-model
A lineage graph representation of a PipelineRunResponseModel.
Source code in zenml/lineage_graph/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunResponseModel."""

    # Step and artifact nodes collected so far.
    nodes: List[Union[StepNode, ArtifactNode]] = []
    # Directed edges between step nodes and artifact nodes.
    edges: List[Edge] = []
    # Node ID of the first step added to the graph; None until then.
    root_step_id: Optional[str] = None
    # (key, value, type) string triples taken from the run metadata.
    run_metadata: List[Tuple[str, str, str]] = []

    def generate_step_nodes_and_edges(
        self, step: StepRunResponseModel
    ) -> None:
        """Generates the step nodes and the edges between them.

        Appends one step node for the step, one artifact node plus a
        step-to-artifact edge for every output artifact, and one
        artifact-to-step edge for every input artifact. Input artifacts
        do not get a node of their own here.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        # The first step that is processed becomes the root of the graph.
        if self.root_step_id is None:
            self.root_step_id = step_id

        # Keep only non-empty config entries; inputs, outputs and
        # parameters are reported through dedicated node detail fields.
        step_config = step.config.dict()
        if step_config:
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ("inputs", "outputs", "parameters") and value
            }

        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.config.name,  # redundant for consistency
                    parameters=step.config.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in step.metadata.values()
                    ],
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            # `producer_step_id` is optional on the node details, so a
            # missing producer must stay None instead of becoming the
            # literal string "None".
            producer_id = artifact.producer_step_run_id
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type.import_path,
                        parent_step_id=str(step.id),
                        producer_step_id=(
                            str(producer_id)
                            if producer_id is not None
                            else None
                        ),
                        uri=artifact.uri,
                        metadata=[
                            (m.key, str(m.value), str(m.type))
                            for m in artifact.metadata.values()
                        ],
                    ),
                )
            )
            # Output edge: step -> artifact.
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # Input edge: artifact -> step. Only the artifact itself is
        # needed here, not the name it is bound to.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(
        self, run: PipelineRunResponseModel
    ) -> None:
        """Generates the run nodes and the edges between them.

        Stores the run metadata as `(key, value, type)` string triples
        and then adds the nodes and edges of every step of the run.

        Args:
            run: The PipelineRunResponseModel to generate the lineage graph for.
        """
        self.run_metadata = [
            (m.key, str(m.value), str(m.type)) for m in run.metadata.values()
        ]
        for step in run.steps.values():
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)
Generates the run nodes and the edges between them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run | PipelineRunResponseModel | The PipelineRunResponseModel to generate the lineage graph for. | required |
Source code in zenml/lineage_graph/lineage_graph.py
def generate_run_nodes_and_edges(
    self, run: PipelineRunResponseModel
) -> None:
    """Populate the graph from a complete pipeline run.

    The run metadata is stored as `(key, value, type)` string triples,
    after which every step of the run contributes its nodes and edges.

    Args:
        run: The PipelineRunResponseModel to generate the lineage graph for.
    """
    metadata_triples = []
    for entry in run.metadata.values():
        metadata_triples.append((entry.key, str(entry.value), str(entry.type)))
    self.run_metadata = metadata_triples

    for step_run in run.steps.values():
        self.generate_step_nodes_and_edges(step_run)
generate_step_nodes_and_edges(self, step)
Generates the step nodes and the edges between them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step | StepRunResponseModel | The step to generate the nodes and edges for. | required |
Source code in zenml/lineage_graph/lineage_graph.py
def generate_step_nodes_and_edges(
    self, step: StepRunResponseModel
) -> None:
    """Generates the step nodes and the edges between them.

    Appends one step node for the step, one artifact node plus a
    step-to-artifact edge for every output artifact, and one
    artifact-to-step edge for every input artifact. Input artifacts do
    not get a node of their own here.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    # The first step that is processed becomes the root of the graph.
    if self.root_step_id is None:
        self.root_step_id = step_id

    # Keep only non-empty config entries; inputs, outputs and parameters
    # are reported through dedicated node detail fields.
    step_config = step.config.dict()
    if step_config:
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ("inputs", "outputs", "parameters") and value
        }

    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.config.name,  # redundant for consistency
                parameters=step.config.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in step.metadata.values()
                ],
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        # `producer_step_id` is optional on the node details, so a
        # missing producer must stay None instead of becoming the
        # literal string "None".
        producer_id = artifact.producer_step_run_id
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type.import_path,
                    parent_step_id=str(step.id),
                    producer_step_id=(
                        str(producer_id)
                        if producer_id is not None
                        else None
                    ),
                    uri=artifact.uri,
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in artifact.metadata.values()
                    ],
                ),
            )
        )
        # Output edge: step -> artifact.
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # Input edge: artifact -> step. Only the artifact itself is needed
    # here, not the name it is bound to.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )
node
special
Initialization of lineage nodes.
artifact_node
Class for all lineage artifact nodes.
ArtifactNode (BaseNode)
pydantic-model
A class that represents an artifact node in a lineage graph.
Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNode(BaseNode):
    """Lineage graph node that stands for an artifact.

    Attributes:
        type: Node kind marker; defaults to `"artifact"`.
        data: The artifact details carried by this node.
    """

    type: str = "artifact"
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails)
pydantic-model
Captures all artifact details for the node.
Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Details attached to an artifact node.

    Attributes:
        is_cached: Whether the artifact is flagged as cached.
        artifact_type: The type of the artifact.
        artifact_data_type: The data type of the artifact.
        parent_step_id: ID of the step the artifact node belongs to.
        producer_step_id: ID of the step run that produced the artifact,
            if there is one.
        uri: URI of the artifact.
        metadata: Metadata entries as `(key, value, type)` string triples.
    """

    is_cached: bool
    artifact_type: str
    artifact_data_type: str
    parent_step_id: str
    producer_step_id: Optional[str]
    uri: str
    metadata: List[Tuple[str, str, str]]  # (key, value, type)
base_node
Base class for all lineage nodes.
BaseNode (BaseModel)
pydantic-model
A class that represents a node in a lineage graph.
Source code in zenml/lineage_graph/node/base_node.py
class BaseNode(BaseModel):
    """Common shape of every node in a lineage graph.

    Attributes:
        id: Identifier of the node.
        type: Kind of the node; subclasses provide a default value.
        data: The details carried by the node.
    """

    id: str
    type: str
    data: BaseNodeDetails
BaseNodeDetails (BaseModel)
pydantic-model
Captures all details for the node.
Source code in zenml/lineage_graph/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Details shared by all kinds of lineage graph nodes.

    Attributes:
        execution_id: ID of the entity the node represents, as a string.
        name: Name of the entity the node represents.
        status: Execution status associated with the node.
    """

    execution_id: str
    name: str
    status: ExecutionStatus
step_node
Class for all lineage step nodes.
StepNode (BaseNode)
pydantic-model
A class that represents a step node in a lineage graph.
Source code in zenml/lineage_graph/node/step_node.py
class StepNode(BaseNode):
    """Lineage graph node that stands for a step.

    Attributes:
        type: Node kind marker; defaults to `"step"`.
        data: The step details carried by this node.
    """

    type: str = "step"
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails)
pydantic-model
Captures all step details for the node.
Source code in zenml/lineage_graph/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node.

    Attributes:
        entrypoint_name: Name of the step taken from its configuration.
        parameters: Parameters of the step.
        configuration: Remaining step configuration entries.
        inputs: Mapping from input name to a value describing the input.
        outputs: Mapping from output name to a value describing the output.
        metadata: Metadata entries as `(key, value, type)` string triples.
    """

    entrypoint_name: str
    parameters: Dict[str, Any]
    configuration: Dict[str, Any]
    inputs: Dict[str, Any]
    outputs: Dict[str, Any]
    metadata: List[Tuple[str, str, str]]  # (key, value, type)