Orchestrators
        zenml.orchestrators
  
      special
  
    Initialization for ZenML orchestrators.
An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of it as the 'root' of any pipeline job that you run during your experimentation.
ZenML supports a local orchestrator out of the box which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.
        base_orchestrator
    Base orchestrator class.
        
BaseOrchestrator            (StackComponent, ABC)
        
    Base class for all orchestrators.
In order to implement an orchestrator you will need to subclass from this class.
How it works:
The run(...) method is the entrypoint that is executed when the
pipeline's run method is called within the user code
(pipeline_instance.run(...)).
This method will do some internal preparation and then call the
prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must
implement this method and either run the pipeline steps directly or deploy
the pipeline to some remote infrastructure.
Source code in zenml/orchestrators/base_orchestrator.py
          class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.
    In order to implement an orchestrator you will need to subclass from this
    class.
    How it works:
    -------------
    The `run(...)` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run(...)`).
    This method will do some internal preparation and then call the
    `prepare_or_run_pipeline(...)` method. BaseOrchestrator subclasses must
    implement this method and either run the pipeline steps directly or deploy
    the pipeline to some remote infrastructure.
    """
    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
    _active_deployment: Optional["PipelineDeployment"] = None
    _active_pb2_pipeline: Optional[Pb2Pipeline] = None
    @property
    def config(self) -> BaseOrchestratorConfig:
        """Returns the `BaseOrchestratorConfig` config.
        Returns:
            The configuration.
        """
        return cast(BaseOrchestratorConfig, self._config)
    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """This method needs to be implemented by the respective orchestrator.
        Depending on the type of orchestrator you'll have to perform slightly
        different operations.
        Simple Case:
        ------------
        The Steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through the steps and finally call
        `self.run_step(...)` to execute each step.
        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and deploy it.
        Regardless of the implementation details, the orchestrator will need
        to run each step in the target environment. For this the
        `self.run_step(...)` method should be used.
        The easiest way to make this work is by using an entrypoint
        configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
        or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """
    def run(self, deployment: "PipelineDeployment", stack: "Stack") -> Any:
        """Runs a pipeline on a stack.
        Args:
            deployment: The pipeline deployment.
            stack: The stack on which to run the pipeline.
        Returns:
            Orchestrator-specific return value.
        """
        self._prepare_run(deployment=deployment)
        result = self.prepare_or_run_pipeline(
            deployment=deployment, stack=stack
        )
        self._cleanup_run()
        return result
    def run_step(
        self, step: "Step", run_name: Optional[str] = None
    ) -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.
        Args:
            step: The step to be executed
            run_name: The unique run name
        Returns:
            The execution info of the step.
        """
        assert self._active_deployment
        assert self._active_pb2_pipeline
        self._ensure_artifact_classes_loaded(step.config)
        step_name = step.config.name
        pb2_pipeline = self._active_pb2_pipeline
        run_name = run_name or self._active_deployment.run_name
        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
        )
        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step_name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step_name
        )
        metadata_connection_cfg = Client().zen_store.get_metadata_config()
        # At this point the active metadata store is queried for the
        # metadata_connection
        stack = Client().active_stack
        executor_operator = self._get_executor_operator(
            step_operator=step.config.step_operator
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: executor_operator
        }
        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=self._active_deployment.pipeline,
            run_name=run_name,
        )
        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(step_name)
        proto_utils.add_mlmd_contexts(
            pipeline_node=pipeline_node,
            step=step,
            deployment=self._active_deployment,
            stack=stack,
        )
        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata.Metadata(metadata_connection_cfg),
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )
        # If a step operator is used, the current environment will not be the
        # one executing the step function code and therefore we don't need to
        # run any preparation
        if step.config.step_operator:
            execution_info = self._execute_step(component_launcher)
        else:
            stack.prepare_step_run(info=step_run_info)
            try:
                execution_info = self._execute_step(component_launcher)
            finally:
                stack.cleanup_step_run(info=step_run_info)
        return execution_info
    @staticmethod
    def requires_resources_in_orchestration_environment(
        step: "Step",
    ) -> bool:
        """Checks if the orchestrator should run this step on special resources.
        Args:
            step: The step that will be checked.
        Returns:
            True if the step requires special resources in the orchestration
            environment, False otherwise.
        """
        # If the step requires custom resources and doesn't run with a step
        # operator, it would need these requirements in the orchestrator
        # environment
        if step.config.step_operator:
            return False
        return not step.config.resource_settings.empty
    def _prepare_run(self, deployment: "PipelineDeployment") -> None:
        """Prepares a run.
        Args:
            deployment: The deployment to prepare.
        """
        self._active_deployment = deployment
        pb2_pipeline = Pb2Pipeline()
        pb2_pipeline_json = string_utils.b64_decode(
            self._active_deployment.proto_pipeline
        )
        json_format.Parse(pb2_pipeline_json, pb2_pipeline)
        self._active_pb2_pipeline = pb2_pipeline
    def _cleanup_run(self) -> None:
        """Cleans up the active run."""
        self._active_deployment = None
        self._active_pb2_pipeline = None
    @staticmethod
    def _ensure_artifact_classes_loaded(
        step_configuration: "StepConfiguration",
    ) -> None:
        """Ensures that all artifact classes for a step are loaded.
        Args:
            step_configuration: A step configuration.
        """
        artifact_class_sources = set(
            input_.artifact_source
            for input_ in step_configuration.inputs.values()
        ) | set(
            output.artifact_source
            for output in step_configuration.outputs.values()
        )
        for source in artifact_class_sources:
            # Tfx depends on these classes being loaded so it can detect the
            # correct artifact class
            source_utils.validate_source_class(
                source, expected_class=BaseArtifact
            )
    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.
        Args:
            tfx_launcher: A tfx launcher to execute the component.
        Returns:
            Optional execution info returned by the launcher.
        """
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")
        execution_info = tfx_launcher.launch()
        if execution_info and get_cache_status(execution_info):
            logger.info(f"Using cached version of `{pipeline_step_name}`.")
        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info
    @staticmethod
    def _get_executor_operator(
        step_operator: Optional[str],
    ) -> Type[BaseExecutorOperator]:
        """Gets the TFX executor operator for the given step operator.
        Args:
            step_operator: The optional step operator used to run a step.
        Returns:
            The executor operator for the given step operator.
        """
        if step_operator:
            from zenml.step_operators.step_executor_operator import (
                StepExecutorOperator,
            )
            return StepExecutorOperator
        else:
            return PythonExecutorOperator
    def _get_node_with_step_name(self, step_name: str) -> PipelineNode:
        """Given the name of a step, return the node with that name from the pb2_pipeline.
        Args:
            step_name: Name of the step
        Returns:
            PipelineNode instance
        Raises:
            KeyError: If the step name is not found in the pipeline.
        """
        assert self._active_pb2_pipeline
        for node in self._active_pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node
        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{self._active_pb2_pipeline.pipeline_info.id}"
        )
config: BaseOrchestratorConfig
  
      property
      readonly
  
    Returns the BaseOrchestratorConfig config.
Returns:
| Type | Description | 
|---|---|
| BaseOrchestratorConfig | The configuration. | 
prepare_or_run_pipeline(self, deployment, stack)
    This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly different operations.
Simple Case:
The Steps are run directly from within the same environment in which
the orchestrator code is executed. In this case you will need to
deal with implementation-specific runtime configurations (like the
schedule) and then iterate through the steps and finally call
self.run_step(...) to execute each step.
Advanced Case:
Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.
Regardless of the implementation details, the orchestrator will need
to run each step in the target environment. For this the
self.run_step(...) method should be used.
The easiest way to make this work is by using an entrypoint
configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration)
or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required | 
| stack | Stack | The stack the pipeline will run on. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | The optional return value from this method will be returned by the
 | 
Source code in zenml/orchestrators/base_orchestrator.py
          @abstractmethod
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """This method needs to be implemented by the respective orchestrator.
    Depending on the type of orchestrator you'll have to perform slightly
    different operations.
    Simple Case:
    ------------
    The Steps are run directly from within the same environment in which
    the orchestrator code is executed. In this case you will need to
    deal with implementation-specific runtime configurations (like the
    schedule) and then iterate through the steps and finally call
    `self.run_step(...)` to execute each step.
    Advanced Case:
    --------------
    Most orchestrators will not run the steps directly. Instead, they
    build some intermediate representation of the pipeline that is then
    used to create and run the pipeline and its steps on the target
    environment. For such orchestrators this method will have to build
    this representation and deploy it.
    Regardless of the implementation details, the orchestrator will need
    to run each step in the target environment. For this the
    `self.run_step(...)` method should be used.
    The easiest way to make this work is by using an entrypoint
    configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
    or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).
    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
    Returns:
        The optional return value from this method will be returned by the
        `pipeline_instance.run()` call when someone is running a pipeline.
    """
requires_resources_in_orchestration_environment(step)
  
      staticmethod
  
    Checks if the orchestrator should run this step on special resources.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| step | Step | The step that will be checked. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the step requires special resources in the orchestration environment, False otherwise. | 
Source code in zenml/orchestrators/base_orchestrator.py
          @staticmethod
def requires_resources_in_orchestration_environment(
    step: "Step",
) -> bool:
    """Checks if the orchestrator should run this step on special resources.
    Args:
        step: The step that will be checked.
    Returns:
        True if the step requires special resources in the orchestration
        environment, False otherwise.
    """
    # If the step requires custom resources and doesn't run with a step
    # operator, it would need these requirements in the orchestrator
    # environment
    if step.config.step_operator:
        return False
    return not step.config.resource_settings.empty
run(self, deployment, stack)
    Runs a pipeline on a stack.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment. | required | 
| stack | Stack | The stack on which to run the pipeline. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | Orchestrator-specific return value. | 
Source code in zenml/orchestrators/base_orchestrator.py
          def run(self, deployment: "PipelineDeployment", stack: "Stack") -> Any:
    """Runs a pipeline on a stack.
    Args:
        deployment: The pipeline deployment.
        stack: The stack on which to run the pipeline.
    Returns:
        Orchestrator-specific return value.
    """
    self._prepare_run(deployment=deployment)
    result = self.prepare_or_run_pipeline(
        deployment=deployment, stack=stack
    )
    self._cleanup_run()
    return result
run_step(self, step, run_name=None)
    This sets up a component launcher and executes the given step.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| step | Step | The step to be executed | required | 
| run_name | Optional[str] | The unique run name | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info of the step. | 
Source code in zenml/orchestrators/base_orchestrator.py
          def run_step(
    self, step: "Step", run_name: Optional[str] = None
) -> Optional[data_types.ExecutionInfo]:
    """This sets up a component launcher and executes the given step.
    Args:
        step: The step to be executed
        run_name: The unique run name
    Returns:
        The execution info of the step.
    """
    assert self._active_deployment
    assert self._active_pb2_pipeline
    self._ensure_artifact_classes_loaded(step.config)
    step_name = step.config.name
    pb2_pipeline = self._active_pb2_pipeline
    run_name = run_name or self._active_deployment.run_name
    # Substitute the runtime parameter to be a concrete run_id, it is
    # important for this to be unique for each run.
    runtime_parameter_utils.substitute_runtime_parameter(
        pb2_pipeline,
        {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
    )
    # Extract the deployment_configs and use it to access the executor and
    # custom driver spec
    deployment_config = runner_utils.extract_local_deployment_config(
        pb2_pipeline
    )
    executor_spec = runner_utils.extract_executor_spec(
        deployment_config, step_name
    )
    custom_driver_spec = runner_utils.extract_custom_driver_spec(
        deployment_config, step_name
    )
    metadata_connection_cfg = Client().zen_store.get_metadata_config()
    # At this point the active metadata store is queried for the
    # metadata_connection
    stack = Client().active_stack
    executor_operator = self._get_executor_operator(
        step_operator=step.config.step_operator
    )
    custom_executor_operators = {
        executable_spec_pb2.PythonClassExecutableSpec: executor_operator
    }
    step_run_info = StepRunInfo(
        config=step.config,
        pipeline=self._active_deployment.pipeline,
        run_name=run_name,
    )
    # The protobuf node for the current step is loaded here.
    pipeline_node = self._get_node_with_step_name(step_name)
    proto_utils.add_mlmd_contexts(
        pipeline_node=pipeline_node,
        step=step,
        deployment=self._active_deployment,
        stack=stack,
    )
    component_launcher = launcher.Launcher(
        pipeline_node=pipeline_node,
        mlmd_connection=metadata.Metadata(metadata_connection_cfg),
        pipeline_info=pb2_pipeline.pipeline_info,
        pipeline_runtime_spec=pb2_pipeline.runtime_spec,
        executor_spec=executor_spec,
        custom_driver_spec=custom_driver_spec,
        custom_executor_operators=custom_executor_operators,
    )
    # If a step operator is used, the current environment will not be the
    # one executing the step function code and therefore we don't need to
    # run any preparation
    if step.config.step_operator:
        execution_info = self._execute_step(component_launcher)
    else:
        stack.prepare_step_run(info=step_run_info)
        try:
            execution_info = self._execute_step(component_launcher)
        finally:
            stack.cleanup_step_run(info=step_run_info)
    return execution_info
        
BaseOrchestratorConfig            (StackComponentConfig)
        
  
      pydantic-model
  
    Base orchestrator config.
Source code in zenml/orchestrators/base_orchestrator.py
          class BaseOrchestratorConfig(StackComponentConfig):
    """Base orchestrator config."""
    @root_validator(pre=True)
    def _deprecations(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and/or remove deprecated fields.
        Args:
            values: The values to validate.
        Returns:
            The validated values.
        """
        if "custom_docker_base_image_name" in values:
            image_name = values.pop("custom_docker_base_image_name", None)
            if image_name:
                logger.warning(
                    "The 'custom_docker_base_image_name' field has been "
                    "deprecated. To use a custom base container image with your "
                    "orchestrators, please use the DockerSettings in your "
                    "pipeline (see https://docs.zenml.io/advanced-guide/pipelines/containerization)."
                )
        return values
        
BaseOrchestratorFlavor            (Flavor)
        
    Base orchestrator flavor class.
Source code in zenml/orchestrators/base_orchestrator.py
          class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator flavor class."""
    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.
        Returns:
            The flavor type.
        """
        return StackComponentType.ORCHESTRATOR
    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.
        Returns:
            The config class.
        """
        return BaseOrchestratorConfig
    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
  
      property
      readonly
  
    Config class for the base orchestrator flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. | 
implementation_class: Type[BaseOrchestrator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[BaseOrchestrator] | The implementation class. | 
type: StackComponentType
  
      property
      readonly
  
    Returns the flavor type.
Returns:
| Type | Description | 
|---|---|
| StackComponentType | The flavor type. | 
        local
  
      special
  
    Initialization for the local orchestrator.
        local_orchestrator
    Implementation of the ZenML local orchestrator.
        
LocalOrchestrator            (BaseOrchestrator)
        
    Orchestrator responsible for running pipelines locally.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in zenml/orchestrators/local/local_orchestrator.py
          class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.
    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Iterates through all steps and executes them sequentially.
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack on which the pipeline is deployed.
        """
        if deployment.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the"
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        # Run each step
        for step in deployment.steps.values():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )
            self.run_step(
                step=step,
            )
prepare_or_run_pipeline(self, deployment, stack)
    Iterates through all steps and executes them sequentially.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required | 
| stack | Stack | The stack on which the pipeline is deployed. | required | 
Source code in zenml/orchestrators/local/local_orchestrator.py
          def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Iterates through all steps and executes them sequentially.
    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack on which the pipeline is deployed.
    """
    if deployment.schedule:
        logger.warning(
            "Local Orchestrator currently does not support the"
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )
    # Run each step
    for step in deployment.steps.values():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "orchestrator, ignoring resource configuration for "
                "step %s.",
                step.config.name,
            )
        self.run_step(
            step=step,
        )
        
LocalOrchestratorConfig            (BaseOrchestratorConfig)
        
  
      pydantic-model
  
    Local orchestrator config.
Source code in zenml/orchestrators/local/local_orchestrator.py
          class LocalOrchestratorConfig(BaseOrchestratorConfig):
    """Local orchestrator config."""
    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.
        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.
        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool
  
      property
      readonly
  
    Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
| Type | Description | 
|---|---|
| bool | True if this config is for a local component, False otherwise. | 
        
LocalOrchestratorFlavor            (BaseOrchestratorFlavor)
        
    Class for the LocalOrchestratorFlavor.
Source code in zenml/orchestrators/local/local_orchestrator.py
          class LocalOrchestratorFlavor(BaseOrchestratorFlavor):
    """Class for the `LocalOrchestratorFlavor`."""
    @property
    def name(self) -> str:
        """The flavor name.
        Returns:
            The flavor name.
        """
        return "local"
    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.
        Returns:
            The config class.
        """
        return LocalOrchestratorConfig
    @property
    def implementation_class(self) -> Type[LocalOrchestrator]:
        """Implementation class for this flavor.
        Returns:
            The implementation class for this flavor.
        """
        return LocalOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
  
      property
      readonly
  
    Config class for the base orchestrator flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. | 
implementation_class: Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] | The implementation class for this flavor. | 
name: str
  
      property
      readonly
  
    The flavor name.
Returns:
| Type | Description | 
|---|---|
| str | The flavor name. | 
        local_docker
  
      special
  
    Initialization for the local Docker orchestrator.
        local_docker_orchestrator
    Implementation of the ZenML local Docker orchestrator.
        
LocalDockerOrchestrator            (BaseOrchestrator)
        
    Orchestrator responsible for running pipelines locally using Docker.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
          class LocalDockerOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally using Docker.
    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """
    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and (maybe) push it to the container registry.
        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        if stack.container_registry:
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
        else:
            # If there is no container registry, we only build the image
            target_image_name = docker_image_builder.get_target_image_name(
                deployment=deployment
            )
            docker_image_builder.build_docker_image(
                target_image_name=target_image_name,
                deployment=deployment,
                stack=stack,
            )
            deployment.add_extra(
                ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
            )
    @staticmethod
    def _get_volumes(stack: "Stack") -> Dict[str, Dict[str, str]]:
        """Get mount volumes for all necessary local stack components.
        Args:
            stack: The stack on which the pipeline is running.
        Returns:
            List of volumes to mount.
        """
        volumes = {}
        # Add a volume for all local paths of stack components
        for stack_component in stack.components.values():
            local_path = stack_component.local_path
            if not local_path:
                continue
            volumes[local_path] = {"bind": local_path, "mode": "rw"}
        return volumes
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Sequentially runs all pipeline steps in local Docker containers.
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        """
        if deployment.schedule:
            logger.warning(
                "Local Docker Orchestrator currently does not support the"
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        from docker.client import DockerClient
        docker_client = DockerClient.from_env()
        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
        entrypoint = StepEntrypointConfiguration.get_entrypoint_command()
        # Run each step
        for step_name, step in deployment.steps.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "Docker orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )
            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )
            volumes = self._get_volumes(stack=stack)
            user = None
            if sys.platform != "win32":
                user = os.getuid()
            logger.info("Running step `%s` in Docker:", step_name)
            logs = docker_client.containers.run(
                image=image_name,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=volumes,
                stream=True,
            )
            for line in logs:
                logger.info(line.strip().decode())
prepare_or_run_pipeline(self, deployment, stack)
    Sequentially runs all pipeline steps in local Docker containers.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required | 
| stack | Stack | The stack the pipeline will run on. | required | 
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
          def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Sequentially runs all pipeline steps in local Docker containers.
    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
    """
    if deployment.schedule:
        logger.warning(
            "Local Docker Orchestrator currently does not support the"
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )
    from docker.client import DockerClient
    docker_client = DockerClient.from_env()
    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
    entrypoint = StepEntrypointConfiguration.get_entrypoint_command()
    # Run each step
    for step_name, step in deployment.steps.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "Docker orchestrator, ignoring resource configuration for "
                "step %s.",
                step.config.name,
            )
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name
        )
        volumes = self._get_volumes(stack=stack)
        user = None
        if sys.platform != "win32":
            user = os.getuid()
        logger.info("Running step `%s` in Docker:", step_name)
        logs = docker_client.containers.run(
            image=image_name,
            entrypoint=entrypoint,
            command=arguments,
            user=user,
            volumes=volumes,
            stream=True,
        )
        for line in logs:
            logger.info(line.strip().decode())
prepare_pipeline_deployment(self, deployment, stack)
    Build a Docker image and (maybe) push it to the container registry.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment configuration. | required | 
| stack | Stack | The stack on which the pipeline will be deployed. | required | 
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
          def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and (maybe) push it to the container registry.
    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    docker_image_builder = PipelineDockerImageBuilder()
    if stack.container_registry:
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
    else:
        # If there is no container registry, we only build the image
        target_image_name = docker_image_builder.get_target_image_name(
            deployment=deployment
        )
        docker_image_builder.build_docker_image(
            target_image_name=target_image_name,
            deployment=deployment,
            stack=stack,
        )
        deployment.add_extra(
            ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
        )
        
LocalDockerOrchestratorConfig            (BaseOrchestratorConfig)
        
  
      pydantic-model
  
    Local Docker orchestrator config.
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
          class LocalDockerOrchestratorConfig(BaseOrchestratorConfig):
    """Local Docker orchestrator config."""
    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.
        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.
        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool
  
      property
      readonly
  
    Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
| Type | Description | 
|---|---|
| bool | True if this config is for a local component, False otherwise. | 
        
LocalDockerOrchestratorFlavor            (BaseOrchestratorFlavor)
        
    Flavor for the local Docker orchestrator.
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
          class LocalDockerOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local Docker orchestrator."""
    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.
        Returns:
            Name of the orchestrator flavor.
        """
        return "local_docker"
    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.
        Returns:
            The config class.
        """
        return LocalDockerOrchestratorConfig
    @property
    def implementation_class(self) -> Type["LocalDockerOrchestrator"]:
        """Implementation class for this flavor.
        Returns:
            Implementation class for this flavor.
        """
        return LocalDockerOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
  
      property
      readonly
  
    Config class for the base orchestrator flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. | 
implementation_class: Type[LocalDockerOrchestrator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[LocalDockerOrchestrator] | Implementation class for this flavor. | 
name: str
  
      property
      readonly
  
    Name of the orchestrator flavor.
Returns:
| Type | Description | 
|---|---|
| str | Name of the orchestrator flavor. | 
        utils
    Utility functions for the orchestrator.
get_cache_status(execution_info)
    Returns whether a cached execution was used or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| execution_info | Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/orchestrators/utils.py
          def get_cache_status(
    execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
    """Returns whether a cached execution was used or not.
    Args:
        execution_info: The execution info.
    Returns:
        `True` if the execution was cached, `False` otherwise.
    """
    # An execution output URI is only provided if the step needs to be
    # executed (= is not cached)
    if execution_info and execution_info.execution_output_uri is None:
        return True
    else:
        return False