
Orchestrators

zenml.orchestrators

Initialization for ZenML orchestrators.

An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of an orchestrator as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.

base_orchestrator

Base orchestrator class.

BaseOrchestrator (StackComponent, ABC) pydantic-model

Base class for all orchestrators.

In order to implement an orchestrator, you will need to subclass this class.

How it works:

The run() method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run()).

This method will take the ZenML Pipeline instance and prepare it for eventual execution. To do this, the following steps are taken:

  • The underlying protobuf pipeline is created.

  • Within the _configure_node_context() method, the pipeline requirements, stack and runtime configuration are added to the context of each step.

  • The _get_sorted_steps() method then generates a list of steps sorted in execution order, which is later used either to execute the steps directly one after another or to build a DAG.

  • After these initial steps comes the most crucial one. Within the prepare_or_run_pipeline() method, each orchestrator has its own implementation that dictates the pipeline orchestration. In the simplest case, this method iterates through all steps and executes them one by one. In other cases, it builds and deploys an intermediate representation of the pipeline (e.g. an Airflow DAG or a Kubeflow Pipelines YAML file) to be executed within the orchestrator's environment.
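ZenML's `_get_sorted_steps()` simply reads the nodes of the compiled protobuf pipeline in order. To illustrate what "sorted in execution order" means independently of TFX, here is a minimal sketch that uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+) on a hypothetical mapping from each step name to the names of its upstream steps. The step names are made up; this is not ZenML code.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def sort_steps(upstream: Dict[str, Set[str]]) -> List[str]:
    """Return step names so that every step follows all of its upstream steps.

    `upstream` maps a step name to the set of step names it depends on
    (a hypothetical stand-in for the protobuf pipeline's node graph).
    """
    # TopologicalSorter takes a mapping of node -> predecessors and raises
    # graphlib.CycleError if the graph is not a DAG.
    return list(TopologicalSorter(upstream).static_order())


# Hypothetical pipeline: evaluator needs both importer and trainer outputs.
steps = {
    "importer": set(),
    "trainer": {"importer"},
    "evaluator": {"importer", "trainer"},
    "deployer": {"evaluator"},
}
order = sort_steps(steps)
print(order)
```

Because every step appears after all of its dependencies, a single pass over such a list is enough either to run the steps one after another or to declare them in a DAG.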

Building your own:

In order to build your own orchestrator, all you need to do is subclass this class and implement your own prepare_or_run_pipeline() method. Overriding other methods is possible but NOT recommended. See the docstring of the prepare_or_run_pipeline() method for details on what needs to be implemented within it.
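The design described here is the template method pattern: the base class owns `run()` and delegates the orchestration-specific part to an abstract hook. The sketch below shows only that shape, using hypothetical toy classes (`ToyBaseOrchestrator`, `SequentialOrchestrator`) whose signatures are deliberately much simpler than ZenML's real ones.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class ToyBaseOrchestrator(ABC):
    """Hypothetical, heavily simplified stand-in for BaseOrchestrator."""

    def run(self, pipeline: Dict[str, List[str]]) -> Any:
        # Shared preparation lives in the base class. Here, "sorting" just
        # means reading the steps in the order they were declared.
        sorted_steps = list(pipeline["steps"])
        # The subclass decides how the sorted steps are actually orchestrated.
        return self.prepare_or_run_pipeline(sorted_steps)

    @abstractmethod
    def prepare_or_run_pipeline(self, sorted_steps: List[str]) -> Any:
        """Implemented by each concrete orchestrator."""

    def run_step(self, step: str) -> str:
        # Stand-in for launching a single step.
        return f"ran {step}"


class SequentialOrchestrator(ToyBaseOrchestrator):
    """Only the abstract hook is overridden; run() is inherited unchanged."""

    def prepare_or_run_pipeline(self, sorted_steps: List[str]) -> List[str]:
        return [self.run_step(step) for step in sorted_steps]


results = SequentialOrchestrator().run({"steps": ["load", "train"]})
print(results)
```

Because `prepare_or_run_pipeline` is abstract, the base class itself cannot be instantiated, which forces every concrete orchestrator to supply it.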

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.

    In order to implement an orchestrator you will need to subclass from this
    class.

    How it works:
    -------------
    The `run()` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run()`).

    This method will take the ZenML Pipeline instance and prepare it for
    eventual execution. To do this the following steps are taken:

    * The underlying protobuf pipeline is created.

    * Within the `_configure_node_context()` method, the pipeline
    requirements, stack and runtime configuration are added to the
    context of each step.

    * The `_get_sorted_steps()` method then generates a list of steps
    sorted in execution order, which is later used either to execute the
    steps directly one after another or to build a DAG.

    * After these initial steps comes the most crucial one. Within the
    `prepare_or_run_pipeline()` method, each orchestrator has its own
    implementation that dictates the pipeline orchestration. In the simplest
    case, this method iterates through all steps and executes them one by
    one. In other cases, it builds and deploys an intermediate
    representation of the pipeline (e.g. an Airflow DAG or a Kubeflow
    Pipelines YAML file) to be executed within the orchestrator's
    environment.

    Building your own:
    ------------------
    In order to build your own orchestrator, all you need to do is subclass
    this class and implement your own `prepare_or_run_pipeline()` method.
    Overriding other methods is possible but NOT recommended. See the
    docstring of the `prepare_or_run_pipeline()` method for details on what
    needs to be implemented within it.
    """

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """This method needs to be implemented by the respective orchestrator.

        Depending on the type of orchestrator you'll have to perform slightly
        different operations.

        Simple Case:
        ------------
        The steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through each step and finally call
        `self.run_step()` to execute each step.

        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and either deploy it directly or return it.

        Regardless of the implementation details, the orchestrator will need
        a way to trigger each step in the target environment. For this,
        the `run_step()` method should be used.

        In case the orchestrator uses Docker containers to orchestrate each
        step, the `zenml.entrypoints.step_entrypoint` module can be
        used as a generalized entrypoint that sets up all the necessary
        prerequisites, parses input parameters and finally executes the step
        using the `run_step()` method.

        If the orchestrator needs to know the upstream steps for a specific
        step to build a DAG, it can use the `get_upstream_step_names()` method
        to get them.

        Args:
            sorted_steps: List of sorted steps.
            pipeline: ZenML Pipeline instance.
            pb2_pipeline: Protobuf Pipeline instance.
            stack: The stack the pipeline was run on.
            runtime_configuration: The Runtime configuration of the current run.

        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """

    def run(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """Runs a pipeline.

        To do this, a protobuf pipeline is created, the context of the
        individual steps is expanded to include relevant data, the steps are
        sorted into execution order and the implementation-specific
        `prepare_or_run_pipeline()` method is called.

        Args:
            pipeline: The pipeline to run.
            stack: The stack on which the pipeline is run.
            runtime_configuration: Runtime configuration of the pipeline run.

        Returns:
            The result of the call to `prepare_or_run_pipeline()`.
        """
        # Create the protobuf pipeline which will be needed for various reasons
        # in the following steps
        pb2_pipeline: Pb2Pipeline = Compiler().compile(
            create_tfx_pipeline(pipeline, stack=stack)
        )

        self._configure_node_context(
            pipeline=pipeline,
            pb2_pipeline=pb2_pipeline,
            stack=stack,
            runtime_configuration=runtime_configuration,
        )

        sorted_steps = self._get_sorted_steps(
            pipeline=pipeline, pb2_pipeline=pb2_pipeline
        )

        result = self.prepare_or_run_pipeline(
            sorted_steps=sorted_steps,
            pipeline=pipeline,
            pb2_pipeline=pb2_pipeline,
            stack=stack,
            runtime_configuration=runtime_configuration,
        )

        return result

    @staticmethod
    def _get_sorted_steps(
        pipeline: "BasePipeline", pb2_pipeline: Pb2Pipeline
    ) -> List["BaseStep"]:
        """Get steps sorted in the execution order.

        This simplifies the building of a DAG at a later stage as it can be
        built with one iteration over this sorted list of steps.

        Args:
            pipeline: The pipeline
            pb2_pipeline: The protobuf pipeline representation

        Returns:
            List of steps in execution order
        """
        # Create a list of sorted steps
        sorted_steps = []
        for node in pb2_pipeline.nodes:
            pipeline_node: PipelineNode = node.pipeline_node
            sorted_steps.append(
                get_step_for_node(
                    pipeline_node, steps=list(pipeline.steps.values())
                )
            )
        return sorted_steps

    def run_step(
        self,
        step: "BaseStep",
        run_name: str,
        pb2_pipeline: Pb2Pipeline,
    ) -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.

        Args:
            step: The step to be executed
            run_name: The unique run name
            pb2_pipeline: Protobuf Pipeline instance

        Returns:
            The execution info of the step.
        """
        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
        )

        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step.name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step.name
        )

        # At this point the active metadata store is queried for the
        # metadata_connection
        repo = Repository()
        metadata_store = repo.active_stack.metadata_store
        metadata_connection = metadata.Metadata(
            metadata_store.get_tfx_metadata_config()
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
        }

        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(
            step_name=step.name, pb2_pipeline=pb2_pipeline
        )

        # Create the tfx launcher responsible for executing the step.
        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata_connection,
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )

        # In some stack configurations, some stack components (like experiment
        # trackers) will run some code before and after the actual step run.
        # This is where the step actually gets executed using the
        # component_launcher
        repo.active_stack.prepare_step_run()
        execution_info = self._execute_step(component_launcher)
        repo.active_stack.cleanup_step_run()

        return execution_info

    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.

        Args:
            tfx_launcher: A tfx launcher to execute the component.

        Returns:
            Optional execution info returned by the launcher.

        Raises:
            DuplicateRunNameError: If the run name is already in use.
        """
        step_name_param = (
            INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME
        )
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")
        try:
            execution_info = tfx_launcher.launch()

            if execution_info and get_cache_status(execution_info):
                if execution_info.exec_properties:
                    step_name = json.loads(
                        execution_info.exec_properties[step_name_param]
                    )
                    logger.info(
                        f"Using cached version of `{pipeline_step_name}` "
                        f"[`{step_name}`].",
                    )
                else:
                    logger.error(
                        f"No execution properties found for step "
                        f"`{pipeline_step_name}`."
                    )
        except RuntimeError as e:
            if "execution has already succeeded" in str(e):
                # Hacky workaround to catch the error that a pipeline run with
                # this name already exists. Raise an error with a more
                # descriptive message instead.
                raise DuplicateRunNameError()
            else:
                raise

        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info

    def get_upstream_step_names(
        self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
    ) -> List[str]:
        """Given a step, use the associated pb2 node to find the names of all upstream nodes.

        Args:
            step: Instance of a Pipeline Step
            pb2_pipeline: Protobuf Pipeline instance

        Returns:
            List of step names from direct upstream steps
        """
        node = self._get_node_with_step_name(step.name, pb2_pipeline)

        upstream_steps = []
        for upstream_node in node.upstream_nodes:
            upstream_steps.append(upstream_node)

        return upstream_steps

    @staticmethod
    def _get_node_with_step_name(
        step_name: str, pb2_pipeline: Pb2Pipeline
    ) -> PipelineNode:
        """Given the name of a step, return the node with that name from the pb2_pipeline.

        Args:
            step_name: Name of the step
            pb2_pipeline: pb2 pipeline containing nodes

        Returns:
            PipelineNode instance

        Raises:
            KeyError: If the step name is not found in the pipeline.
        """
        for node in pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node

        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{pb2_pipeline.pipeline_info.id}"
        )

    @staticmethod
    def _configure_node_context(
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Iterates through each node of a pb2_pipeline.

        This attaches important contexts to the nodes; namely
        pipeline.requirements, stack information and the runtime configuration.

        Args:
            pipeline: ZenML Pipeline instance
            pb2_pipeline: Protobuf Pipeline instance
            stack: The stack the pipeline was run on
            runtime_configuration: The Runtime configuration of the current run
        """
        for node in pb2_pipeline.nodes:
            pipeline_node: PipelineNode = node.pipeline_node

            # Add pipeline requirements to the step context
            requirements = " ".join(sorted(pipeline.requirements))
            context_utils.add_context_to_node(
                pipeline_node,
                type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
                name=str(hash(requirements)),
                properties={"pipeline_requirements": requirements},
            )

            # Add the zenml stack to the step context
            context_utils.add_context_to_node(
                pipeline_node,
                type_=MetadataContextTypes.STACK.value,
                name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
                properties=stack.dict(),
            )

            # Add all Pydantic objects from runtime_configuration to the context
            context_utils.add_runtime_configuration_to_node(
                pipeline_node, runtime_configuration
            )
get_upstream_step_names(self, step, pb2_pipeline)

Given a step, use the associated pb2 node to find the names of all upstream nodes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| step | BaseStep | Instance of a Pipeline Step | required |
| pb2_pipeline | Pipeline | Protobuf Pipeline instance | required |

Returns:

| Type | Description |
|---|---|
| List[str] | List of step names from direct upstream steps |
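The lookup behind this method is a linear scan for the node whose id equals the step name, followed by reading that node's direct upstream ids. A minimal sketch of that logic, assuming a hypothetical `ToyNode` dataclass in place of the TFX `PipelineNode` protobuf:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyNode:
    """Hypothetical stand-in for a TFX PipelineNode."""

    id: str
    upstream_nodes: List[str] = field(default_factory=list)


def get_node_with_step_name(step_name: str, nodes: List[ToyNode]) -> ToyNode:
    # Linear scan over all nodes, mirroring _get_node_with_step_name().
    for node in nodes:
        if node.id == step_name:
            return node
    raise KeyError(f"Step {step_name} not found in pipeline")


def get_upstream_step_names(step_name: str, nodes: List[ToyNode]) -> List[str]:
    # Only direct parents are returned, not transitive ancestors.
    return list(get_node_with_step_name(step_name, nodes).upstream_nodes)


nodes = [
    ToyNode("importer"),
    ToyNode("trainer", ["importer"]),
    ToyNode("evaluator", ["trainer"]),
]
print(get_upstream_step_names("evaluator", nodes))
```

Note that `importer` is not reported for `evaluator`: only direct upstream steps are returned, which is exactly what a DAG builder needs to declare one edge per dependency.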

prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

This method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through each step and finally call self.run_step() to execute each step.
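A minimal sketch of the simple case, assuming that the runtime configuration behaves like a dict with hypothetical `"schedule"` and `"run_name"` keys and that `run_step` is any callable taking a step and a run name. It is not ZenML's local orchestrator, only an illustration of "handle the schedule, then run each step in order".

```python
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List


def run_steps_in_process(
    sorted_steps: List[str],
    run_step: Callable[[str, str], Any],
    runtime_configuration: Dict[str, Any],
    pipeline_name: str,
) -> List[Any]:
    """Sketch of the 'simple case': run each step directly, in order."""
    # An in-process orchestrator has no scheduler to hand a schedule to,
    # so this sketch refuses one instead of silently ignoring it.
    if runtime_configuration.get("schedule") is not None:
        raise ValueError("This orchestrator does not support schedules.")

    run_name = runtime_configuration.get("run_name")
    if run_name is None:
        # Hypothetical naming scheme: pipeline name, timestamp, random suffix.
        timestamp = datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
        run_name = f"{pipeline_name}-{timestamp}-{uuid.uuid4().hex[:8]}"

    # The list is already in execution order, so a plain loop suffices.
    return [run_step(step, run_name) for step in sorted_steps]


calls = run_steps_in_process(
    ["load", "train"],
    run_step=lambda step, run: (step, run),
    runtime_configuration={"run_name": "demo_run"},
    pipeline_name="demo",
)
print(calls)
```

Every step receives the same run name, which matters because the run name is what ties all steps of one run together in the metadata store.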

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and either deploy it directly or return it.
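For the advanced case, the core task is translating the sorted steps and their dependencies into some declarative format. The sketch below emits a made-up JSON workflow spec; the spec layout, the `image` value and the placeholder `echo` command are all hypothetical, and real orchestrators emit their own formats (such as an Airflow DAG or a Kubeflow Pipelines YAML file).

```python
import json
from typing import Dict, List


def build_pipeline_spec(
    pipeline_name: str,
    sorted_steps: List[str],
    upstream: Dict[str, List[str]],
    image: str,
) -> str:
    """Build a hypothetical JSON workflow spec instead of running steps."""
    tasks = []
    for step in sorted_steps:
        tasks.append(
            {
                "name": step,
                # Each task runs in a container; the command is a placeholder,
                # not a real ZenML entrypoint invocation.
                "image": image,
                "command": ["echo", f"run step {step}"],
                "dependencies": upstream.get(step, []),
            }
        )
    spec = {"name": pipeline_name, "tasks": tasks}
    # Returned rather than deployed, so the caller decides what to do with it.
    return json.dumps(spec, indent=2)


spec = build_pipeline_spec(
    "demo",
    ["load", "train"],
    {"train": ["load"]},
    image="my-registry/demo:latest",
)
print(spec)
```

Returning the spec instead of submitting it mirrors the "either deploy it directly or return it" choice above: whatever `prepare_or_run_pipeline()` returns is passed back from `pipeline_instance.run()`.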

Regardless of the implementation details, the orchestrator will need a way to trigger each step in the target environment. For this, the run_step() method should be used.

In case the orchestrator uses Docker containers to orchestrate each step, the zenml.entrypoints.step_entrypoint module can be used as a generalized entrypoint that sets up all the necessary prerequisites, parses input parameters and finally executes the step using the run_step() method.

If the orchestrator needs to know the upstream steps for a specific step to build a DAG, it can use the get_upstream_step_names() method to get them.
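Once the upstream names of every step are known (for instance by calling get_upstream_step_names() once per entry of sorted_steps), an orchestrator can also work out which steps are independent of each other. A minimal sketch using the standard-library `graphlib`, with a hypothetical step-to-upstream mapping, that groups steps into levels whose members could run concurrently:

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def execution_levels(upstream: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps into levels; steps in one level do not depend on each other."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # also detects cycles (raises graphlib.CycleError)
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        levels.append(ready)
        sorter.done(*ready)  # unblocks the steps that depend on these
    return levels


levels = execution_levels(
    {
        "load": [],
        "clean": ["load"],
        "featurize": ["load"],
        "train": ["clean", "featurize"],
    }
)
print(levels)
```

This is only an illustration of what the upstream information enables; as noted for the local orchestrator, not every orchestrator executes steps concurrently.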

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sorted_steps | List[zenml.steps.base_step.BaseStep] | List of sorted steps. | required |
| pipeline | BasePipeline | ZenML Pipeline instance. | required |
| pb2_pipeline | Pipeline | Protobuf Pipeline instance. | required |
| stack | Stack | The stack the pipeline was run on. | required |
| runtime_configuration | RuntimeConfiguration | The Runtime configuration of the current run. | required |

Returns:

| Type | Description |
|---|---|
| Any | The optional return value from this method will be returned by the pipeline_instance.run() call when someone is running a pipeline. |

run(self, pipeline, stack, runtime_configuration)

Runs a pipeline.

To do this, a protobuf pipeline is created, the context of the individual steps is expanded to include relevant data, the steps are sorted into execution order and the implementation-specific prepare_or_run_pipeline() method is called.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | BasePipeline | The pipeline to run. | required |
| stack | Stack | The stack on which the pipeline is run. | required |
| runtime_configuration | RuntimeConfiguration | Runtime configuration of the pipeline run. | required |

Returns:

| Type | Description |
|---|---|
| Any | The result of the call to prepare_or_run_pipeline(). |

run_step(self, step, run_name, pb2_pipeline)

This sets up a component launcher and executes the given step.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| step | BaseStep | The step to be executed | required |
| run_name | str | The unique run name | required |
| pb2_pipeline | Pipeline | Protobuf Pipeline instance | required |

Returns:

| Type | Description |
|---|---|
| Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info of the step. |
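Around the actual launch, run_step() and _execute_step() do three things: call the stack's prepare/cleanup hooks, log and time the step, and translate a TFX "execution has already succeeded" RuntimeError into a clearer duplicate-run-name error. A minimal sketch of that wrapping logic, in which `launch`, `prepare` and `cleanup` are hypothetical callables and `DuplicateRunNameError` is a local stand-in for ZenML's exception class:

```python
import logging
import time
from typing import Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


class DuplicateRunNameError(RuntimeError):
    """Hypothetical stand-in for ZenML's DuplicateRunNameError."""


def execute_step(
    step_name: str,
    launch: Callable[[], T],
    prepare: Callable[[], None],
    cleanup: Callable[[], None],
) -> T:
    """Run stack hooks around a step launch and time it."""
    prepare()
    start = time.time()
    logger.info("Step `%s` has started.", step_name)
    try:
        result = launch()
    except RuntimeError as e:
        # Replace the launcher's low-level message with a clearer error.
        if "execution has already succeeded" in str(e):
            raise DuplicateRunNameError() from e
        raise
    finally:
        # Design choice of this sketch: cleanup also runs if the step fails.
        cleanup()
    logger.info("Step `%s` has finished in %.2fs.", step_name, time.time() - start)
    return result
```

One design difference is deliberate: the source above calls cleanup_step_run() only after a successful execution, whereas this sketch puts the cleanup hook in a `finally` block so that stack components (such as experiment trackers) are also torn down when a step raises.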


context_utils

Utilities for the local orchestrator to help with contexts.

add_context_to_node(pipeline_node, type_, name, properties)

Adds a new context to a TFX protobuf pipeline node.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_node | pipeline_pb2.PipelineNode | A tfx protobuf pipeline node | required |
| type_ | str | The type name for the context to be added | required |
| name | str | Unique key for the context | required |
| properties | Dict[str, str] | Dictionary of strings as properties of the context | required |

Source code in zenml/orchestrators/context_utils.py
def add_context_to_node(
    pipeline_node: "pipeline_pb2.PipelineNode",
    type_: str,
    name: str,
    properties: Dict[str, str],
) -> None:
    """Adds a new context to a TFX protobuf pipeline node.

    Args:
        pipeline_node: A tfx protobuf pipeline node
        type_: The type name for the context to be added
        name: Unique key for the context
        properties: dictionary of strings as properties of the context
    """
    # Add a new context to the pipeline
    context: "pipeline_pb2.ContextSpec" = pipeline_node.contexts.contexts.add()
    # Adding the type of context
    context.type.name = type_
    # Setting the name of the context
    context.name.field_value.string_value = name
    # Setting the properties of the context; every value is stored as a string
    for key, value in properties.items():
        c_property = context.properties[key]
        c_property.field_value.string_value = value
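In `_configure_node_context()` the requirements context is named with `str(hash(requirements))`. For `str` objects, Python's built-in `hash()` is randomized per interpreter process unless `PYTHONHASHSEED` is fixed, so identical requirements can receive different context names in different processes. The sketch below models a node as a plain dict (not the TFX protobuf) and swaps in `hashlib.sha256` as one deterministic alternative. The helper names and the `"pipeline_requirements"` type string are hypothetical.

```python
import hashlib
from typing import Dict, List


def add_toy_context(
    node: Dict[str, List[dict]], type_: str, name: str, properties: Dict[str, str]
) -> None:
    """Append a context to a dict-based toy node; values are stored as strings."""
    node.setdefault("contexts", []).append(
        {
            "type": type_,
            "name": name,
            "properties": {key: str(value) for key, value in properties.items()},
        }
    )


def requirements_context_name(requirements: List[str]) -> str:
    # Sorting makes the name independent of declaration order; sha256 makes
    # it stable across processes, unlike the salted built-in hash().
    joined = " ".join(sorted(requirements))
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


node: Dict[str, List[dict]] = {}
reqs = ["scikit-learn", "pandas"]
add_toy_context(
    node,
    type_="pipeline_requirements",
    name=requirements_context_name(reqs),
    properties={"pipeline_requirements": " ".join(sorted(reqs))},
)
print(node["contexts"][0]["properties"])
```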

add_runtime_configuration_to_node(pipeline_node, runtime_config)

Add the runtime configuration of a pipeline run to a protobuf pipeline node.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_node | pipeline_pb2.PipelineNode | A tfx protobuf pipeline node | required |
| runtime_config | RuntimeConfiguration | A ZenML RuntimeConfiguration | required |
Source code in zenml/orchestrators/context_utils.py
def add_runtime_configuration_to_node(
    pipeline_node: "pipeline_pb2.PipelineNode",
    runtime_config: RuntimeConfiguration,
) -> None:
    """Add the runtime configuration of a pipeline run to a protobuf pipeline node.

    Args:
        pipeline_node: a tfx protobuf pipeline node
        runtime_config: a ZenML RuntimeConfiguration
    """
    skip_errors: bool = runtime_config.get(
        "ignore_unserializable_fields", False
    )

    # Determine the name of the context
    def _name(obj: "BaseModel") -> str:
        """Compute a unique context name for a pydantic BaseModel.

        Args:
            obj: a pydantic BaseModel

        Returns:
            a unique context name
        """
        try:
            return str(hash(obj.json(sort_keys=True)))
        except TypeError as e:
            class_name = obj.__class__.__name__
            logger.info(
                "Cannot convert %s to json, generating uuid instead. Error: %s",
                class_name,
                e,
            )
            return f"{class_name}_{uuid.uuid1()}"

    # iterate over all attributes of runtime context, serializing all pydantic
    # objects to node context.
    for key, obj in runtime_config.items():
        if isinstance(obj, BaseModel):
            logger.debug("Adding %s to context", key)
            add_context_to_node(
                pipeline_node,
                type_=obj.__repr_name__().lower(),
                name=_name(obj),
                properties=serialize_pydantic_object(
                    obj, skip_errors=skip_errors
                ),
            )
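The function above only turns pydantic-model entries of the runtime configuration into contexts and names each one from its sorted-key JSON, falling back to a uuid when serialization fails. A minimal sketch of that selection-and-naming logic, assuming a hypothetical `ToyModel` class with a `.dict()` method in place of pydantic's `BaseModel` (so it runs without pydantic installed):

```python
import json
import uuid
from typing import Any, Dict, List, Tuple


class ToyModel:
    """Hypothetical stand-in for a pydantic BaseModel (only .dict() is used)."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def dict(self) -> Dict[str, Any]:
        return dict(self._fields)


def context_name(obj: ToyModel) -> str:
    try:
        # sort_keys=True makes the JSON text independent of field order.
        return str(hash(json.dumps(obj.dict(), sort_keys=True)))
    except TypeError:
        # Unserializable field: fall back to a unique, non-reproducible name.
        return f"{type(obj).__name__}_{uuid.uuid1()}"


def contexts_from_runtime_config(config: Dict[str, Any]) -> List[Tuple[str, str, str]]:
    """Return (key, type, name) for every model-valued entry; others are skipped."""
    contexts = []
    for key, obj in config.items():
        if isinstance(obj, ToyModel):
            contexts.append((key, type(obj).__name__.lower(), context_name(obj)))
    return contexts


config = {
    "run_name": "demo_run",  # plain value: skipped
    "schedule": ToyModel(cron="0 * * * *"),  # model: becomes a context
    "broken": ToyModel(handle=object()),  # not JSON-serializable: uuid name
}
for entry in contexts_from_runtime_config(config):
    print(entry)
```

The uuid fallback trades reproducibility for robustness: a model that cannot be serialized still gets a context, but its name will differ on every run.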

serialize_pydantic_object(obj, *, skip_errors=False)

Convert a pydantic object to a dict of strings.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| obj | BaseModel | A pydantic object. | required |
| skip_errors | bool | If True, ignore errors when serializing the object. | False |

Returns:

| Type | Description |
|---|---|
| Dict[str, str] | A dictionary of strings. |

Source code in zenml/orchestrators/context_utils.py
def serialize_pydantic_object(
    obj: BaseModel, *, skip_errors: bool = False
) -> Dict[str, str]:
    """Convert a pydantic object to a dict of strings.

    Args:
        obj: a pydantic object.
        skip_errors: if True, ignore errors when serializing the object.

    Returns:
        a dictionary of strings.
    """

    class PydanticEncoder(json.JSONEncoder):
        def default(self, o: Any) -> Any:
            """Default encoding for pydantic objects.

            Args:
                o: the object to encode.

            Returns:
                the encoded object.
            """
            try:
                return cast(Callable[[Any], str], obj.__json_encoder__)(o)
            except TypeError:
                return super().default(o)

    def _inner_generator(
        dictionary: Dict[str, Any]
    ) -> Iterator[Tuple[str, str]]:
        """Itemwise serialize each element in a dictionary.

        Args:
            dictionary: a dictionary.

        Yields:
            a tuple of (key, value).

        Raises:
            TypeError: if the value is not JSON serializable
        """
        for key, item in dictionary.items():
            try:
                yield key, json.dumps(item, cls=PydanticEncoder)
            except TypeError as e:
                if skip_errors:
                    logger.info(
                        "Skipping adding field '%s' to metadata context as "
                        "it cannot be serialized due to %s.",
                        key,
                        e,
                    )
                else:
                    raise TypeError(
                        f"Invalid type {type(item)} for key {key} can not be "
                        "serialized."
                    ) from e

    return {key: value for key, value in _inner_generator(obj.dict())}
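To make the per-field behavior concrete, here is a minimal standard-library sketch that operates on a plain dict instead of a pydantic model. `serialize_fields` is a hypothetical name, and it uses plain `json.dumps` without the model's custom encoder. It shows two consequences of encoding each field separately: string values come back JSON-quoted (`'"run"'`, not `'run'`), and with `skip_errors=True` a single unserializable field is dropped while the rest survive.

```python
import json
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def serialize_fields(
    fields: Dict[str, Any], *, skip_errors: bool = False
) -> Dict[str, str]:
    """Hypothetical stand-in for serialize_pydantic_object on a plain dict.

    Each value is JSON-encoded individually, so one bad field can be skipped
    without losing the others.
    """
    result: Dict[str, str] = {}
    for key, value in fields.items():
        try:
            result[key] = json.dumps(value)
        except TypeError as e:
            if not skip_errors:
                raise TypeError(
                    f"Invalid type {type(value)} for key {key}: cannot be serialized."
                ) from e
            logger.info("Skipping field '%s': %s", key, e)
    return result


print(serialize_fields({"lr": 0.01, "layers": [64, 32], "name": "run"}))
# {'lr': '0.01', 'layers': '[64, 32]', 'name': '"run"'}
print(serialize_fields({"lr": 0.01, "fn": object()}, skip_errors=True))
# {'lr': '0.01'}
```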

local special

Initialization for the local orchestrator.

local_orchestrator

Implementation of the ZenML local orchestrator.

LocalOrchestrator (BaseOrchestrator) pydantic-model

Orchestrator responsible for running pipelines locally.

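This orchestrator executes steps one at a time in sorted order, so steps never run concurrently. It also does not support schedules: if a schedule is configured, it is ignored with a warning and the pipeline runs immediately.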
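The sequential loop in `prepare_or_run_pipeline` can be sketched without ZenML. `ToyStep`, `ToyRuntimeConfig` and `run_sequentially` are hypothetical stand-ins, not ZenML classes; the sketch only reproduces the control flow: warn about and ignore a schedule, require a run name, then execute each step to completion before starting the next.

```python
import logging
from dataclasses import dataclass
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ToyStep:
    """Hypothetical stand-in for a ZenML step."""

    name: str
    fn: Callable[[], None]


@dataclass
class ToyRuntimeConfig:
    """Hypothetical stand-in for RuntimeConfiguration."""

    run_name: Optional[str]
    schedule: Optional[str] = None


def run_sequentially(
    sorted_steps: List[ToyStep], config: ToyRuntimeConfig
) -> List[str]:
    """Run steps one at a time in the given order; return executed step names."""
    if config.schedule:
        # Mirrors the local orchestrator: a schedule is ignored, not an error.
        logger.warning("Schedules are not supported; running immediately.")
    if not config.run_name:
        # An explicit exception, unlike `assert`, survives `python -O`.
        raise ValueError("Run name must be set")
    executed: List[str] = []
    for step in sorted_steps:
        step.fn()  # blocks until the step finishes: no concurrency
        executed.append(step.name)
    return executed


steps = [ToyStep("load", lambda: None), ToyStep("train", lambda: None)]
print(run_sequentially(steps, ToyRuntimeConfig(run_name="run_1", schedule="daily")))
# ['load', 'train']
```

The sketch raises `ValueError` where the source uses `assert`, because assertions are stripped when Python runs with `-O`; that is a deliberate deviation for illustration, not ZenML behavior.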

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    FLAVOR: ClassVar[str] = "local"

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """This method iterates through all steps and executes them sequentially.

        Args:
            sorted_steps: A list of steps in the pipeline.
            pipeline: The pipeline object.
            pb2_pipeline: The pipeline object in protobuf format.
            stack: The stack object.
            runtime_configuration: The runtime configuration object.
        """
        if runtime_configuration.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        assert runtime_configuration.run_name, "Run name must be set"

        # Run each step
        for step in sorted_steps:
            self.run_step(
                step=step,
                run_name=runtime_configuration.run_name,
                pb2_pipeline=pb2_pipeline,
            )
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

This method iterates through all steps and executes them sequentially.

Parameters:

  • sorted_steps (List[zenml.steps.base_step.BaseStep], required): A list of steps in the pipeline.

  • pipeline (BasePipeline, required): The pipeline object.

  • pb2_pipeline (Pipeline, required): The pipeline object in protobuf format.

  • stack (Stack, required): The stack object.

  • runtime_configuration (RuntimeConfiguration, required): The runtime configuration object.

utils

Utility functions for the orchestrator.

create_tfx_pipeline(zenml_pipeline, stack)

Creates a TFX pipeline from a ZenML pipeline.

Parameters:

  • zenml_pipeline (BasePipeline, required): The ZenML pipeline.

  • stack (Stack, required): The stack.

Returns:

  • Pipeline: The TFX pipeline.

Source code in zenml/orchestrators/utils.py
def create_tfx_pipeline(
    zenml_pipeline: "BasePipeline", stack: "Stack"
) -> tfx_pipeline.Pipeline:
    """Creates a tfx pipeline from a ZenML pipeline.

    Args:
        zenml_pipeline: The ZenML pipeline.
        stack: The stack.

    Returns:
        The tfx pipeline.
    """
    # Connect the inputs/outputs of all steps in the pipeline
    zenml_pipeline.connect(**zenml_pipeline.steps)

    tfx_components = [step.component for step in zenml_pipeline.steps.values()]

    artifact_store = stack.artifact_store

    # We do not pass the metadata connection config here as it might not be
    # accessible. Instead it is queried from the active stack right before a
    # step is executed (see `BaseOrchestrator.run_step(...)`)
    return tfx_pipeline.Pipeline(
        pipeline_name=zenml_pipeline.name,
        components=tfx_components,  # type: ignore[arg-type]
        pipeline_root=artifact_store.path,
        enable_cache=zenml_pipeline.enable_cache,
    )

get_cache_status(execution_info)

Returns whether a cached execution was used or not.

Parameters:

  • execution_info (Optional[tfx.orchestration.portable.data_types.ExecutionInfo], required): The execution info.

Returns:

  • bool: True if the execution was cached, False otherwise.

Source code in zenml/orchestrators/utils.py
def get_cache_status(
    execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
    """Returns whether a cached execution was used or not.

    Args:
        execution_info: The execution info.

    Returns:
        `True` if the execution was cached, `False` otherwise.
    """
    # An execution output URI is only provided if the step needs to be
    # executed (= is not cached)
    if execution_info and execution_info.execution_output_uri is None:
        return True
    else:
        return False
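The caching rule reduces to a single boolean expression. In this minimal sketch `FakeExecutionInfo` is a hypothetical dataclass carrying only the `execution_output_uri` field the check reads; it is not the TFX class. Because a dataclass instance is always truthy, `is not None` is equivalent here to the truthiness test used in the source.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeExecutionInfo:
    """Hypothetical stand-in for TFX's ExecutionInfo (only the field used here)."""

    execution_output_uri: Optional[str] = None


def is_cached(execution_info: Optional[FakeExecutionInfo]) -> bool:
    # Same rule as get_cache_status, collapsed into one expression:
    # an output URI is only assigned when the step actually has to run.
    return (
        execution_info is not None
        and execution_info.execution_output_uri is None
    )


print(is_cached(None))  # False: no execution info at all
print(is_cached(FakeExecutionInfo()))  # True: no output URI, result came from cache
print(is_cached(FakeExecutionInfo("/artifacts/trainer/output")))  # False: step executed
```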

get_step_for_node(node, steps)

Finds the step whose name matches the ID of a TFX pipeline node.

Parameters:

  • node (PipelineNode, required): The TFX pipeline node.

  • steps (List[zenml.steps.base_step.BaseStep], required): The list of steps.

Returns:

  • BaseStep: The matching step.

Exceptions:

  • RuntimeError: If no matching step is found.

Source code in zenml/orchestrators/utils.py
def get_step_for_node(node: PipelineNode, steps: List[BaseStep]) -> BaseStep:
    """Finds the matching step for a tfx pipeline node.

    Args:
        node: The tfx pipeline node.
        steps: The list of steps.

    Returns:
        The matching step.

    Raises:
        RuntimeError: If no matching step is found.
    """
    step_name = node.node_info.id
    try:
        return next(step for step in steps if step.name == step_name)
    except StopIteration:
        raise RuntimeError(f"Unable to find step with name '{step_name}'.")
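The lookup matches the TFX node ID (`node.node_info.id`) against step names. The sketch below uses a hypothetical `ToyStep` and a plain string ID in place of `PipelineNode`; it shows `next()` with a default instead of catching `StopIteration`, plus a dict index that avoids a linear scan per node if many nodes have to be resolved.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyStep:
    """Hypothetical stand-in for BaseStep; only `name` matters here."""

    name: str


def find_step(node_id: str, steps: List[ToyStep]) -> ToyStep:
    # `next` with a default avoids catching StopIteration explicitly.
    match = next((step for step in steps if step.name == node_id), None)
    if match is None:
        raise RuntimeError(f"Unable to find step with name '{node_id}'.")
    return match


def index_steps(steps: List[ToyStep]) -> Dict[str, ToyStep]:
    # Alternative when resolving many nodes: build the name -> step map once.
    return {step.name: step for step in steps}


steps = [ToyStep("importer"), ToyStep("trainer")]
print(find_step("trainer", steps))  # ToyStep(name='trainer')
print(index_steps(steps)["importer"])  # ToyStep(name='importer')
```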