Orchestrators
zenml.orchestrators
Initialization for ZenML orchestrators.
An orchestrator is the stack component that manages the execution of each step of a pipeline and administers the actual pipeline runs. You can think of it as the root of every pipeline job you run during experimentation.
ZenML supports a local orchestrator out of the box, which runs your pipelines in your local environment, and a local Docker orchestrator, which runs each step in a Docker container on your machine. Apache Airflow is also supported as an orchestrator for the steps of your pipeline.
base_orchestrator
Base orchestrator class.
BaseOrchestrator (StackComponent, ABC)
Base class for all orchestrators.
To implement an orchestrator, subclass this class.
How it works:
The `run(...)` method is the entrypoint that is executed when the pipeline's run method is called in user code (`pipeline_instance.run(...)`). It performs some internal preparation and then calls the `prepare_or_run_pipeline(...)` method, which every `BaseOrchestrator` subclass must implement to either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
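The flow described above is a template method: `run` owns the bookkeeping and delegates the orchestrator-specific work to an abstract hook. A minimal, framework-free sketch of that shape follows; `MiniOrchestrator`, `EchoOrchestrator`, and the list-of-step-names "deployment" are hypothetical simplifications, not ZenML classes.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class MiniOrchestrator(ABC):
    """Hypothetical analogue of BaseOrchestrator's run/prepare flow."""

    def __init__(self) -> None:
        self._active_deployment: Optional[List[str]] = None

    def run(self, deployment: List[str]) -> Any:
        self._active_deployment = deployment  # plays the role of _prepare_run
        try:
            # Subclass hook; its return value is passed back to the caller.
            return self.prepare_or_run_pipeline(deployment)
        finally:
            self._active_deployment = None  # plays the role of _cleanup_run

    @abstractmethod
    def prepare_or_run_pipeline(self, deployment: List[str]) -> Any:
        ...


class EchoOrchestrator(MiniOrchestrator):
    def prepare_or_run_pipeline(self, deployment: List[str]) -> Any:
        # The active deployment is available while the hook runs.
        assert self._active_deployment is deployment
        return f"ran {len(deployment)} steps"


orchestrator = EchoOrchestrator()
print(orchestrator.run(["load", "train"]))  # ran 2 steps
```

One difference is deliberate: the sketch clears the active deployment in a `finally` block, whereas the listed `run` method calls `_cleanup_run()` only after `prepare_or_run_pipeline` returns normally.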
Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.
    In order to implement an orchestrator you will need to subclass from this
    class.
    How it works:
    -------------
    The `run(...)` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run(...)`).
    This method will do some internal preparation and then call the
    `prepare_or_run_pipeline(...)` method. BaseOrchestrator subclasses must
    implement this method and either run the pipeline steps directly or deploy
    the pipeline to some remote infrastructure.
    """

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
    _active_deployment: Optional["PipelineDeployment"] = None
    _active_pb2_pipeline: Optional[Pb2Pipeline] = None

    @property
    def config(self) -> BaseOrchestratorConfig:
        """Returns the `BaseOrchestratorConfig` config.
        Returns:
            The configuration.
        """
        return cast(BaseOrchestratorConfig, self._config)

    @abstractmethod
    def get_orchestrator_run_id(self) -> str:
        """Returns the run id of the active orchestrator run.
        Important: This needs to be a unique ID and return the same value for
        all steps of a pipeline run.
        Returns:
            The orchestrator run id.
        """

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """This method needs to be implemented by the respective orchestrator.
        Depending on the type of orchestrator you'll have to perform slightly
        different operations.
        Simple Case:
        ------------
        The Steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through the steps and finally call
        `self.run_step(...)` to execute each step.
        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and deploy it.
        Regardless of the implementation details, the orchestrator will need
        to run each step in the target environment. For this the
        `self.run_step(...)` method should be used.
        The easiest way to make this work is by using an entrypoint
        configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
        or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """

    def run(self, deployment: "PipelineDeployment", stack: "Stack") -> Any:
        """Runs a pipeline on a stack.
        Args:
            deployment: The pipeline deployment.
            stack: The stack on which to run the pipeline.
        Returns:
            Orchestrator-specific return value.
        """
        self._prepare_run(deployment=deployment)
        result = self.prepare_or_run_pipeline(
            deployment=deployment, stack=stack
        )
        self._cleanup_run()
        return result

    def run_step(self, step: "Step") -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.
        Args:
            step: The step to be executed
        Returns:
            The execution info of the step.
        """
        assert self._active_deployment
        assert self._active_pb2_pipeline
        self._ensure_artifact_classes_loaded(step.config)
        step_name = step.config.name
        pb2_pipeline = self._active_pb2_pipeline
        run_model = self._create_or_reuse_run()
        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_model.name},
        )
        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step_name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step_name
        )
        metadata_connection_cfg = Client().zen_store.get_metadata_config()
        executor_operator = self._get_executor_operator(
            step_operator=step.config.step_operator
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: executor_operator
        }
        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=self._active_deployment.pipeline,
            run_name=run_model.name,
        )
        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(step_name)
        stack = Client().active_stack
        proto_utils.add_mlmd_contexts(
            pipeline_node=pipeline_node,
            step=step,
            deployment=self._active_deployment,
            stack=stack,
        )
        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata.Metadata(metadata_connection_cfg),
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )
        # If a step operator is used, the current environment will not be the
        # one executing the step function code and therefore we don't need to
        # run any preparation
        if step.config.step_operator:
            execution_info = self._execute_step(component_launcher)
        else:
            stack.prepare_step_run(info=step_run_info)
            try:
                execution_info = self._execute_step(component_launcher)
            except:  # noqa: E722
                self._publish_failed_run(run_name_or_id=run_model.name)
                raise
            finally:
                stack.cleanup_step_run(info=step_run_info)
        return execution_info

    @staticmethod
    def requires_resources_in_orchestration_environment(
        step: "Step",
    ) -> bool:
        """Checks if the orchestrator should run this step on special resources.
        Args:
            step: The step that will be checked.
        Returns:
            True if the step requires special resources in the orchestration
            environment, False otherwise.
        """
        # If the step requires custom resources and doesn't run with a step
        # operator, it would need these requirements in the orchestrator
        # environment
        if step.config.step_operator:
            return False
        return not step.config.resource_settings.empty

    def _prepare_run(self, deployment: "PipelineDeployment") -> None:
        """Prepares a run.
        Args:
            deployment: The deployment to prepare.
        """
        self._active_deployment = deployment
        pb2_pipeline = Pb2Pipeline()
        pb2_pipeline_json = string_utils.b64_decode(
            self._active_deployment.proto_pipeline
        )
        json_format.Parse(pb2_pipeline_json, pb2_pipeline)
        self._active_pb2_pipeline = pb2_pipeline

    def _cleanup_run(self) -> None:
        """Cleans up the active run."""
        self._active_deployment = None
        self._active_pb2_pipeline = None

    def get_run_id_for_orchestrator_run_id(
        self, orchestrator_run_id: str
    ) -> UUID:
        """Generates a run ID from an orchestrator run id.
        Args:
            orchestrator_run_id: The orchestrator run id.
        Returns:
            The run id generated from the orchestrator run id.
        """
        run_id_seed = f"{self.id}-{orchestrator_run_id}"
        return uuid_utils.generate_uuid_from_string(run_id_seed)

    def _create_or_reuse_run(self) -> PipelineRunModel:
        """Creates a run or reuses an existing one.
        Returns:
            The created or existing run.
        """
        assert self._active_deployment
        orchestrator_run_id = self.get_orchestrator_run_id()
        run_id = self.get_run_id_for_orchestrator_run_id(orchestrator_run_id)
        date = datetime.now().strftime("%Y_%m_%d")
        time = datetime.now().strftime("%H_%M_%S_%f")
        run_name = self._active_deployment.run_name.format(date=date, time=time)
        logger.debug("Creating run with ID: %s, name: %s", run_id, run_name)
        client = Client()
        run_model = PipelineRunModel(
            id=run_id,
            name=run_name,
            orchestrator_run_id=orchestrator_run_id,
            user=client.active_user.id,
            project=client.active_project.id,
            stack_id=self._active_deployment.stack_id,
            pipeline_id=self._active_deployment.pipeline_id,
            status=ExecutionStatus.RUNNING,
            pipeline_configuration=self._active_deployment.pipeline.dict(),
            num_steps=len(self._active_deployment.steps),
        )
        return client.zen_store.get_or_create_run(run_model)

    @staticmethod
    def _publish_failed_run(run_name_or_id: Union[str, UUID]) -> None:
        """Set run status to failed.
        Args:
            run_name_or_id: The name or ID of the run that failed.
        """
        client = Client()
        run = client.zen_store.get_run(run_name_or_id)
        run.status = ExecutionStatus.FAILED
        client.zen_store.update_run(run)

    @staticmethod
    def _ensure_artifact_classes_loaded(
        step_configuration: "StepConfiguration",
    ) -> None:
        """Ensures that all artifact classes for a step are loaded.
        Args:
            step_configuration: A step configuration.
        """
        artifact_class_sources = set(
            input_.artifact_source
            for input_ in step_configuration.inputs.values()
        ) | set(
            output.artifact_source
            for output in step_configuration.outputs.values()
        )
        for source in artifact_class_sources:
            # Tfx depends on these classes being loaded so it can detect the
            # correct artifact class
            source_utils.validate_source_class(
                source, expected_class=BaseArtifact
            )

    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.
        Args:
            tfx_launcher: A tfx launcher to execute the component.
        Returns:
            Optional execution info returned by the launcher.
        Raises:
            RuntimeError: If the execution failed during preparation.
        """
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")
        # There is no way to differentiate between a cached and a failed
        # execution based on the execution info returned by the TFX launcher.
        # We patch the _publish_failed_execution method in order to check
        # if an execution failed.
        execution_failed = False
        original_publish_failed_execution = (
            tfx_launcher._publish_failed_execution
        )

        def _new_publish_failed_execution(
            self: launcher.Launcher, *args: Any, **kwargs: Any
        ) -> None:
            original_publish_failed_execution(*args, **kwargs)
            nonlocal execution_failed
            execution_failed = True

        setattr(
            tfx_launcher,
            "_publish_failed_execution",
            types.MethodType(_new_publish_failed_execution, tfx_launcher),
        )
        execution_info = tfx_launcher.launch()
        if execution_failed:
            raise RuntimeError(
                "Failed to execute step. This is probably because some input "
                f"artifacts for the step {pipeline_step_name} could not be "
                "found in the database."
            )
        if execution_info and get_cache_status(execution_info):
            logger.info(f"Using cached version of `{pipeline_step_name}`.")
        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info

    @staticmethod
    def _get_executor_operator(
        step_operator: Optional[str],
    ) -> Type[BaseExecutorOperator]:
        """Gets the TFX executor operator for the given step operator.
        Args:
            step_operator: The optional step operator used to run a step.
        Returns:
            The executor operator for the given step operator.
        """
        if step_operator:
            from zenml.step_operators.step_executor_operator import (
                StepExecutorOperator,
            )

            return StepExecutorOperator
        else:
            return PythonExecutorOperator

    def _get_node_with_step_name(self, step_name: str) -> PipelineNode:
        """Given the name of a step, return the node with that name from the pb2_pipeline.
        Args:
            step_name: Name of the step
        Returns:
            PipelineNode instance
        Raises:
            KeyError: If the step name is not found in the pipeline.
        """
        assert self._active_pb2_pipeline
        for node in self._active_pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node
        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{self._active_pb2_pipeline.pipeline_info.id}"
        )
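The `_execute_step` method in the listing above cannot tell a failed execution from a cached one using the launcher's return value alone, so it wraps the launcher's `_publish_failed_execution` hook on that single instance and flips a `nonlocal` flag when the hook fires. The stdlib-only sketch below reproduces that instance-level patching technique; `Launcher` is a hypothetical stand-in, not the TFX class, and it uses plain attribute assignment where the source uses `setattr` (the two are equivalent here).

```python
import types


class Launcher:
    """Hypothetical stand-in for a TFX launcher; not the real TFX class."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.failed_publications = 0

    def _publish_failed_execution(self) -> None:
        # In a real launcher this would record a failed execution.
        self.failed_publications += 1

    def launch(self) -> None:
        # Returns None on failure, so the return value alone is ambiguous.
        if self.fail:
            self._publish_failed_execution()
        return None


def launch_and_detect_failure(launcher: Launcher) -> bool:
    """Launches and reports whether the failure hook was called."""
    execution_failed = False
    original = launcher._publish_failed_execution  # already a bound method

    def _patched(self: Launcher, *args, **kwargs) -> None:
        nonlocal execution_failed
        original(*args, **kwargs)  # keep the original side effect
        execution_failed = True

    # Bind the wrapper to this one instance; the class itself is untouched.
    launcher._publish_failed_execution = types.MethodType(_patched, launcher)
    launcher.launch()
    return execution_failed


ok = Launcher(fail=False)
bad = Launcher(fail=True)
print(launch_and_detect_failure(ok), launch_and_detect_failure(bad))  # False True
```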
config: BaseOrchestratorConfig (read-only property)
Returns the `BaseOrchestratorConfig` config.
Returns:
| Type | Description |
|---|---|
| BaseOrchestratorConfig | The configuration. |
get_orchestrator_run_id(self)
Returns the run id of the active orchestrator run.
Important: The returned ID must be unique to the pipeline run and must be the same for all steps of that run.
Returns:
| Type | Description |
|---|---|
| str | The orchestrator run id. |
get_run_id_for_orchestrator_run_id(self, orchestrator_run_id)
Generates a run ID from an orchestrator run id. The ID is derived from the orchestrator's own ID combined with the orchestrator run id, so every step of the same run obtains the same run ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| orchestrator_run_id | str | The orchestrator run id. | required |
Returns:
| Type | Description |
|---|---|
| UUID | The run id generated from the orchestrator run id. |
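Because the run ID depends only on the seed string `f"{self.id}-{orchestrator_run_id}"`, steps executing in separate processes or containers can each compute it without coordinating. A minimal sketch, assuming a name-based UUID (`uuid.uuid5`) as the string-to-UUID mapping; ZenML's `uuid_utils.generate_uuid_from_string` is not shown in this section and may work differently, and the namespace choice is hypothetical.

```python
import uuid

# Assumption: uuid5 stands in for uuid_utils.generate_uuid_from_string.
_NAMESPACE = uuid.NAMESPACE_OID  # hypothetical namespace choice


def run_id_for(orchestrator_id: uuid.UUID, orchestrator_run_id: str) -> uuid.UUID:
    """Derives a deterministic run ID from the orchestrator ID and run id."""
    seed = f"{orchestrator_id}-{orchestrator_run_id}"  # same seed format as the source
    return uuid.uuid5(_NAMESPACE, seed)


orchestrator_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
# Two steps of the same orchestrator run compute the same ID independently...
step_a = run_id_for(orchestrator_id, "run-42")
step_b = run_id_for(orchestrator_id, "run-42")
# ...while a different orchestrator run maps to a different ID.
other = run_id_for(orchestrator_id, "run-43")
print(step_a == step_b, step_a == other)  # True False
```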
prepare_or_run_pipeline(self, deployment, stack)
This method must be implemented by each orchestrator. Depending on the type of orchestrator, the required operations differ slightly.
Simple Case:
The steps are run directly in the same environment in which the orchestrator code is executed. In this case you need to handle implementation-specific runtime configurations (like the schedule), then iterate through the steps and call `self.run_step(...)` to execute each one.
Advanced Case:
Most orchestrators do not run the steps directly. Instead, they build an intermediate representation of the pipeline that is then used to create and run the pipeline and its steps in the target environment. For such orchestrators, this method has to build that representation and deploy it.
Regardless of the implementation details, the orchestrator needs to run each step in the target environment, using the `self.run_step(...)` method. The easiest way to achieve this is to use an entrypoint configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`) or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
| stack | Stack | The stack the pipeline will run on. | required |
Returns:
| Type | Description |
|---|---|
| Any | The optional return value from this method will be returned by the `pipeline_instance.run()` call when someone is running a pipeline. |
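The two cases can be contrasted in a small, framework-free sketch. Everything here is hypothetical: steps are plain names, `run_step` only records the step, the pipeline is assumed to be a linear chain, and the `my_step_entrypoint` module in the generated command is invented for illustration (in ZenML, the step entrypoint configuration named above would fill that role).

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class SketchOrchestrator(ABC):
    """Hypothetical, heavily simplified analogue of BaseOrchestrator."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def run_step(self, step: str) -> None:
        # Stand-in for BaseOrchestrator.run_step: just record the step.
        self.executed.append(step)

    @abstractmethod
    def prepare_or_run_pipeline(self, steps: List[str]) -> Any:
        ...


class InProcessOrchestrator(SketchOrchestrator):
    """Simple case: run every step directly, in order."""

    def prepare_or_run_pipeline(self, steps: List[str]) -> None:
        for step in steps:
            self.run_step(step)


class DeployingOrchestrator(SketchOrchestrator):
    """Advanced case: build an intermediate representation to deploy."""

    def prepare_or_run_pipeline(self, steps: List[str]) -> str:
        tasks: List[Dict[str, Any]] = []
        for i, step in enumerate(steps):
            tasks.append(
                {
                    "name": step,
                    # Hypothetical command: the remote task would call a step
                    # entrypoint, which in turn calls run_step for this step.
                    "command": ["python", "-m", "my_step_entrypoint", "--step", step],
                    # Linear-chain assumption: each step depends on the previous one.
                    "upstream": [steps[i - 1]] if i > 0 else [],
                }
            )
        # A real orchestrator would submit this spec to its backend.
        return json.dumps({"tasks": tasks})


steps = ["load", "train", "evaluate"]
local = InProcessOrchestrator()
local.prepare_or_run_pipeline(steps)
spec = json.loads(DeployingOrchestrator().prepare_or_run_pipeline(steps))
print(local.executed)                            # ['load', 'train', 'evaluate']
print([t["upstream"] for t in spec["tasks"]])    # [[], ['load'], ['train']]
```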
requires_resources_in_orchestration_environment(step) (static method)
Checks whether the orchestrator needs to run this step on special resources. This is the case when the step specifies resource settings and does not run with a step operator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step | Step | The step that will be checked. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the step requires special resources in the orchestration environment, False otherwise. |
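The decision rule is small enough to show in isolation. In this sketch the dataclasses are hypothetical stand-ins for `step.config` and its `resource_settings` (the field names `cpu_count`, `gpu_count`, and `memory` are assumptions), and the function takes the config directly rather than a `Step`.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ResourceSettings:
    """Hypothetical stand-in; `empty` mirrors the attribute used in the source."""

    cpu_count: Optional[float] = None
    gpu_count: Optional[int] = None
    memory: Optional[str] = None

    @property
    def empty(self) -> bool:
        return all(v is None for v in (self.cpu_count, self.gpu_count, self.memory))


@dataclass
class StepConfig:
    """Hypothetical stand-in for `step.config`."""

    step_operator: Optional[str] = None
    resource_settings: ResourceSettings = field(default_factory=ResourceSettings)


def requires_resources_in_orchestration_environment(config: StepConfig) -> bool:
    # A step operator runs the step elsewhere, so the orchestrator's own
    # environment never needs the requested resources.
    if config.step_operator:
        return False
    return not config.resource_settings.empty


print(requires_resources_in_orchestration_environment(
    StepConfig(resource_settings=ResourceSettings(cpu_count=2.0))
))  # True
```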
run(self, deployment, stack)
Runs a pipeline on a stack.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment. | required |
| stack | Stack | The stack on which to run the pipeline. | required |
Returns:
| Type | Description |
|---|---|
| Any | Orchestrator-specific return value. |
run_step(self, step)
Sets up a component launcher and executes the given step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step | Step | The step to be executed. | required |
Returns:
| Type | Description |
|---|---|
| Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info of the step. |
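The final branch of `run_step` follows a common prepare/execute/cleanup pattern: with a step operator it executes straight away; otherwise it prepares the stack, marks the run as failed if execution raises, and always cleans up. A minimal sketch of just that control flow, with a hypothetical `RecordingStack` and a plain list standing in for the failed-run bookkeeping. Note the sketch catches `Exception`, while the source uses a bare `except:` that also catches `BaseException` subclasses such as `KeyboardInterrupt`.

```python
from typing import Callable, List, Optional


class RecordingStack:
    """Hypothetical stack that records the lifecycle calls it receives."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def prepare_step_run(self) -> None:
        self.calls.append("prepare")

    def cleanup_step_run(self) -> None:
        self.calls.append("cleanup")


def run_step_sketch(
    execute: Callable[[], str],
    stack: RecordingStack,
    step_operator: Optional[str],
    failed_runs: List[str],
    run_name: str,
) -> str:
    if step_operator:
        # The step code runs elsewhere, so no local preparation is needed.
        return execute()
    stack.prepare_step_run()
    try:
        return execute()
    except Exception:
        # Mark the whole run as failed, then let the error propagate.
        failed_runs.append(run_name)
        raise
    finally:
        # Cleanup runs on success and on failure alike.
        stack.cleanup_step_run()


stack = RecordingStack()
print(run_step_sketch(lambda: "done", stack, None, [], "run-1"), stack.calls)
```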
BaseOrchestratorConfig (StackComponentConfig)
Base orchestrator config (a pydantic model).
class BaseOrchestratorConfig(StackComponentConfig):
    """Base orchestrator config."""

    @root_validator(pre=True)
    def _deprecations(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and/or remove deprecated fields.
        Args:
            values: The values to validate.
        Returns:
            The validated values.
        """
        if "custom_docker_base_image_name" in values:
            image_name = values.pop("custom_docker_base_image_name", None)
            if image_name:
                logger.warning(
                    "The 'custom_docker_base_image_name' field has been "
                    "deprecated. To use a custom base container image with your "
                    "orchestrators, please use the DockerSettings in your "
                    "pipeline (see https://docs.zenml.io/advanced-guide/pipelines/containerization)."
                )
        return values
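A `pre=True` root validator in pydantic v1 receives the raw input dictionary before field validation, which is what lets `_deprecations` strip a key the model no longer declares. The sketch below mirrors that logic as a plain function so it runs without pydantic; the shortened warning text is an illustration, not the library's message.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def drop_deprecated_fields(values: Dict[str, Any]) -> Dict[str, Any]:
    """Removes the deprecated key before field validation would see it."""
    if "custom_docker_base_image_name" in values:
        image_name = values.pop("custom_docker_base_image_name", None)
        # Only warn when the user actually set a value.
        if image_name:
            logger.warning(
                "The 'custom_docker_base_image_name' field has been deprecated; "
                "use DockerSettings instead."
            )
    return values


print(drop_deprecated_fields({"custom_docker_base_image_name": "my/image:1", "x": 1}))
# {'x': 1}
```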
BaseOrchestratorFlavor (Flavor)
Base orchestrator flavor class.
class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator flavor class."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.
        Returns:
            The flavor type.
        """
        return StackComponentType.ORCHESTRATOR

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.
        Returns:
            The config class.
        """
        return BaseOrchestratorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] (read-only property)
Config class for the base orchestrator flavor.
Returns:
| Type | Description |
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. |
implementation_class: Type[BaseOrchestrator] (read-only property)
Implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| Type[BaseOrchestrator] | The implementation class. |
type: StackComponentType (read-only property)
Returns the flavor type.
Returns:
| Type | Description |
|---|---|
| StackComponentType | The flavor type. |
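A flavor bundles a component type, a config class, and an implementation class behind read-only properties, so code that knows only a flavor name can construct the right orchestrator. The sketch below shows that shape with hypothetical classes and a hypothetical name-keyed registry; in ZenML the abstract `name` property comes from the `Flavor` base class rather than from `BaseOrchestratorFlavor`.

```python
from abc import ABC, abstractmethod
from typing import Dict, Type


class Orchestrator:  # hypothetical base implementation class
    pass


class OrchestratorConfig:  # hypothetical base config class
    pass


class Flavor(ABC):
    """Hypothetical analogue of BaseOrchestratorFlavor."""

    @property
    def type(self) -> str:
        return "orchestrator"

    @property
    def config_class(self) -> Type[OrchestratorConfig]:
        # Default config; concrete flavors override this when they need more.
        return OrchestratorConfig

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def implementation_class(self) -> Type[Orchestrator]: ...


class InlineOrchestrator(Orchestrator):
    pass


class InlineFlavor(Flavor):
    @property
    def name(self) -> str:
        return "inline"

    @property
    def implementation_class(self) -> Type[Orchestrator]:
        return InlineOrchestrator


# Hypothetical registry keyed by flavor name.
registry: Dict[str, Flavor] = {f.name: f for f in [InlineFlavor()]}
flavor = registry["inline"]
orchestrator = flavor.implementation_class()
print(flavor.type, type(orchestrator).__name__, flavor.config_class.__name__)
```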
local
Initialization for the local orchestrator.
local_orchestrator
Implementation of the ZenML local orchestrator.
LocalOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines locally.
This orchestrator does not run steps concurrently and does not support running on a schedule; if a schedule is set, it is ignored with a warning and the pipeline runs immediately.
Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.
    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    _orchestrator_run_id: Optional[str] = None

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Iterates through all steps and executes them sequentially.
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack on which the pipeline is deployed.
        """
        if deployment.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        self._orchestrator_run_id = str(uuid4())
        start_time = time.time()
        # Run each step
        for step in deployment.steps.values():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )
            self.run_step(
                step=step,
            )
        run_duration = time.time() - start_time
        run_id = self.get_run_id_for_orchestrator_run_id(
            self._orchestrator_run_id
        )
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
        self._orchestrator_run_id = None

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.
        Raises:
            RuntimeError: If no run id exists. This happens when this method
                gets called while the orchestrator is not running a pipeline.
        Returns:
            The orchestrator run id.
        """
        if not self._orchestrator_run_id:
            raise RuntimeError("No run id set.")
        return self._orchestrator_run_id
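The local orchestrator keeps its run id in memory: it generates a fresh `uuid4` before the first step, every `run_step` call reads that same id, and the id is reset afterwards so calls outside a run raise `RuntimeError`. A minimal sketch of that lifecycle with hypothetical `Step` and `Deployment` dataclasses. Unlike the listed source, the sketch resets the id in a `finally` block so a failing step cannot leave a stale id behind.

```python
import logging
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class Step:  # hypothetical minimal step
    name: str
    needs_resources: bool = False


@dataclass
class Deployment:  # hypothetical minimal deployment
    steps: Dict[str, Step]
    schedule: Optional[str] = None


class SequentialOrchestrator:
    def __init__(self) -> None:
        self._run_id: Optional[str] = None
        self.log: List[str] = []

    def get_orchestrator_run_id(self) -> str:
        if not self._run_id:
            raise RuntimeError("No run id set.")
        return self._run_id

    def run_step(self, step: Step) -> None:
        # Every step sees the same run id while the pipeline is running.
        self.log.append(f"{self.get_orchestrator_run_id()}:{step.name}")

    def prepare_or_run_pipeline(self, deployment: Deployment) -> None:
        if deployment.schedule:
            logger.warning("Schedules are not supported; running immediately.")
        self._run_id = str(uuid.uuid4())
        try:
            for step in deployment.steps.values():  # dict order = execution order
                if step.needs_resources:
                    logger.warning("Ignoring resources for step %s.", step.name)
                self.run_step(step)
        finally:
            self._run_id = None  # calls outside a run now fail loudly


orch = SequentialOrchestrator()
orch.prepare_or_run_pipeline(
    Deployment(steps={"a": Step("a"), "b": Step("b", needs_resources=True)})
)
run_ids = {entry.split(":")[0] for entry in orch.log}
print(len(orch.log), len(run_ids))  # 2 1
```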
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
| Type | Description |
|---|---|
| RuntimeError | If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline. |
Returns:
| Type | Description |
|---|---|
| str | The orchestrator run id. |
prepare_or_run_pipeline(self, deployment, stack)
Iterates through all steps and executes them sequentially.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
| stack | Stack | The stack on which the pipeline is deployed. | required |
LocalOrchestratorConfig (BaseOrchestratorConfig)
Local orchestrator config (a pydantic model).
class LocalOrchestratorConfig(BaseOrchestratorConfig):
    """Local orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.
        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.
        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool (read-only property)
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
| Type | Description |
|---|---|
| bool | True if this config is for a local component, False otherwise. |
LocalOrchestratorFlavor (BaseOrchestratorFlavor)
Class for the `LocalOrchestratorFlavor`.
class LocalOrchestratorFlavor(BaseOrchestratorFlavor):
    """Class for the `LocalOrchestratorFlavor`."""

    @property
    def name(self) -> str:
        """The flavor name.
        Returns:
            The flavor name.
        """
        return "local"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.
        Returns:
            The config class.
        """
        return LocalOrchestratorConfig

    @property
    def implementation_class(self) -> Type[LocalOrchestrator]:
        """Implementation class for this flavor.
        Returns:
            The implementation class for this flavor.
        """
        return LocalOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] (read-only property)
Config class for the local orchestrator flavor (`LocalOrchestratorConfig`).
Returns:
| Type | Description |
|---|---|
| Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. |
implementation_class: Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] (read-only property)
Implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] | The implementation class for this flavor. |
name: str (read-only property)
The flavor name (`"local"`).
Returns:
| Type | Description |
|---|---|
| str | The flavor name. |
local_docker
Initialization for the local Docker orchestrator.
local_docker_orchestrator
Implementation of the ZenML local Docker orchestrator.
LocalDockerOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines locally using Docker.
This orchestrator does not run steps concurrently and does not support running on a schedule; if a schedule is set, it is ignored with a warning.
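Each step of the local Docker orchestrator runs in its own container, so the steps cannot share an in-memory run id the way the local orchestrator does. The `get_orchestrator_run_id` method in the listing that follows instead reads the id from the `ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID` environment variable. A minimal sketch of that pattern, assuming a hypothetical variable name and using plain dictionaries to stand in for each container's environment; how the real orchestrator injects the variable is not shown in this section.

```python
import os
import uuid
from typing import Mapping, Optional

# Hypothetical variable name; the real constant's value is not shown here.
RUN_ID_ENV_VAR = "EXAMPLE_ORCHESTRATOR_RUN_ID"


def get_orchestrator_run_id(environ: Optional[Mapping[str, str]] = None) -> str:
    """Reads the shared run id from the (container) environment."""
    env = os.environ if environ is None else environ
    try:
        return env[RUN_ID_ENV_VAR]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id from environment variable {RUN_ID_ENV_VAR}."
        ) from None


run_id = str(uuid.uuid4())
# One environment per step "container", all carrying the same run id.
step_envs = [{RUN_ID_ENV_VAR: run_id, "STEP": name} for name in ("load", "train")]
print({get_orchestrator_run_id(env) for env in step_envs} == {run_id})  # True
```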
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally using Docker.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Local Docker orchestrator.

        Returns:
            The settings class.
        """
        return LocalDockerOrchestratorSettings

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and (maybe) push it to the container registry.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        if stack.container_registry:
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
        else:
            # If there is no container registry, we only build the image
            target_image_name = docker_image_builder.get_target_image_name(
                deployment=deployment
            )
            docker_image_builder.build_docker_image(
                target_image_name=target_image_name,
                deployment=deployment,
                stack=stack,
            )
            deployment.add_extra(
                ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
            )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Sequentially runs all pipeline steps in local Docker containers.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        """
        if deployment.schedule:
            logger.warning(
                "Local Docker Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        from docker.client import DockerClient

        docker_client = DockerClient.from_env()

        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
        entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

        # Add the local stores path as a volume mount
        stack.check_local_paths()
        local_stores_path = GlobalConfiguration().local_stores_path
        volumes = {
            local_stores_path: {
                "bind": local_stores_path,
                "mode": "rw",
            }
        }
        orchestrator_run_id = str(uuid4())
        environment = {
            ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID: orchestrator_run_id,
            ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
        }
        start_time = time.time()

        # Run each step
        for step_name, step in deployment.steps.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "Docker orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )

            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )

            settings = cast(
                LocalDockerOrchestratorSettings,
                self.get_settings(step),
            )
            user = None
            if sys.platform != "win32":
                user = os.getuid()
            logger.info("Running step `%s` in Docker:", step_name)
            logs = docker_client.containers.run(
                image=image_name,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=volumes,
                environment=environment,
                stream=True,
                extra_hosts={"host.docker.internal": "host-gateway"},
                **settings.run_args,
            )

            for line in logs:
                logger.info(line.strip().decode())

        run_duration = time.time() - start_time
        run_id = self.get_run_id_for_orchestrator_run_id(orchestrator_run_id)
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Local Docker orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] | The settings class. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
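The orchestrator generates one run id per pipeline run and passes it to every step container as an environment variable; inside the container, get_orchestrator_run_id only reads it back. The standalone sketch below shows that handoff. The variable name ZENML_DOCKER_ORCHESTRATOR_RUN_ID is an assumption about the value of ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID, and the env parameter is an addition that makes the lookup testable without touching os.environ.

```python
import os
import uuid
from typing import Mapping, Optional

# Assumed value of ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID; not taken from the source.
RUN_ID_ENV_VAR = "ZENML_DOCKER_ORCHESTRATOR_RUN_ID"


def get_orchestrator_run_id(env: Optional[Mapping[str, str]] = None) -> str:
    """Reads the orchestrator run id, raising RuntimeError if it is missing."""
    env = os.environ if env is None else env
    try:
        return env[RUN_ID_ENV_VAR]
    except KeyError:
        # `from None` hides the KeyError so only the RuntimeError is reported.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{RUN_ID_ENV_VAR}."
        ) from None


# Orchestrator side: one fresh id per pipeline run, shared by all step containers.
container_env = {RUN_ID_ENV_VAR: str(uuid.uuid4())}

# Container side: every step reads back the same id.
print(get_orchestrator_run_id(container_env) == container_env[RUN_ID_ENV_VAR])
# prints: True
```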
prepare_or_run_pipeline(self, deployment, stack)
Sequentially runs all pipeline steps in local Docker containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
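Inside the step loop, every container gets the same image, volume mount and environment; only the entrypoint arguments and the per-step run_args differ. The sketch below builds the keyword arguments passed to docker-py's containers.run without starting a container. The build_run_kwargs and fake_run helpers and all sample values (image name, entrypoint, arguments, paths, environment keys) are hypothetical. It also illustrates one consequence of splatting **settings.run_args into the call: a run_args key that repeats an explicit argument, such as environment, makes Python raise a TypeError rather than overriding the value.

```python
import os
import sys
from typing import Any, Dict, List, Optional


def build_run_kwargs(
    image: str,
    entrypoint: List[str],
    arguments: List[str],
    local_stores_path: str,
    environment: Dict[str, str],
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the explicit arguments in the step loop."""
    user: Optional[int] = None
    if sys.platform != "win32":
        # Presumably so that files written to the mounted stores end up
        # owned by the host user rather than by root.
        user = os.getuid()
    return {
        "image": image,
        "entrypoint": entrypoint,
        "command": arguments,
        "user": user,
        # Mount the local stores directory at the same path in the container.
        "volumes": {
            local_stores_path: {"bind": local_stores_path, "mode": "rw"}
        },
        "environment": environment,
        "stream": True,
        "extra_hosts": {"host.docker.internal": "host-gateway"},
    }


def fake_run(**kwargs: Any) -> Dict[str, Any]:
    """Stand-in for docker_client.containers.run; it only echoes its input."""
    return kwargs


stores = "/tmp/zenml/local_stores"
kwargs = build_run_kwargs(
    image="my-pipeline-image:latest",
    entrypoint=["python", "-m", "my_entrypoint"],
    arguments=["--step_name", "trainer"],
    local_stores_path=stores,
    environment={"RUN_ID": "1234", "STORES_PATH": stores},
)

# run_args keys that do not clash are simply forwarded to the call.
print("shm_size" in fake_run(**kwargs, **{"shm_size": "2g"}))  # True

# A run_args key that repeats an explicit argument is rejected by Python.
try:
    fake_run(**kwargs, **{"environment": {}})
except TypeError as err:
    print("TypeError:", err)
```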
prepare_pipeline_deployment(self, deployment, stack)
Builds a Docker image and, if the stack has a container registry, pushes it to that registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment configuration. | required |
stack | Stack | The stack on which the pipeline will be deployed. | required |
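The branch in prepare_pipeline_deployment decides which image reference is stored under ORCHESTRATOR_DOCKER_IMAGE_KEY: the pushed repository digest when the stack has a container registry, otherwise the name of the locally built image. The sketch below isolates that decision. FakeImageBuilder, select_image, the IMAGE_KEY value and the zenml:<pipeline> naming scheme are hypothetical; the builder only borrows the method names of PipelineDockerImageBuilder, with simplified signatures.

```python
from typing import Dict, List, Optional

# Placeholder for ORCHESTRATOR_DOCKER_IMAGE_KEY; the real value is not shown here.
IMAGE_KEY = "orchestrator_docker_image"


class FakeImageBuilder:
    """Hypothetical stand-in that records builds instead of calling Docker."""

    def __init__(self) -> None:
        self.built: List[str] = []
        self.pushed: List[str] = []

    def get_target_image_name(
        self, pipeline_name: str, registry: Optional[str] = None
    ) -> str:
        prefix = f"{registry}/" if registry else ""
        return f"{prefix}zenml:{pipeline_name}"

    def build_docker_image(self, target_image_name: str) -> None:
        self.built.append(target_image_name)

    def build_and_push_docker_image(self, pipeline_name: str, registry: str) -> str:
        name = self.get_target_image_name(pipeline_name, registry)
        self.build_docker_image(name)
        self.pushed.append(name)
        # A real push returns a content-addressed digest; this one is fake.
        repository = name.rsplit(":", 1)[0]
        return f"{repository}@sha256:{'0' * 64}"


def select_image(
    builder: FakeImageBuilder, pipeline_name: str, registry: Optional[str]
) -> Dict[str, str]:
    """Returns the extra entry from which the steps later read the image."""
    if registry:
        # Registry available: build, push, and record the pushed digest.
        image = builder.build_and_push_docker_image(pipeline_name, registry)
    else:
        # No registry: build locally and refer to the image by its name.
        image = builder.get_target_image_name(pipeline_name)
        builder.build_docker_image(image)
    return {IMAGE_KEY: image}


builder = FakeImageBuilder()
local = select_image(builder, "training", registry=None)
remote = select_image(builder, "training", registry="registry.example.com")
print(local[IMAGE_KEY])  # zenml:training
print(builder.pushed)    # ['registry.example.com/zenml:training']
```

Referring to a pushed image by digest rather than by tag pins the exact image the steps will run, even if the tag is later moved.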
LocalDockerOrchestratorConfig (BaseOrchestratorConfig, LocalDockerOrchestratorSettings)
pydantic-model
Local Docker orchestrator config.
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, LocalDockerOrchestratorSettings
):
    """Local Docker orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
Type | Description |
---|---|
bool | True if this config is for a local component, False otherwise. |
LocalDockerOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the local Docker orchestrator.
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local Docker orchestrator."""

    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        return "local_docker"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalDockerOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LocalDockerOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            Implementation class for this flavor.
        """
        return LocalDockerOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] | The config class. |
implementation_class: Type[LocalDockerOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LocalDockerOrchestrator] | Implementation class for this flavor. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str | Name of the orchestrator flavor. |
LocalDockerOrchestratorSettings (BaseSettings)
pydantic-model
Local Docker orchestrator settings.
Attributes:
Name | Type | Description |
---|---|---|
run_args | Dict[str, Any] | Arguments to pass to the docker run call. |
Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorSettings(BaseSettings):
    """Local Docker orchestrator settings.

    Attributes:
        run_args: Arguments to pass to the `docker run` call.
    """

    run_args: Dict[str, Any] = {}

    @validator("run_args", pre=True)
    def _convert_json_string(
        cls, value: Union[None, str, Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Converts potential JSON strings passed via the CLI to dictionaries.

        Args:
            value: The value to convert.

        Returns:
            The converted value.

        Raises:
            TypeError: If the value is not a `str`, `Dict` or `None`.
            ValueError: If the value is an invalid json string or a json string
                that does not decode into a dictionary.
        """
        if isinstance(value, str):
            try:
                dict_ = json.loads(value)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid json string '{value}'") from e

            if not isinstance(dict_, Dict):
                raise ValueError(
                    f"Json string '{value}' did not decode into a dictionary."
                )

            return dict_
        elif isinstance(value, Dict) or value is None:
            return value
        else:
            raise TypeError(f"{value} is not a json string or a dictionary.")
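Because settings may be supplied as strings (for example from the CLI, as the validator docstring notes), run_args can arrive as a JSON string. The validator's rules can be exercised without pydantic: the sketch below copies the logic into a plain function named convert_run_args (a hypothetical name) and uses the built-in dict in the isinstance checks, which accepts the same values as the bare typing.Dict used above. The keys shm_size and mem_limit are only sample values.

```python
import json
from typing import Any, Dict, Optional, Union


def convert_run_args(
    value: Union[None, str, Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Plain-function copy of the _convert_json_string validator logic."""
    if isinstance(value, str):
        try:
            dict_ = json.loads(value)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid json string '{value}'") from e
        # Valid JSON that is not an object (e.g. a list) is rejected too.
        if not isinstance(dict_, dict):
            raise ValueError(
                f"Json string '{value}' did not decode into a dictionary."
            )
        return dict_
    elif isinstance(value, dict) or value is None:
        # Dictionaries and None pass through unchanged.
        return value
    else:
        raise TypeError(f"{value} is not a json string or a dictionary.")


print(convert_run_args('{"shm_size": "2g"}'))  # {'shm_size': '2g'}
print(convert_run_args({"mem_limit": "1g"}))  # {'mem_limit': '1g'}
for bad in ('["not", "a", "dict"]', "{oops", 42):
    try:
        convert_run_args(bad)  # type: ignore[arg-type]
    except (ValueError, TypeError) as err:
        print(type(err).__name__)  # ValueError, ValueError, then TypeError
```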
utils
Utility functions for the orchestrator.
get_cache_status(execution_info)
Returns whether a cached execution was used or not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution_info | Optional[tfx.orchestration.portable.data_types.ExecutionInfo] | The execution info. | required |
Returns:
Type | Description |
---|---|
bool | True if the execution was cached, False otherwise. |
Source code in zenml/orchestrators/utils.py
def get_cache_status(
    execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
    """Returns whether a cached execution was used or not.

    Args:
        execution_info: The execution info.

    Returns:
        `True` if the execution was cached, `False` otherwise.
    """
    # An execution output URI is only provided if the step needs to be
    # executed (= is not cached)
    if execution_info and execution_info.execution_output_uri is None:
        return True
    else:
        return False
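The check depends only on whether execution_output_uri is set. In the self-contained sketch below, FakeExecutionInfo is a hypothetical dataclass standing in for tfx's ExecutionInfo, which has many more fields, and the if/else is collapsed into a single boolean expression. Note that a missing execution info (None) is reported as not cached.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeExecutionInfo:
    """Hypothetical stand-in exposing only the field the check reads."""

    execution_output_uri: Optional[str] = None


def get_cache_status(execution_info: Optional[FakeExecutionInfo]) -> bool:
    # An output URI is only assigned when the step actually has to run,
    # so an execution info without one indicates a cache hit.
    return (
        execution_info is not None
        and execution_info.execution_output_uri is None
    )


print(get_cache_status(FakeExecutionInfo()))  # True
print(get_cache_status(FakeExecutionInfo("/tmp/outputs/executor_output")))  # False
print(get_cache_status(None))  # False
```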
get_orchestrator_run_name(pipeline_name)
Gets an orchestrator run name.
This run name is not the same as the ZenML run name but can instead be used to display in the orchestrator UI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the pipeline that will run. | required |
Returns:
Type | Description |
---|---|
str | The orchestrator run name. |
Source code in zenml/orchestrators/utils.py
def get_orchestrator_run_name(pipeline_name: str) -> str:
    """Gets an orchestrator run name.

    This run name is not the same as the ZenML run name but can instead be
    used to display in the orchestrator UI.

    Args:
        pipeline_name: Name of the pipeline that will run.

    Returns:
        The orchestrator run name.
    """
    user_name = Client().active_user.name
    return f"{pipeline_name}_{user_name}_{random.Random().getrandbits(32):08x}"
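The run name joins the pipeline name, the active user's name and 32 random bits formatted as eight lowercase hexadecimal digits. In the sketch below, user_name is passed in rather than read from Client().active_user.name so that it runs without a ZenML client, and the optional rng parameter is an addition that allows reproducible names in tests.

```python
import random
import re
from typing import Optional


def get_orchestrator_run_name(
    pipeline_name: str, user_name: str, rng: Optional[random.Random] = None
) -> str:
    """Sketch of the run-name format with the user name passed in explicitly."""
    if rng is None:
        rng = random.Random()
    # getrandbits(32) lies in [0, 2**32), so ":08x" always gives 8 hex digits.
    return f"{pipeline_name}_{user_name}_{rng.getrandbits(32):08x}"


name = get_orchestrator_run_name("training_pipeline", "default")
print(name)  # training_pipeline_default_ followed by 8 random hex digits
assert re.fullmatch(r"training_pipeline_default_[0-9a-f]{8}", name)

# Seeding the generator makes the suffix reproducible.
print(
    get_orchestrator_run_name("p", "u", random.Random(0))
    == get_orchestrator_run_name("p", "u", random.Random(0))
)  # True
```

Using a fresh random.Random() instance, as the original does, presumably keeps the suffix independent of any random.seed call made elsewhere in user code.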