
Orchestrators

zenml.orchestrators special

Initialization for ZenML orchestrators.

An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of an orchestrator as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.

base_orchestrator

Base orchestrator class.

BaseOrchestrator (StackComponent, ABC)

Base class for all orchestrators.

To implement an orchestrator, you need to subclass this class.

How it works:

The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)).

This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
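The control flow described above (run does the bookkeeping, the subclass hook does the work) is a template-method pattern. The following is a minimal sketch of that flow in plain Python, not ZenML's code: SketchOrchestrator and InlineOrchestrator are hypothetical names, and a deployment is simplified to a list of step names. Unlike the listed run method, which calls _cleanup_run only after prepare_or_run_pipeline returns, this sketch clears its state in a finally block so the state is also reset when the hook raises.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class SketchOrchestrator(ABC):
    """Hypothetical stand-in for BaseOrchestrator; not the ZenML class."""

    def __init__(self) -> None:
        self._active_deployment: Optional[List[str]] = None

    def run(self, deployment: List[str]) -> Any:
        # Internal preparation: remember which deployment is active.
        self._active_deployment = deployment
        try:
            # Delegate the actual work to the subclass hook.
            return self.prepare_or_run_pipeline(deployment)
        finally:
            # Clear the active state whether the hook returned or raised.
            self._active_deployment = None

    @abstractmethod
    def prepare_or_run_pipeline(self, deployment: List[str]) -> Any:
        """Run the steps directly or hand them to remote infrastructure."""

    def run_step(self, step: str) -> str:
        # Like the real run_step, this is only valid while a run is active.
        assert self._active_deployment is not None, "only valid inside run()"
        return f"executed {step}"


class InlineOrchestrator(SketchOrchestrator):
    """Simple case: execute every step in the current process."""

    def prepare_or_run_pipeline(self, deployment: List[str]) -> List[str]:
        return [self.run_step(step) for step in deployment]


orchestrator = InlineOrchestrator()
print(orchestrator.run(["load", "train"]))  # ['executed load', 'executed train']
```

InlineOrchestrator corresponds to the "simple case" in which steps run in-process. A remote orchestrator would instead build and submit a representation of the pipeline inside its hook, and whatever it returns is what the caller of run receives.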

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.

    In order to implement an orchestrator you will need to subclass from this
    class.

    How it works:
    -------------
    The `run(...)` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run(...)`).

    This method will do some internal preparation and then call the
    `prepare_or_run_pipeline(...)` method. BaseOrchestrator subclasses must
    implement this method and either run the pipeline steps directly or deploy
    the pipeline to some remote infrastructure.
    """

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
    _active_deployment: Optional["PipelineDeployment"] = None
    _active_pb2_pipeline: Optional[Pb2Pipeline] = None

    @property
    def config(self) -> BaseOrchestratorConfig:
        """Returns the `BaseOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseOrchestratorConfig, self._config)

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """This method needs to be implemented by the respective orchestrator.

        Depending on the type of orchestrator you'll have to perform slightly
        different operations.

        Simple Case:
        ------------
        The steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through the steps and finally call
        `self.run_step(...)` to execute each step.

        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and deploy it.

        Regardless of the implementation details, the orchestrator will need
        to run each step in the target environment. For this the
        `self.run_step(...)` method should be used.

        The easiest way to make this work is by using an entrypoint
        configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
        or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """

    def run(self, deployment: "PipelineDeployment", stack: "Stack") -> Any:
        """Runs a pipeline on a stack.

        Args:
            deployment: The pipeline deployment.
            stack: The stack on which to run the pipeline.

        Returns:
            Orchestrator-specific return value.
        """
        self._prepare_run(deployment=deployment)

        result = self.prepare_or_run_pipeline(
            deployment=deployment, stack=stack
        )

        self._cleanup_run()

        return result

    def run_step(
        self, step: "Step", run_name: Optional[str] = None
    ) -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.

        Args:
            step: The step to be executed
            run_name: The unique run name

        Returns:
            The execution info of the step.
        """
        assert self._active_deployment
        assert self._active_pb2_pipeline

        self._ensure_artifact_classes_loaded(step.config)

        step_name = step.config.name
        pb2_pipeline = self._active_pb2_pipeline

        run_name = run_name or self._active_deployment.run_name
        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
        )

        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step_name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step_name
        )

        metadata_connection_cfg = Client().zen_store.get_metadata_config()

        # At this point the active metadata store is queried for the
        # metadata_connection
        stack = Client().active_stack
        executor_operator = self._get_executor_operator(
            step_operator=step.config.step_operator
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: executor_operator
        }

        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=self._active_deployment.pipeline,
            run_name=run_name,
        )

        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(step_name)

        proto_utils.add_mlmd_contexts(
            pipeline_node=pipeline_node,
            step=step,
            deployment=self._active_deployment,
            stack=stack,
        )

        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata.Metadata(metadata_connection_cfg),
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )

        # If a step operator is used, the current environment will not be the
        # one executing the step function code and therefore we don't need to
        # run any preparation
        if step.config.step_operator:
            execution_info = self._execute_step(component_launcher)
        else:
            stack.prepare_step_run(info=step_run_info)
            try:
                execution_info = self._execute_step(component_launcher)
            finally:
                stack.cleanup_step_run(info=step_run_info)

        return execution_info

    @staticmethod
    def requires_resources_in_orchestration_environment(
        step: "Step",
    ) -> bool:
        """Checks if the orchestrator should run this step on special resources.

        Args:
            step: The step that will be checked.

        Returns:
            True if the step requires special resources in the orchestration
            environment, False otherwise.
        """
        # If the step requires custom resources and doesn't run with a step
        # operator, it would need these requirements in the orchestrator
        # environment
        if step.config.step_operator:
            return False

        return not step.config.resource_settings.empty

    def _prepare_run(self, deployment: "PipelineDeployment") -> None:
        """Prepares a run.

        Args:
            deployment: The deployment to prepare.
        """
        self._active_deployment = deployment

        pb2_pipeline = Pb2Pipeline()
        pb2_pipeline_json = string_utils.b64_decode(
            self._active_deployment.proto_pipeline
        )
        json_format.Parse(pb2_pipeline_json, pb2_pipeline)
        self._active_pb2_pipeline = pb2_pipeline

    def _cleanup_run(self) -> None:
        """Cleans up the active run."""
        self._active_deployment = None
        self._active_pb2_pipeline = None

    @staticmethod
    def _ensure_artifact_classes_loaded(
        step_configuration: "StepConfiguration",
    ) -> None:
        """Ensures that all artifact classes for a step are loaded.

        Args:
            step_configuration: A step configuration.
        """
        artifact_class_sources = set(
            input_.artifact_source
            for input_ in step_configuration.inputs.values()
        ) | set(
            output.artifact_source
            for output in step_configuration.outputs.values()
        )

        for source in artifact_class_sources:
            # Tfx depends on these classes being loaded so it can detect the
            # correct artifact class
            source_utils.validate_source_class(
                source, expected_class=BaseArtifact
            )

    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.

        Args:
            tfx_launcher: A tfx launcher to execute the component.

        Returns:
            Optional execution info returned by the launcher.

        Raises:
            RuntimeError: If the execution failed during preparation.
        """
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")

        # There is no way to differentiate between a cached and a failed
        # execution based on the execution info returned by the TFX launcher.
        # We patch the _publish_failed_execution method in order to check
        # if an execution failed.
        execution_failed = False
        original_publish_failed_execution = (
            tfx_launcher._publish_failed_execution
        )

        def _new_publish_failed_execution(
            self: launcher.Launcher, *args: Any, **kwargs: Any
        ) -> None:
            original_publish_failed_execution(*args, **kwargs)
            nonlocal execution_failed
            execution_failed = True

        setattr(
            tfx_launcher,
            "_publish_failed_execution",
            types.MethodType(_new_publish_failed_execution, tfx_launcher),
        )
        execution_info = tfx_launcher.launch()
        if execution_failed:
            raise RuntimeError(
                "Failed to execute step. This is probably because some input "
                f"artifacts for the step {pipeline_step_name} could not be "
                "found in the database."
            )

        if execution_info and get_cache_status(execution_info):
            logger.info(f"Using cached version of `{pipeline_step_name}`.")

        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info

    @staticmethod
    def _get_executor_operator(
        step_operator: Optional[str],
    ) -> Type[BaseExecutorOperator]:
        """Gets the TFX executor operator for the given step operator.

        Args:
            step_operator: The optional step operator used to run a step.

        Returns:
            The executor operator for the given step operator.
        """
        if step_operator:
            from zenml.step_operators.step_executor_operator import (
                StepExecutorOperator,
            )

            return StepExecutorOperator
        else:
            return PythonExecutorOperator

    def _get_node_with_step_name(self, step_name: str) -> PipelineNode:
        """Given the name of a step, return the node with that name from the pb2_pipeline.

        Args:
            step_name: Name of the step

        Returns:
            PipelineNode instance

        Raises:
            KeyError: If the step name is not found in the pipeline.
        """
        assert self._active_pb2_pipeline

        for node in self._active_pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node

        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{self._active_pb2_pipeline.pipeline_info.id}"
        )
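_execute_step above cannot tell a cached execution from a failed one using the returned execution info alone, so it wraps the launcher's _publish_failed_execution method on that one instance for the duration of the launch. The same pattern in isolation, as a hypothetical sketch: FakeLauncher stands in for the TFX launcher and only imitates the two methods the pattern touches.

```python
import types
from typing import Optional


class FakeLauncher:
    """Hypothetical stand-in for a TFX launcher; not the real tfx class."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail

    def _publish_failed_execution(self) -> None:
        # A real launcher would record the failure in its metadata store.
        pass

    def launch(self) -> Optional[str]:
        if self.fail:
            self._publish_failed_execution()
            return None
        return "execution-info"


def execute(launcher: FakeLauncher) -> Optional[str]:
    execution_failed = False
    # Keep the original bound method so the wrapper can still call it.
    original = launcher._publish_failed_execution

    def _patched(self: FakeLauncher) -> None:
        nonlocal execution_failed
        original()  # preserve the original behavior
        execution_failed = True  # and remember that a failure was published

    # Bind the wrapper to this instance only; the class is left untouched.
    setattr(launcher, "_publish_failed_execution", types.MethodType(_patched, launcher))
    info = launcher.launch()
    if execution_failed:
        raise RuntimeError("Failed to execute step.")
    return info
```

Because the wrapper is an instance attribute, it shadows the class method only for this launcher, and other launcher instances keep their normal behavior.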
config: BaseOrchestratorConfig property readonly

Returns the BaseOrchestratorConfig config.

Returns:

| Type | Description |
|---|---|
| BaseOrchestratorConfig | The configuration. |

prepare_or_run_pipeline(self, deployment, stack)

This method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The steps are run directly in the same environment in which the orchestrator code is executed. In this case, you need to handle implementation-specific runtime configuration (such as the schedule), then iterate through the steps and call self.run_step(...) to execute each one.

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.

Regardless of the implementation details, the orchestrator needs to run each step in the target environment. Use the self.run_step(...) method for this.

The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
| stack | Stack | The stack the pipeline will run on. | required |

Returns:

| Type | Description |
|---|---|
| Any | The optional return value of this method is returned by the pipeline_instance.run() call that started the pipeline. |
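For the advanced case, the "intermediate representation" is whatever the target system consumes, typically one task per step plus that step's upstream dependencies. A hypothetical sketch, assuming a deployment reduced to a {step_name: upstream_step_names} mapping and a container-based target: ContainerTask, build_tasks and hypothetical_step_command are illustrative names, and the command arguments are invented rather than the real output of StepEntrypointConfiguration.get_entrypoint_command().

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ContainerTask:
    """Hypothetical unit of work for a remote scheduler."""

    name: str
    image: str
    command: List[str]
    upstream: List[str] = field(default_factory=list)


def hypothetical_step_command(step_name: str) -> List[str]:
    # Placeholder for a step entrypoint command; these arguments are invented.
    return ["python", "-m", "my_step_entrypoint", "--step_name", step_name]


def build_tasks(steps: Dict[str, List[str]], image: str) -> List[ContainerTask]:
    """Translate {step_name: upstream_step_names} into container tasks."""
    return [
        ContainerTask(
            name=name,
            image=image,
            command=hypothetical_step_command(name),
            upstream=list(upstream),
        )
        for name, upstream in steps.items()
    ]


tasks = build_tasks({"load": [], "train": ["load"]}, image="my-image:latest")
for task in tasks:
    print(task.name, task.upstream, " ".join(task.command))
```

Submitting these tasks is the "deploy it" half of the method and depends entirely on the target scheduler's API, so the sketch stops at building the representation. Each task still ends up running a single step, which is where run_step comes in on the remote side.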

requires_resources_in_orchestration_environment(step) staticmethod

Checks if the orchestrator should run this step on special resources.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| step | Step | The step that will be checked. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the step requires special resources in the orchestration environment, False otherwise. |
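The check reduces to two rules: a step with a step operator never needs resources in the orchestrator environment, and otherwise it needs them whenever its resource settings are non-empty. A minimal sketch with hypothetical simplified stand-ins for the step configuration; the three resource fields and this definition of "empty" (no field set) are assumptions of the sketch, not ZenML's exact implementation.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ResourceSettings:
    """Hypothetical simplified stand-in for a step's resource settings."""

    cpu_count: Optional[float] = None
    gpu_count: Optional[int] = None
    memory: Optional[str] = None

    @property
    def empty(self) -> bool:
        # Assumption: settings count as empty when no field has been set.
        return all(v is None for v in (self.cpu_count, self.gpu_count, self.memory))


@dataclass
class StepConfig:
    step_operator: Optional[str] = None
    resource_settings: ResourceSettings = field(default_factory=ResourceSettings)


def requires_resources_in_orchestration_environment(config: StepConfig) -> bool:
    # A step operator runs the step elsewhere, so the orchestrator
    # environment never needs the step's resources.
    if config.step_operator:
        return False
    return not config.resource_settings.empty
```

The LocalOrchestrator uses this check only to decide whether to log a warning that the resource configuration will be ignored.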

run(self, deployment, stack)

Runs a pipeline on a stack.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment. | required |
| stack | Stack | The stack on which to run the pipeline. | required |

Returns:

| Type | Description |
|---|---|
| Any | Orchestrator-specific return value. |

run_step(self, step, run_name=None)

This sets up a component launcher and executes the given step.

Parameters:

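| Name | Type | Description | Default |
|---|---|---|---|
| step | Step | The step to be executed. | required |
| run_name | Optional[str] | The unique run name. If omitted, the run name of the active deployment is used. | None |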

Returns:

| Type | Description |
|---|---|
| Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info of the step. |
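Apart from the TFX plumbing, run_step makes two decisions: it falls back to the active deployment's run name, and it wraps execution in the stack's prepare/cleanup hooks unless a step operator runs the step code elsewhere. A hypothetical sketch of just that control flow; HookStack and the execute callable are illustrative stand-ins, not ZenML APIs.

```python
from typing import Callable, List, Optional


class HookStack:
    """Hypothetical stack that records prepare/cleanup calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def prepare_step_run(self, step: str) -> None:
        self.events.append(f"prepare {step}")

    def cleanup_step_run(self, step: str) -> None:
        self.events.append(f"cleanup {step}")


def run_step(
    step: str,
    stack: HookStack,
    execute: Callable[[str, str], str],
    active_run_name: str,
    run_name: Optional[str] = None,
    step_operator: Optional[str] = None,
) -> str:
    # Fall back to the active deployment's run name.
    run_name = run_name or active_run_name
    if step_operator:
        # The step operator's environment runs the step code, so no local
        # preparation is needed.
        return execute(step, run_name)
    stack.prepare_step_run(step)
    try:
        return execute(step, run_name)
    finally:
        # Cleanup runs even if execution raises.
        stack.cleanup_step_run(step)
```

The try/finally mirrors the source: a failing step still triggers stack cleanup before the exception propagates.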

BaseOrchestratorConfig (StackComponentConfig) pydantic-model

Base orchestrator config.

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorConfig(StackComponentConfig):
    """Base orchestrator config."""

    @root_validator(pre=True)
    def _deprecations(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and/or remove deprecated fields.

        Args:
            values: The values to validate.

        Returns:
            The validated values.
        """
        if "custom_docker_base_image_name" in values:
            image_name = values.pop("custom_docker_base_image_name", None)
            if image_name:
                logger.warning(
                    "The 'custom_docker_base_image_name' field has been "
                    "deprecated. To use a custom base container image with your "
                    "orchestrators, please use the DockerSettings in your "
                    "pipeline (see https://docs.zenml.io/advanced-guide/pipelines/containerization)."
                )

        return values
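The pre-validator above runs before field validation and simply strips the deprecated key from the raw input, warning only when a non-empty value was supplied. The same effect as a plain function, without pydantic; drop_deprecated_fields is a hypothetical name and the warning text is shortened.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def drop_deprecated_fields(values: Dict[str, Any]) -> Dict[str, Any]:
    """Remove the deprecated key before validation and warn if it was set."""
    if "custom_docker_base_image_name" in values:
        image_name = values.pop("custom_docker_base_image_name", None)
        # Only warn when a non-empty value was actually supplied.
        if image_name:
            logger.warning(
                "The 'custom_docker_base_image_name' field has been "
                "deprecated; use DockerSettings in your pipeline instead."
            )
    return values
```

Because the key is popped rather than rejected, old stack configurations that still contain it keep loading; the value itself is discarded.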

BaseOrchestratorFlavor (Flavor)

Base orchestrator flavor class.

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator flavor class."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.ORCHESTRATOR

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return BaseOrchestratorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

| Type | Description |
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. |

implementation_class: Type[BaseOrchestrator] property readonly

Implementation class for this flavor.

Returns:

| Type | Description |
|---|---|
| Type[BaseOrchestrator] | The implementation class. |

type: StackComponentType property readonly

Returns the flavor type.

Returns:

| Type | Description |
|---|---|
| StackComponentType | The flavor type. |
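BaseOrchestratorFlavor declares implementation_class as an abstract property (@property stacked on @abstractmethod), so a flavor subclass that forgets it cannot be instantiated. A hypothetical sketch of that enforcement; SketchFlavor and the other class names are illustrative, not ZenML classes.

```python
from abc import ABC, abstractmethod
from typing import Type


class SketchOrchestrator:
    """Hypothetical placeholder implementation class."""


class SketchFlavor(ABC):
    """Hypothetical stand-in for BaseOrchestratorFlavor."""

    @property
    def type(self) -> str:
        # Concrete property shared by every orchestrator flavor.
        return "orchestrator"

    @property
    @abstractmethod
    def implementation_class(self) -> Type[SketchOrchestrator]:
        """Subclasses must say which orchestrator class they provide."""


class IncompleteFlavor(SketchFlavor):
    pass  # does not implement implementation_class


class CompleteFlavor(SketchFlavor):
    @property
    def implementation_class(self) -> Type[SketchOrchestrator]:
        return SketchOrchestrator
```

Instantiating IncompleteFlavor raises TypeError, so the missing implementation is caught when the flavor object is created rather than when an orchestrator is first needed.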

local special

Initialization for the local orchestrator.

local_orchestrator

Implementation of the ZenML local orchestrator.

LocalOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines locally.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Iterates through all steps and executes them sequentially.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack on which the pipeline is deployed.
        """
        if deployment.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        # Run each step
        for step in deployment.steps.values():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )

            self.run_step(
                step=step,
            )
prepare_or_run_pipeline(self, deployment, stack)

Iterates through all steps and executes them sequentially.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
| stack | Stack | The stack on which the pipeline is deployed. | required |
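The local orchestrator's loop warns once about an unsupported schedule, warns per step about ignored resource settings, and otherwise runs the steps one after another. A hypothetical sketch with simplified stand-ins (SketchStep, SketchDeployment, run_locally are illustrative names); it assumes the step mapping is already in a valid execution order and records step names instead of calling run_step.

```python
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional

logger = logging.getLogger("local_orchestrator_sketch")


@dataclass
class SketchStep:
    name: str
    needs_resources: bool = False


@dataclass
class SketchDeployment:
    steps: Dict[str, SketchStep]
    schedule: Optional[str] = None


def run_locally(deployment: SketchDeployment) -> List[str]:
    if deployment.schedule:
        logger.warning("Schedules are not supported; running immediately.")
    executed: List[str] = []
    # Dicts preserve insertion order, so steps run in the order given.
    for step in deployment.steps.values():
        if step.needs_resources:
            logger.warning("Ignoring resource configuration for step %s.", step.name)
        executed.append(step.name)  # stands in for self.run_step(step=step)
    return executed
```

Running steps sequentially in a single process is what makes this orchestrator unable to execute steps concurrently.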
LocalOrchestratorConfig (BaseOrchestratorConfig) pydantic-model

Local orchestrator config.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestratorConfig(BaseOrchestratorConfig):
    """Local orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

| Type | Description |
|---|---|
| bool | True if this config is for a local component, False otherwise. |
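The is_local designation is a read-only property that a config subclass overrides. A hypothetical sketch of how such a flag could gate sharing; SketchConfig, SketchLocalConfig and shareable are illustrative names, and the base default of False is an assumption of this sketch.

```python
from typing import List


class SketchConfig:
    """Hypothetical stand-in for a stack component config."""

    @property
    def is_local(self) -> bool:
        # Assumed default: components are usable beyond this host.
        return False


class SketchLocalConfig(SketchConfig):
    @property
    def is_local(self) -> bool:
        # Local components only work on the machine that runs them.
        return True


def shareable(configs: List[SketchConfig]) -> List[SketchConfig]:
    """Hypothetical helper: keep only components other users could reuse."""
    return [c for c in configs if not c.is_local]
```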

LocalOrchestratorFlavor (BaseOrchestratorFlavor)

Class for the LocalOrchestratorFlavor.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestratorFlavor(BaseOrchestratorFlavor):
    """Class for the `LocalOrchestratorFlavor`."""

    @property
    def name(self) -> str:
        """The flavor name.

        Returns:
            The flavor name.
        """
        return "local"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the local orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalOrchestratorConfig

    @property
    def implementation_class(self) -> Type[LocalOrchestrator]:
        """Implementation class for this flavor.

        Returns:
            The implementation class for this flavor.
        """
        return LocalOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the local orchestrator flavor.

Returns:

| Type | Description |
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. |

implementation_class: Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] property readonly

Implementation class for this flavor.

Returns:

| Type | Description |
|---|---|
| Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] | The implementation class for this flavor. |

name: str property readonly

The flavor name.

Returns:

| Type | Description |
|---|---|
| str | The flavor name. |
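A flavor ties a name ("local") to a config class and an implementation class. A hypothetical sketch of resolving a flavor by name and instantiating its orchestrator; the registry, create_orchestrator and the Sketch* classes are illustrative, and plain class attributes replace the properties used by the real flavor for brevity.

```python
from dataclasses import dataclass
from typing import Dict, Type


@dataclass
class SketchLocalOrchestratorConfig:
    is_local: bool = True


class SketchLocalOrchestrator:
    def __init__(self, config: SketchLocalOrchestratorConfig) -> None:
        self.config = config


class SketchLocalFlavor:
    name = "local"
    config_class: Type[SketchLocalOrchestratorConfig] = SketchLocalOrchestratorConfig
    implementation_class: Type[SketchLocalOrchestrator] = SketchLocalOrchestrator


# Hypothetical registry keyed by flavor name.
FLAVORS: Dict[str, Type[SketchLocalFlavor]] = {SketchLocalFlavor.name: SketchLocalFlavor}


def create_orchestrator(flavor_name: str) -> SketchLocalOrchestrator:
    flavor = FLAVORS[flavor_name]
    # Build the config first, then hand it to the implementation class.
    config = flavor.config_class()
    return flavor.implementation_class(config)
```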

local_docker special

Initialization for the local Docker orchestrator.

local_docker_orchestrator

Implementation of the ZenML local Docker orchestrator.

LocalDockerOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines locally using Docker.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
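Two decisions in the LocalDockerOrchestrator source are easy to isolate: prepare_pipeline_deployment stores either the repo digest of a pushed image (when the stack has a container registry) or the name of a locally built image, and prepare_or_run_pipeline mounts the local stores path read-write at the same path inside the container. A hypothetical sketch of both; the builders are passed in as plain callables instead of ZenML's PipelineDockerImageBuilder, and the volume mapping follows the host-path to {"bind", "mode"} dictionary shape used in the source.

```python
from typing import Callable, Dict


def choose_image_reference(
    has_container_registry: bool,
    build_and_push: Callable[[], str],
    build_only: Callable[[], str],
) -> str:
    """Return the image reference to store in the deployment extras."""
    if has_container_registry:
        # Pushed images are referenced by the digest the registry returns.
        return build_and_push()
    # Without a registry the image only exists in the local Docker daemon,
    # so the local target image name is used instead.
    return build_only()


def local_store_volumes(local_stores_path: str) -> Dict[str, Dict[str, str]]:
    """Mount the local stores directory at the same path inside the container."""
    return {local_stores_path: {"bind": local_stores_path, "mode": "rw"}}
```

Binding the path to itself means file paths recorded by the host stay valid inside each step container.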

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally using Docker.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Builds a Docker image and, if the stack has a container registry, pushes it.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        if stack.container_registry:
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
        else:
            # If there is no container registry, we only build the image
            target_image_name = docker_image_builder.get_target_image_name(
                deployment=deployment
            )
            docker_image_builder.build_docker_image(
                target_image_name=target_image_name,
                deployment=deployment,
                stack=stack,
            )
            deployment.add_extra(
                ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Sequentially runs all pipeline steps in local Docker containers.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        """
        if deployment.schedule:
            logger.warning(
                "Local Docker Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        from docker.client import DockerClient

        docker_client = DockerClient.from_env()
        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
        entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

        # Add the local stores path as a volume mount
        stack.check_local_paths()
        local_stores_path = GlobalConfiguration().local_stores_path
        volumes = {
            local_stores_path: {
                "bind": local_stores_path,
                "mode": "rw",
            }
        }
        env = {ENV_ZENML_LOCAL_STORES_PATH: local_stores_path}

        # Run each step
        for step_name, step in deployment.steps.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "Docker orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )

            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )
            user = None
            if sys.platform != "win32":
                user = os.getuid()
            logger.info("Running step `%s` in Docker:", step_name)
            logs = docker_client.containers.run(
                image=image_name,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=volumes,
                environment=env,
                stream=True,
            )

            for line in logs:
                logger.info(line.strip().decode())
prepare_or_run_pipeline(self, deployment, stack)

Sequentially runs all pipeline steps in local Docker containers.

Parameters:

Name         Type                 Description                                   Default
deployment   PipelineDeployment   The pipeline deployment to prepare or run.    required
stack        Stack                The stack the pipeline will run on.           required

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Sequentially runs all pipeline steps in local Docker containers.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
    """
    if deployment.schedule:
        logger.warning(
            "Local Docker Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    from docker.client import DockerClient

    docker_client = DockerClient.from_env()
    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
    entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

    # Add the local stores path as a volume mount
    stack.check_local_paths()
    local_stores_path = GlobalConfiguration().local_stores_path
    volumes = {
        local_stores_path: {
            "bind": local_stores_path,
            "mode": "rw",
        }
    }
    env = {ENV_ZENML_LOCAL_STORES_PATH: local_stores_path}

    # Run each step
    for step_name, step in deployment.steps.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "Docker orchestrator, ignoring resource configuration for "
                "step %s.",
                step.config.name,
            )

        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name
        )
        user = None
        if sys.platform != "win32":
            user = os.getuid()
        logger.info("Running step `%s` in Docker:", step_name)
        logs = docker_client.containers.run(
            image=image_name,
            entrypoint=entrypoint,
            command=arguments,
            user=user,
            volumes=volumes,
            environment=env,
            stream=True,
        )

        for line in logs:
            logger.info(line.strip().decode())
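The per-step container settings can be isolated into a small helper. `build_container_run_kwargs` is a hypothetical function, not part of ZenML; it reproduces the keyword arguments passed to `docker_client.containers.run(...)` above so they can be inspected without a Docker daemon. The environment variable name is passed in rather than hard-coded, since the value of `ENV_ZENML_LOCAL_STORES_PATH` is not shown here.

```python
import os
import sys
from typing import Any, Dict, List, Optional


def build_container_run_kwargs(
    image_name: str,
    entrypoint: List[str],
    arguments: List[str],
    local_stores_path: str,
    env_var_name: str,
) -> Dict[str, Any]:
    """Assemble per-step container settings (hypothetical helper)."""
    user: Optional[int] = None
    if sys.platform != "win32":
        # Run as the host user (presumably so files written to the mounted
        # store are owned by that user); os.getuid() is unavailable on Windows.
        user = os.getuid()
    return {
        "image": image_name,
        "entrypoint": entrypoint,
        "command": arguments,
        "user": user,
        # Bind-mount the local stores directory at the same path, read-write.
        "volumes": {local_stores_path: {"bind": local_stores_path, "mode": "rw"}},
        # Tell the step where the local stores live inside the container.
        "environment": {env_var_name: local_stores_path},
        # Return a generator of log lines instead of waiting for the exit.
        "stream": True,
    }
```

With the Docker SDK installed, the result could be passed as `DockerClient.from_env().containers.run(**kwargs)`. Because `stream=True` is set and `detach` keeps its default of `False`, the call returns a generator of log lines as bytes, which is why the loop above decodes each line.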
prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and, if the stack has a container registry, push it to that registry.

Parameters:

Name         Type                 Description                                         Default
deployment   PipelineDeployment   The pipeline deployment configuration.             required
stack        Stack                The stack on which the pipeline will be deployed.   required

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and (maybe) push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    docker_image_builder = PipelineDockerImageBuilder()
    if stack.container_registry:
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
    else:
        # If there is no container registry, we only build the image
        target_image_name = docker_image_builder.get_target_image_name(
            deployment=deployment
        )
        docker_image_builder.build_docker_image(
            target_image_name=target_image_name,
            deployment=deployment,
            stack=stack,
        )
        deployment.add_extra(
            ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
        )
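The branch in `prepare_pipeline_deployment` comes down to choosing which image reference to record. The sketch below uses hypothetical callables in place of `PipelineDockerImageBuilder`, and `record_image_reference` is an illustrative name. With a container registry, the pushed image is referenced by the repository digest returned after the push; without one, only a local build happens and the target image name is recorded instead, which presumably works because the local Docker orchestrator runs its containers on the same Docker host that built the image.

```python
from typing import Callable, Dict


def record_image_reference(
    extras: Dict[str, str],
    key: str,
    has_container_registry: bool,
    build_and_push: Callable[[], str],
    build_local: Callable[[], str],
) -> str:
    """Pick and store the image reference (hypothetical sketch)."""
    if has_container_registry:
        # Pushed image: reference it by the digest the registry returns.
        image = build_and_push()
    else:
        # No registry: build only, and reference the image by its local name.
        image = build_local()
    # Store the reference so the run phase can look it up under `key`.
    extras[key] = image
    return image
```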
LocalDockerOrchestratorConfig (BaseOrchestratorConfig) pydantic-model

Local Docker orchestrator config.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorConfig(BaseOrchestratorConfig):
    """Local Docker orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type   Description
bool   True if this config is for a local component, False otherwise.
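As a hypothetical illustration of how such a flag might be consumed, the sketch below filters a list of component configs down to those that could be shared. `ComponentConfig` and `shareable_component_names` are invented for this example and do not reflect how ZenML actually enforces sharing.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ComponentConfig:
    """Hypothetical stand-in for a stack component config."""

    name: str
    is_local: bool


def shareable_component_names(configs: List[ComponentConfig]) -> List[str]:
    """Return the names of components not restricted to the local host."""
    return [config.name for config in configs if not config.is_local]
```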

LocalDockerOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the local Docker orchestrator.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local Docker orchestrator."""

    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        return "local_docker"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalDockerOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LocalDockerOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            Implementation class for this flavor.
        """
        return LocalDockerOrchestrator
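A flavor ties a flavor name to a config class and an implementation class. The sketch below shows that pattern with hypothetical stand-in classes. ZenML's flavors expose these as read-only properties rather than the class attributes used here, and real stack components are constructed with more arguments than the illustrative `instantiate` function passes.

```python
from typing import Dict, Type


class OrchestratorConfig:
    """Hypothetical base config."""


class Orchestrator:
    """Hypothetical base orchestrator holding its config."""

    def __init__(self, config: OrchestratorConfig) -> None:
        self.config = config


class Flavor:
    """Hypothetical flavor: maps a name to a config and implementation class."""

    name: str
    config_class: Type[OrchestratorConfig]
    implementation_class: Type[Orchestrator]


class LocalDockerConfigSketch(OrchestratorConfig):
    """Stand-in for LocalDockerOrchestratorConfig."""


class LocalDockerOrchestratorSketch(Orchestrator):
    """Stand-in for LocalDockerOrchestrator."""


class LocalDockerFlavorSketch(Flavor):
    name = "local_docker"
    config_class = LocalDockerConfigSketch
    implementation_class = LocalDockerOrchestratorSketch


def instantiate(flavors: Dict[str, Flavor], flavor_name: str) -> Orchestrator:
    """Look up a flavor by name and build its orchestrator from a default config."""
    flavor = flavors[flavor_name]
    config = flavor.config_class()
    return flavor.implementation_class(config)
```

Keeping the name, config class, and implementation class on one object lets a registry resolve a flavor string such as `"local_docker"` to both the settings schema and the class that runs pipelines.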
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the local Docker orchestrator flavor.

Returns:

Type                                                                 Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]   The config class.

implementation_class: Type[LocalDockerOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type                            Description
Type[LocalDockerOrchestrator]   Implementation class for this flavor.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type   Description
str    Name of the orchestrator flavor.

utils

Utility functions for the orchestrator.

get_cache_status(execution_info)

Returns whether a cached execution was used or not.

Parameters:

Name             Type                                                             Description           Default
execution_info   Optional[tfx.orchestration.portable.data_types.ExecutionInfo]   The execution info.   required

Returns:

Type   Description
bool   True if the execution was cached, False otherwise.

Source code in zenml/orchestrators/utils.py
def get_cache_status(
    execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
    """Returns whether a cached execution was used or not.

    Args:
        execution_info: The execution info.

    Returns:
        `True` if the execution was cached, `False` otherwise.
    """
    # An execution output URI is only provided if the step needs to be
    # executed (= is not cached)
    if execution_info and execution_info.execution_output_uri is None:
        return True
    else:
        return False
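The cache check relies only on whether `execution_output_uri` is set. The sketch below exercises the same logic against `FakeExecutionInfo`, a hypothetical stand-in so TFX is not needed; `is_cached_execution` is likewise an illustrative name. For an object without custom truthiness, `execution_info is not None` behaves the same as the `if execution_info` test in the original.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeExecutionInfo:
    """Hypothetical stand-in exposing only the attribute the check reads."""

    execution_output_uri: Optional[str]


def is_cached_execution(execution_info: Optional[FakeExecutionInfo]) -> bool:
    # An output URI is only assigned when the step actually has to run,
    # so a present execution info without one indicates a cache hit.
    return (
        execution_info is not None
        and execution_info.execution_output_uri is None
    )
```

Missing execution info yields `False`, info without an output URI yields `True`, and info with an output URI (a step that really executed) yields `False`.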