Orchestrators

zenml.orchestrators special

An orchestrator is a special kind of backend that manages the execution of each step of a pipeline. Orchestrators administer the actual pipeline runs; you can think of an orchestrator as the 'root' of any pipeline job you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. Apache Airflow is also supported as an orchestrator to handle the steps of your pipeline.
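
For illustration, here is a minimal sketch of driving a pipeline through the local orchestrator directly, using the run() signature documented below. The import path follows the source locations shown in this section; my_pipeline is a placeholder for any BasePipeline instance you have defined, and normally the orchestrator is invoked for you when you call pipeline.run():

from zenml.orchestrators.local.local_orchestrator import LocalOrchestrator

# Minimal sketch: instantiate the local orchestrator for the current repo
# and hand it a pipeline. `my_pipeline` is a placeholder BasePipeline.
orchestrator = LocalOrchestrator(repo_path=".")
orchestrator.run(my_pipeline, run_name="my-first-run")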

base_orchestrator

BaseOrchestrator (BaseComponent) pydantic-model

Base Orchestrator class to orchestrate ZenML pipelines.

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(BaseComponent):
    """Base Orchestrator class to orchestrate ZenML pipelines."""

    _ORCHESTRATOR_STORE_DIR_NAME: str = "orchestrators"

    def __init__(self, repo_path: str, **kwargs: Any) -> None:
        """Initializes a BaseOrchestrator instance.

        Args:
            repo_path: Path to the repository of this orchestrator.
        """
        serialization_dir = os.path.join(
            get_zenml_config_dir(repo_path),
            self._ORCHESTRATOR_STORE_DIR_NAME,
        )
        super().__init__(serialization_dir=serialization_dir, **kwargs)

    @abstractmethod
    def run(
        self, zenml_pipeline: "BasePipeline", run_name: str, **kwargs: Any
    ) -> Any:
        """Abstract method to run a pipeline. Overwrite this in subclasses
        with a concrete implementation on how to run the given pipeline.

        Args:
            zenml_pipeline: The pipeline to run.
            run_name: Name of the pipeline run.
            **kwargs: Potential additional parameters used in subclass
                implementations.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def is_running(self) -> bool:
        """Returns whether the orchestrator is currently running."""

    @property
    def log_file(self) -> Optional[str]:
        """Returns path to a log file if available."""
        # TODO [ENG-136]: make this more generic in case an orchestrator has
        #  multiple log files, e.g. change to a monitor() method which yields
        #  new logs to output to the CLI
        return None

    def pre_run(self, pipeline: "BasePipeline", caller_filepath: str) -> None:
        """Should be run before the `run()` function to prepare orchestrator.

        Args:
            pipeline: Pipeline that will be run.
            caller_filepath: Path to the file in which `pipeline.run()` was
                called. This is necessary for Airflow so we know the file in
                which the DAG is defined.
        """

    def post_run(self) -> None:
        """Should be run after the `run()` to clean up."""

    def up(self) -> None:
        """Provisions resources for the orchestrator."""

    def down(self) -> None:
        """Destroys resources for the orchestrator."""

    class Config:
        """Configuration of settings."""

        env_prefix = "zenml_orchestrator_"
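
As a hypothetical illustration of the contract above, a concrete subclass only needs to implement run() and the is_running property. The class below is invented for this sketch and is not part of ZenML; it uses a pydantic PrivateAttr for mutable state, just as LocalOrchestrator does further down:

from typing import Any

from pydantic import PrivateAttr

from zenml.orchestrators.base_orchestrator import BaseOrchestrator


class EchoOrchestrator(BaseOrchestrator):
    """Toy orchestrator that only logs what it would execute."""

    _running: bool = PrivateAttr(default=False)

    @property
    def is_running(self) -> bool:
        """Mirrors the abstract property on the base class."""
        return self._running

    def run(self, zenml_pipeline: "BasePipeline", run_name: str, **kwargs: Any) -> None:
        """Pretends to run the pipeline; real subclasses launch the steps here."""
        self._running = True
        print(f"Would run pipeline `{zenml_pipeline.name}` as `{run_name}`")
        self._running = False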

is_running: bool property readonly

Returns whether the orchestrator is currently running.

log_file: Optional[str] property readonly

Returns path to a log file if available.

Config

Configuration of settings.

Source code in zenml/orchestrators/base_orchestrator.py
class Config:
    """Configuration of settings."""

    env_prefix = "zenml_orchestrator_"
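
Because of this env_prefix, field values on an orchestrator can presumably be supplied via environment variables carrying the zenml_orchestrator_ prefix (this assumes BaseComponent follows pydantic's usual settings behaviour; the field name below is invented):

import os

# Hypothetical: a variable named ZENML_ORCHESTRATOR_<FIELD> would populate
# the matching field on the orchestrator model; `some_field` is invented.
os.environ["ZENML_ORCHESTRATOR_SOME_FIELD"] = "value"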

__init__(self, repo_path, **kwargs) special

Initializes a BaseOrchestrator instance.

Parameters:

repo_path (str, required): Path to the repository of this orchestrator.

Source code in zenml/orchestrators/base_orchestrator.py
def __init__(self, repo_path: str, **kwargs: Any) -> None:
    """Initializes a BaseOrchestrator instance.

    Args:
        repo_path: Path to the repository of this orchestrator.
    """
    serialization_dir = os.path.join(
        get_zenml_config_dir(repo_path),
        self._ORCHESTRATOR_STORE_DIR_NAME,
    )
    super().__init__(serialization_dir=serialization_dir, **kwargs)

down(self)

Destroys resources for the orchestrator.

Source code in zenml/orchestrators/base_orchestrator.py
def down(self) -> None:
    """Destroys resources for the orchestrator."""

post_run(self)

Should be run after run() to clean up.

Source code in zenml/orchestrators/base_orchestrator.py
def post_run(self) -> None:
    """Should be run after the `run()` to clean up."""

pre_run(self, pipeline, caller_filepath)

Should be run before the run() function to prepare the orchestrator.

Parameters:

pipeline (BasePipeline, required): Pipeline that will be run.
caller_filepath (str, required): Path to the file in which pipeline.run() was called. This is necessary for Airflow so we know the file in which the DAG is defined.

Source code in zenml/orchestrators/base_orchestrator.py
def pre_run(self, pipeline: "BasePipeline", caller_filepath: str) -> None:
    """Should be run before the `run()` function to prepare orchestrator.

    Args:
        pipeline: Pipeline that will be run.
        caller_filepath: Path to the file in which `pipeline.run()` was
            called. This is necessary for Airflow so we know the file in
            which the DAG is defined.
    """

run(self, zenml_pipeline, run_name, **kwargs)

Abstract method to run a pipeline. Override this in subclasses with a concrete implementation of how to run the given pipeline.

Parameters:

zenml_pipeline (BasePipeline, required): The pipeline to run.
run_name (str, required): Name of the pipeline run.
**kwargs (Any, default {}): Potential additional parameters used in subclass implementations.

Source code in zenml/orchestrators/base_orchestrator.py
@abstractmethod
def run(
    self, zenml_pipeline: "BasePipeline", run_name: str, **kwargs: Any
) -> Any:
    """Abstract method to run a pipeline. Overwrite this in subclasses
    with a concrete implementation on how to run the given pipeline.

    Args:
        zenml_pipeline: The pipeline to run.
        run_name: Name of the pipeline run.
        **kwargs: Potential additional parameters used in subclass
            implementations.
    """
    raise NotImplementedError

up(self)

Provisions resources for the orchestrator.

Source code in zenml/orchestrators/base_orchestrator.py
def up(self) -> None:
    """Provisions resources for the orchestrator."""

local special

local_dag_runner

Inspired by the local DAG runner implementation by Google at https://github.com/tensorflow/tfx/blob/master/tfx/orchestration/local/local_dag_runner.py

LocalDagRunner (TfxRunner)

Local TFX DAG runner.

Source code in zenml/orchestrators/local/local_dag_runner.py
class LocalDagRunner(tfx_runner.TfxRunner):
    """Local TFX DAG runner."""

    def __init__(self) -> None:
        """Initializes LocalDagRunner as a TFX orchestrator."""

    def run(self, pipeline: tfx_pipeline.Pipeline, run_name: str = "") -> None:
        """Runs given logical pipeline locally.

        Args:
          pipeline: Logical pipeline containing pipeline args and components.
          run_name: Name of the pipeline run.
        """
        for component in pipeline.components:
            if isinstance(component, base_component.BaseComponent):
                component._resolve_pip_dependencies(
                    pipeline.pipeline_info.pipeline_root
                )

        c = compiler.Compiler()
        pipeline = c.compile(pipeline)

        # Substitute the runtime parameter to be a concrete run_id
        runtime_parameter_utils.substitute_runtime_parameter(
            pipeline,
            {
                PIPELINE_RUN_ID_PARAMETER_NAME: run_name,
            },
        )

        deployment_config = runner_utils.extract_local_deployment_config(
            pipeline
        )
        connection_config = deployment_config.metadata_connection_config  # type: ignore[attr-defined] # noqa

        logger.debug(f"Using deployment config:\n {deployment_config}")
        logger.debug(f"Using connection config:\n {connection_config}")

        # Run each component. Note that the pipeline.components list is in
        # topological order.
        for node in pipeline.nodes:
            pipeline_node = node.pipeline_node
            node_id = pipeline_node.node_info.id
            executor_spec = runner_utils.extract_executor_spec(
                deployment_config, node_id
            )
            custom_driver_spec = runner_utils.extract_custom_driver_spec(
                deployment_config, node_id
            )

            component_launcher = launcher.Launcher(
                pipeline_node=pipeline_node,
                mlmd_connection=metadata.Metadata(connection_config),
                pipeline_info=pipeline.pipeline_info,
                pipeline_runtime_spec=pipeline.runtime_spec,
                executor_spec=executor_spec,
                custom_driver_spec=custom_driver_spec,
            )
            execute_step(component_launcher)
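
For illustration, a sketch of invoking the runner directly with a TFX pipeline produced by the create_tfx_pipeline helper documented further below; my_zenml_pipeline is a placeholder for any BasePipeline:

from zenml.orchestrators.local.local_dag_runner import LocalDagRunner
from zenml.orchestrators.utils import create_tfx_pipeline

# Convert the ZenML pipeline into a TFX pipeline, then run it locally.
tfx_pipe = create_tfx_pipeline(my_zenml_pipeline)
LocalDagRunner().run(tfx_pipe, run_name="local-run-1")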

__init__(self) special

Initializes LocalDagRunner as a TFX orchestrator.

Source code in zenml/orchestrators/local/local_dag_runner.py
def __init__(self) -> None:
    """Initializes LocalDagRunner as a TFX orchestrator."""

run(self, pipeline, run_name='')

Runs the given logical pipeline locally.

Parameters:

pipeline (Pipeline, required): Logical pipeline containing pipeline args and components.
run_name (str, default ''): Name of the pipeline run.

Source code in zenml/orchestrators/local/local_dag_runner.py
def run(self, pipeline: tfx_pipeline.Pipeline, run_name: str = "") -> None:
    """Runs given logical pipeline locally.

    Args:
      pipeline: Logical pipeline containing pipeline args and components.
      run_name: Name of the pipeline run.
    """
    for component in pipeline.components:
        if isinstance(component, base_component.BaseComponent):
            component._resolve_pip_dependencies(
                pipeline.pipeline_info.pipeline_root
            )

    c = compiler.Compiler()
    pipeline = c.compile(pipeline)

    # Substitute the runtime parameter to be a concrete run_id
    runtime_parameter_utils.substitute_runtime_parameter(
        pipeline,
        {
            PIPELINE_RUN_ID_PARAMETER_NAME: run_name,
        },
    )

    deployment_config = runner_utils.extract_local_deployment_config(
        pipeline
    )
    connection_config = deployment_config.metadata_connection_config  # type: ignore[attr-defined] # noqa

    logger.debug(f"Using deployment config:\n {deployment_config}")
    logger.debug(f"Using connection config:\n {connection_config}")

    # Run each component. Note that the pipeline.components list is in
    # topological order.
    for node in pipeline.nodes:
        pipeline_node = node.pipeline_node
        node_id = pipeline_node.node_info.id
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, node_id
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, node_id
        )

        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata.Metadata(connection_config),
            pipeline_info=pipeline.pipeline_info,
            pipeline_runtime_spec=pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
        )
        execute_step(component_launcher)

local_orchestrator

LocalOrchestrator (BaseOrchestrator) pydantic-model

Orchestrator responsible for running pipelines locally.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally."""

    _is_running: bool = PrivateAttr(default=False)

    @property
    def is_running(self) -> bool:
        """Returns whether the orchestrator is currently running."""
        return self._is_running

    def run(
        self,
        zenml_pipeline: "BasePipeline",
        run_name: str,
        **pipeline_args: Any
    ) -> None:
        """Runs a pipeline locally.

        Args:
            zenml_pipeline: The pipeline to run.
            run_name: Name of the pipeline run.
            **pipeline_args: Unused kwargs to conform with base signature.
        """
        self._is_running = True
        runner = LocalDagRunner()
        tfx_pipeline = create_tfx_pipeline(zenml_pipeline)
        runner.run(tfx_pipeline, run_name)
        self._is_running = False

is_running: bool property readonly

Returns whether the orchestrator is currently running.

run(self, zenml_pipeline, run_name, **pipeline_args)

Runs a pipeline locally.

Parameters:

zenml_pipeline (BasePipeline, required): The pipeline to run.
run_name (str, required): Name of the pipeline run.
**pipeline_args (Any, default {}): Unused kwargs to conform with base signature.

Source code in zenml/orchestrators/local/local_orchestrator.py
def run(
    self,
    zenml_pipeline: "BasePipeline",
    run_name: str,
    **pipeline_args: Any
) -> None:
    """Runs a pipeline locally.

    Args:
        zenml_pipeline: The pipeline to run.
        run_name: Name of the pipeline run.
        **pipeline_args: Unused kwargs to conform with base signature.
    """
    self._is_running = True
    runner = LocalDagRunner()
    tfx_pipeline = create_tfx_pipeline(zenml_pipeline)
    runner.run(tfx_pipeline, run_name)
    self._is_running = False

utils

create_tfx_pipeline(zenml_pipeline)

Creates a TFX pipeline from a ZenML pipeline.

Source code in zenml/orchestrators/utils.py
def create_tfx_pipeline(
    zenml_pipeline: "BasePipeline",
) -> tfx_pipeline.Pipeline:
    """Creates a tfx pipeline from a ZenML pipeline."""
    # Connect the inputs/outputs of all steps in the pipeline
    zenml_pipeline.connect(**zenml_pipeline.steps)

    tfx_components = [step.component for step in zenml_pipeline.steps.values()]

    artifact_store = zenml_pipeline.stack.artifact_store
    metadata_store = zenml_pipeline.stack.metadata_store

    return tfx_pipeline.Pipeline(
        pipeline_name=zenml_pipeline.name,
        components=tfx_components,  # type: ignore[arg-type]
        pipeline_root=artifact_store.path,
        metadata_connection_config=metadata_store.get_tfx_metadata_config(),
        enable_cache=zenml_pipeline.enable_cache,
    )

execute_step(tfx_launcher)

Executes a TFX component.

Parameters:

tfx_launcher (Launcher, required): A TFX launcher to execute the component.

Returns:

Optional[tfx.orchestration.portable.data_types.ExecutionInfo]: Optional execution info returned by the launcher.

Source code in zenml/orchestrators/utils.py
def execute_step(
    tfx_launcher: launcher.Launcher,
) -> Optional[data_types.ExecutionInfo]:
    """Executes a tfx component.

    Args:
        tfx_launcher: A tfx launcher to execute the component.

    Returns:
        Optional execution info returned by the launcher.
    """
    step_name = tfx_launcher._pipeline_node.node_info.id  # type: ignore[attr-defined] # noqa
    start_time = time.time()
    logger.info(f"Step `{step_name}` has started.")
    try:
        execution_info = tfx_launcher.launch()
    except RuntimeError as e:
        if "execution has already succeeded" in str(e):
            # Hacky workaround to catch the error that a pipeline run with
            # this name already exists. Raise an error with a more descriptive
            # message instead.
            raise DuplicateRunNameError()
        else:
            raise

    run_duration = time.time() - start_time
    logger.info(
        "Step `%s` has finished in %s.",
        step_name,
        string_utils.get_human_readable_time(run_duration),
    )
    return execution_info