Skip to content

Lineage Graph

zenml.lineage_graph special

Initialization of lineage generation module.

edge

Class for Edges in a lineage graph.

Edge (BaseModel)

A class that represents an edge in a lineage graph.

Source code in zenml/lineage_graph/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph."""

    id: str
    source: str
    target: str

lineage_graph

Class for lineage graph generation.

LineageGraph (BaseModel)

A lineage graph representation of a PipelineRunResponseModel.

Source code in zenml/lineage_graph/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunResponseModel."""

    nodes: List[Union[StepNode, ArtifactNode]] = []
    edges: List[Edge] = []
    root_step_id: Optional[str] = None
    run_metadata: List[Tuple[str, str, str]] = []

    def generate_run_nodes_and_edges(self, run: "PipelineRunResponse") -> None:
        """Initializes a lineage graph from a pipeline run.

        Args:
            run: The PipelineRunResponseModel to generate the lineage graph for.
        """
        self.run_metadata = [
            (m.key, str(m.value), str(m.type))
            for m in run.run_metadata.values()
        ]

        for step in run.steps.values():
            self.generate_step_nodes_and_edges(step)

        self.add_external_artifacts(run)
        self.add_direct_edges(run)

    def generate_step_nodes_and_edges(self, step: "StepRunResponse") -> None:
        """Generates the nodes and edges for a step and its artifacts.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)

        # Set a root step if it doesn't exist yet
        if self.root_step_id is None:
            self.root_step_id = step_id

        # Add the step node
        self.add_step_node(step, step_id)

        # Add nodes and edges for all output artifacts
        for artifact_name, artifact_version in step.outputs.items():
            artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id)
            if step.status == ExecutionStatus.CACHED:
                artifact_status = ArtifactNodeStatus.CACHED
            elif step.status == ExecutionStatus.COMPLETED:
                artifact_status = ArtifactNodeStatus.CREATED
            else:
                artifact_status = ArtifactNodeStatus.UNKNOWN
            self.add_artifact_node(
                artifact=artifact_version,
                id=artifact_version_id,
                name=artifact_name,
                step_id=str(step_id),
                status=artifact_status,
            )
            self.add_edge(step_id, artifact_version_id)

        # Add nodes and edges for all input artifacts
        for artifact_name, artifact_version in step.inputs.items():
            artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id)
            self.add_edge(artifact_version_id, step_id)

    def add_external_artifacts(self, run: "PipelineRunResponse") -> None:
        """Adds all external artifacts to the lineage graph.

        Args:
            run: The pipeline run to add external artifacts for.
        """
        nodes_ids = {node.id for node in self.nodes}
        for step in run.steps.values():
            for artifact_name, artifact_version in step.inputs.items():
                artifact_version_id = ARTIFACT_PREFIX + str(
                    artifact_version.id
                )
                if artifact_version_id not in nodes_ids:
                    self.add_artifact_node(
                        artifact=artifact_version,
                        id=artifact_version_id,
                        name=artifact_name,
                        step_id=str(artifact_version.producer_step_run_id),
                        status=ArtifactNodeStatus.EXTERNAL,
                    )

    def add_direct_edges(self, run: "PipelineRunResponse") -> None:
        """Add all direct edges between nodes generated by `after=...`.

        Args:
            run: The pipeline run to add direct edges for.
        """
        for step in run.steps.values():
            step_id = STEP_PREFIX + str(step.id)
            for parent_step_id_uuid in step.parent_step_ids:
                parent_step_id = STEP_PREFIX + str(parent_step_id_uuid)
                if not self.has_artifact_link(step_id, parent_step_id):
                    self.add_edge(parent_step_id, step_id)

    def has_artifact_link(self, step_id: str, parent_step_id: str) -> bool:
        """Checks if a step has an artifact link to a parent step.

        This is the case for all parent steps that were not specified via
        `after=...`.

        Args:
            step_id: The node ID of the step to check.
            parent_step_id: T node ID of the parent step to check.

        Returns:
            True if the steps are linked via an artifact, False otherwise.
        """
        parent_outputs, child_inputs = set(), set()
        for edge in self.edges:
            if edge.source == parent_step_id:
                parent_outputs.add(edge.target)
            if edge.target == step_id:
                child_inputs.add(edge.source)
        return bool(parent_outputs.intersection(child_inputs))

    def add_step_node(
        self,
        step: "StepRunResponse",
        id: str,
    ) -> None:
        """Adds a step node to the lineage graph.

        Args:
            step: The step to add a node for.
            id: The id of the step node.
        """
        step_config = step.config.model_dump()
        if step_config:
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ["inputs", "outputs", "parameters"] and value
            }
        self.nodes.append(
            StepNode(
                id=id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.config.name,  # redundant for consistency
                    parameters=step.config.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in step.run_metadata.values()
                    ],
                ),
            )
        )

    def add_artifact_node(
        self,
        artifact: "ArtifactVersionResponse",
        id: str,
        name: str,
        step_id: str,
        status: ArtifactNodeStatus,
    ) -> None:
        """Adds an artifact node to the lineage graph.

        Args:
            artifact: The artifact to add a node for.
            id: The id of the artifact node.
            name: The input or output name of the artifact.
            step_id: The id of the step that produced the artifact.
            status: The status of the step that produced the artifact.
        """
        node = ArtifactNode(
            id=id,
            data=ArtifactNodeDetails(
                execution_id=str(artifact.id),
                name=name,
                status=status,
                is_cached=status == ArtifactNodeStatus.CACHED,
                artifact_type=artifact.type,
                artifact_data_type=artifact.data_type.import_path,
                parent_step_id=step_id,
                producer_step_id=str(artifact.producer_step_run_id),
                uri=artifact.uri,
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in artifact.run_metadata.values()
                ],
            ),
        )
        self.nodes.append(node)

    def add_edge(self, source: str, target: str) -> None:
        """Adds an edge to the lineage graph.

        Args:
            source: The source node id.
            target: The target node id.
        """
        self.edges.append(
            Edge(id=source + "_" + target, source=source, target=target)
        )
add_artifact_node(self, artifact, id, name, step_id, status)

Adds an artifact node to the lineage graph.

Parameters:

Name Type Description Default
artifact ArtifactVersionResponse

The artifact to add a node for.

required
id str

The id of the artifact node.

required
name str

The input or output name of the artifact.

required
step_id str

The id of the step that produced the artifact.

required
status ArtifactNodeStatus

The status of the step that produced the artifact.

required
Source code in zenml/lineage_graph/lineage_graph.py
def add_artifact_node(
    self,
    artifact: "ArtifactVersionResponse",
    id: str,
    name: str,
    step_id: str,
    status: ArtifactNodeStatus,
) -> None:
    """Adds an artifact node to the lineage graph.

    Args:
        artifact: The artifact to add a node for.
        id: The id of the artifact node.
        name: The input or output name of the artifact.
        step_id: The id of the step that produced the artifact.
        status: The status of the step that produced the artifact.
    """
    node = ArtifactNode(
        id=id,
        data=ArtifactNodeDetails(
            execution_id=str(artifact.id),
            name=name,
            status=status,
            is_cached=status == ArtifactNodeStatus.CACHED,
            artifact_type=artifact.type,
            artifact_data_type=artifact.data_type.import_path,
            parent_step_id=step_id,
            producer_step_id=str(artifact.producer_step_run_id),
            uri=artifact.uri,
            metadata=[
                (m.key, str(m.value), str(m.type))
                for m in artifact.run_metadata.values()
            ],
        ),
    )
    self.nodes.append(node)
add_direct_edges(self, run)

Add all direct edges between nodes generated by after=....

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run to add direct edges for.

required
Source code in zenml/lineage_graph/lineage_graph.py
def add_direct_edges(self, run: "PipelineRunResponse") -> None:
    """Add all direct edges between nodes generated by `after=...`.

    Args:
        run: The pipeline run to add direct edges for.
    """
    for step in run.steps.values():
        step_id = STEP_PREFIX + str(step.id)
        for parent_step_id_uuid in step.parent_step_ids:
            parent_step_id = STEP_PREFIX + str(parent_step_id_uuid)
            if not self.has_artifact_link(step_id, parent_step_id):
                self.add_edge(parent_step_id, step_id)
add_edge(self, source, target)

Adds an edge to the lineage graph.

Parameters:

Name Type Description Default
source str

The source node id.

required
target str

The target node id.

required
Source code in zenml/lineage_graph/lineage_graph.py
def add_edge(self, source: str, target: str) -> None:
    """Adds an edge to the lineage graph.

    Args:
        source: The source node id.
        target: The target node id.
    """
    self.edges.append(
        Edge(id=source + "_" + target, source=source, target=target)
    )
add_external_artifacts(self, run)

Adds all external artifacts to the lineage graph.

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run to add external artifacts for.

required
Source code in zenml/lineage_graph/lineage_graph.py
def add_external_artifacts(self, run: "PipelineRunResponse") -> None:
    """Adds all external artifacts to the lineage graph.

    Args:
        run: The pipeline run to add external artifacts for.
    """
    nodes_ids = {node.id for node in self.nodes}
    for step in run.steps.values():
        for artifact_name, artifact_version in step.inputs.items():
            artifact_version_id = ARTIFACT_PREFIX + str(
                artifact_version.id
            )
            if artifact_version_id not in nodes_ids:
                self.add_artifact_node(
                    artifact=artifact_version,
                    id=artifact_version_id,
                    name=artifact_name,
                    step_id=str(artifact_version.producer_step_run_id),
                    status=ArtifactNodeStatus.EXTERNAL,
                )
add_step_node(self, step, id)

Adds a step node to the lineage graph.

Parameters:

Name Type Description Default
step StepRunResponse

The step to add a node for.

required
id str

The id of the step node.

required
Source code in zenml/lineage_graph/lineage_graph.py
def add_step_node(
    self,
    step: "StepRunResponse",
    id: str,
) -> None:
    """Adds a step node to the lineage graph.

    Args:
        step: The step to add a node for.
        id: The id of the step node.
    """
    step_config = step.config.model_dump()
    if step_config:
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ["inputs", "outputs", "parameters"] and value
        }
    self.nodes.append(
        StepNode(
            id=id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.config.name,  # redundant for consistency
                parameters=step.config.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in step.run_metadata.values()
                ],
            ),
        )
    )
generate_run_nodes_and_edges(self, run)

Initializes a lineage graph from a pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The PipelineRunResponseModel to generate the lineage graph for.

required
Source code in zenml/lineage_graph/lineage_graph.py
def generate_run_nodes_and_edges(self, run: "PipelineRunResponse") -> None:
    """Initializes a lineage graph from a pipeline run.

    Args:
        run: The PipelineRunResponseModel to generate the lineage graph for.
    """
    self.run_metadata = [
        (m.key, str(m.value), str(m.type))
        for m in run.run_metadata.values()
    ]

    for step in run.steps.values():
        self.generate_step_nodes_and_edges(step)

    self.add_external_artifacts(run)
    self.add_direct_edges(run)
generate_step_nodes_and_edges(self, step)

Generates the nodes and edges for a step and its artifacts.

Parameters:

Name Type Description Default
step StepRunResponse

The step to generate the nodes and edges for.

required
Source code in zenml/lineage_graph/lineage_graph.py
def generate_step_nodes_and_edges(self, step: "StepRunResponse") -> None:
    """Generates the nodes and edges for a step and its artifacts.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)

    # Set a root step if it doesn't exist yet
    if self.root_step_id is None:
        self.root_step_id = step_id

    # Add the step node
    self.add_step_node(step, step_id)

    # Add nodes and edges for all output artifacts
    for artifact_name, artifact_version in step.outputs.items():
        artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id)
        if step.status == ExecutionStatus.CACHED:
            artifact_status = ArtifactNodeStatus.CACHED
        elif step.status == ExecutionStatus.COMPLETED:
            artifact_status = ArtifactNodeStatus.CREATED
        else:
            artifact_status = ArtifactNodeStatus.UNKNOWN
        self.add_artifact_node(
            artifact=artifact_version,
            id=artifact_version_id,
            name=artifact_name,
            step_id=str(step_id),
            status=artifact_status,
        )
        self.add_edge(step_id, artifact_version_id)

    # Add nodes and edges for all input artifacts
    for artifact_name, artifact_version in step.inputs.items():
        artifact_version_id = ARTIFACT_PREFIX + str(artifact_version.id)
        self.add_edge(artifact_version_id, step_id)

Checks if a step has an artifact link to a parent step.

This is the case for all parent steps that were not specified via after=....

Parameters:

Name Type Description Default
step_id str

The node ID of the step to check.

required
parent_step_id str

T node ID of the parent step to check.

required

Returns:

Type Description
bool

True if the steps are linked via an artifact, False otherwise.

Source code in zenml/lineage_graph/lineage_graph.py
def has_artifact_link(self, step_id: str, parent_step_id: str) -> bool:
    """Checks if a step has an artifact link to a parent step.

    This is the case for all parent steps that were not specified via
    `after=...`.

    Args:
        step_id: The node ID of the step to check.
        parent_step_id: T node ID of the parent step to check.

    Returns:
        True if the steps are linked via an artifact, False otherwise.
    """
    parent_outputs, child_inputs = set(), set()
    for edge in self.edges:
        if edge.source == parent_step_id:
            parent_outputs.add(edge.target)
        if edge.target == step_id:
            child_inputs.add(edge.source)
    return bool(parent_outputs.intersection(child_inputs))

node special

Initialization of lineage nodes.

artifact_node

Class for all lineage artifact nodes.

ArtifactNode (BaseNode)

A class that represents an artifact node in a lineage graph.

Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    type: str = "artifact"
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails)

Captures all artifact details for the node.

Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node."""

    status: ArtifactNodeStatus
    is_cached: bool
    artifact_type: str
    artifact_data_type: str
    parent_step_id: str
    producer_step_id: Optional[str]
    uri: str
    metadata: List[Tuple[str, str, str]]  # (key, value, type)
ArtifactNodeStatus (StrEnum)

Enum that represents the status of an artifact.

Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNodeStatus(StrEnum):
    """Enum that represents the status of an artifact."""

    CACHED = "cached"
    CREATED = "created"
    EXTERNAL = "external"
    UNKNOWN = "unknown"

base_node

Base class for all lineage nodes.

BaseNode (BaseModel)

A class that represents a node in a lineage graph.

Source code in zenml/lineage_graph/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph."""

    id: str
    type: str
    data: BaseNodeDetails
BaseNodeDetails (BaseModel)

Captures all details for the node.

Source code in zenml/lineage_graph/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node."""

    execution_id: str
    name: str

step_node

Class for all lineage step nodes.

StepNode (BaseNode)

A class that represents a step node in a lineage graph.

Source code in zenml/lineage_graph/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    type: str = "step"
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails)

Captures all artifact details for the node.

Source code in zenml/lineage_graph/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node."""

    status: ExecutionStatus
    entrypoint_name: str
    parameters: Dict[str, Any]
    configuration: Dict[str, Any]
    inputs: Dict[str, Any]
    outputs: Dict[str, Any]
    metadata: List[Tuple[str, str, str]]  # (key, value, type)