Orchestrators
zenml.orchestrators
special
An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of it as the 'root' of any pipeline job that you run during your experimentation.
ZenML supports a local orchestrator out of the box which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.
base_orchestrator
BaseOrchestrator (StackComponent, ABC)
pydantic-model
Base class for all orchestrators. In order to implement an orchestrator you will need to subclass from this class.
How it works:

The `run()` method is the entrypoint that is executed when the pipeline's run method is called within the user code (`pipeline_instance.run()`).

This method takes the ZenML Pipeline instance and prepares it for eventual execution. To do this, the following steps are taken:

- The underlying protobuf pipeline is created.
- Within the `_configure_node_context()` method, the pipeline requirements, stack and runtime configuration are added to the step context.
- The `_get_sorted_steps()` method then generates a sorted list of steps which will later be used to directly execute these steps in order, or to easily build a DAG.
- After these initial steps comes the most crucial one: within the `prepare_or_run_pipeline()` method each orchestrator has its own implementation that dictates the pipeline orchestration. In the simplest case this method iterates through all steps and executes them one by one. In other cases this method builds and deploys an intermediate representation of the pipeline (e.g. an Airflow DAG or a Kubeflow Pipelines YAML) to be executed within the orchestrator's environment.
Building your own:

In order to build your own orchestrator, all you need to do is subclass from this class and implement your own `prepare_or_run_pipeline()` method. Overriding other methods is NOT recommended but possible. See the docstring of the `prepare_or_run_pipeline()` method for details of what needs to be implemented within it.
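As a rough illustration, here is a minimal sketch of such a subclass. The class name and flavor are hypothetical; the body simply mirrors the sequential pattern used by the built-in `LocalOrchestrator` documented further down this page.

```python
from typing import Any, ClassVar, List

from zenml.orchestrators.base_orchestrator import BaseOrchestrator


class SequentialOrchestrator(BaseOrchestrator):
    """Hypothetical orchestrator that runs every step in order, in-process."""

    FLAVOR: ClassVar[str] = "sequential"  # illustrative flavor name

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List["BaseStep"],
        pipeline: "BasePipeline",
        pb2_pipeline: "Pb2Pipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        # Simple case: execute each step directly in the current environment
        # using the `run_step()` helper inherited from `BaseOrchestrator`.
        for step in sorted_steps:
            self.run_step(
                step=step,
                run_name=runtime_configuration.run_name,
                pb2_pipeline=pb2_pipeline,
            )
```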
Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
"""
Base class for all orchestrators. In order to implement an
orchestrator you will need to subclass from this class.
How it works:
-------------
The `run()` method is the entrypoint that is executed when the
pipeline's run method is called within the user code
(`pipeline_instance.run()`).
This method will take the ZenML Pipeline instance and prepare it for
eventual execution. To do this the following steps are taken:
* The underlying protobuf pipeline is created.
* Within the `_configure_node_context()` method the pipeline
requirements, stack and runtime configuration is added to the step
context
* The `_get_sorted_steps()` method then generates a sorted list of
steps which will later be used to directly execute these steps in order,
or to easily build a dag
* After these initial steps comes the most crucial one. Within the
`prepare_or_run_pipeline()` method each orchestrator will have its own
implementation that dictates the pipeline orchestration. In the simplest
case this method will iterate through all steps and execute them one by
one. In other cases this method will build and deploy an intermediate
representation of the pipeline (e.g an airflow dag or a kubeflow
pipelines yaml) to be executed within the orchestrators environment.
Building your own:
------------------
In order to build your own orchestrator, all you need to do is subclass
from this class and implement your own `prepare_or_run_pipeline()`
method. Overriding other methods is NOT recommended but possible.
See the docstring of the `prepare_or_run_pipeline()` method to find out
details of what needs to be implemented within it.
"""
# Class Configuration
TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
@abstractmethod
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""
This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly
different operations.
Simple Case:
------------
The Steps are run directly from within the same environment in which
the orchestrator code is executed. In this case you will need to
deal with implementation-specific runtime configurations (like the
schedule) and then iterate through each step and finally call
`self.run_step()` to execute each step.
Advanced Case:
--------------
Most orchestrators will not run the steps directly. Instead, they
build some intermediate representation of the pipeline that is then
used to create and run the pipeline and its steps on the target
environment. For such orchestrators this method will have to build
this representation and either deploy it directly or return it.
Regardless of the implementation details, the orchestrator will need
a way to trigger each step in the target environment. For this,
the `run_step()` method should be used.
In case the orchestrator is using docker containers for orchestration
of each step, the `zenml.entrypoints.step_entrypoint` module can be
used as a generalized entrypoint that sets up all the necessary
prerequisites, parses input parameters and finally executes the step
using the `run_step()` method.
If the orchestrator needs to know the upstream steps for a specific
step to build a DAG, it can use the `get_upstream_step_names()` method
to get them.
Args:
sorted_steps: List of sorted steps
pipeline: Zenml Pipeline instance
pb2_pipeline: Protobuf Pipeline instance
stack: The stack the pipeline was run on
runtime_configuration: The Runtime configuration of the current run
Returns:
The optional return value from this method will be returned by the
`pipeline_instance.run()` call when someone is running a pipeline.
"""
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Runs a pipeline. To do this, a protobuf pipeline is created, the
context of the individual steps is expanded to include relevant data,
the steps are sorted into execution order and the implementation
specific `prepare_or_run_pipeline()` method is called.
Args:
pipeline: The pipeline to run.
stack: The stack on which the pipeline is run.
runtime_configuration: Runtime configuration of the pipeline run.
Return:
The result of the call to `prepare_or_run_pipeline()`.
"""
# Create the protobuf pipeline which will be needed for various reasons
# in the following steps
pb2_pipeline: Pb2Pipeline = Compiler().compile(
create_tfx_pipeline(pipeline, stack=stack)
)
self._configure_node_context(
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
sorted_steps = self._get_sorted_steps(
pipeline=pipeline, pb2_pipeline=pb2_pipeline
)
result = self.prepare_or_run_pipeline(
sorted_steps=sorted_steps,
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
return result
@staticmethod
def _get_sorted_steps(
pipeline: "BasePipeline", pb2_pipeline: Pb2Pipeline
) -> List["BaseStep"]:
"""Get steps sorted in the execution order. This simplifies the
building of a DAG at a later stage as it can be built with one iteration
over this sorted list of steps.
Args:
pipeline: The pipeline
pb2_pipeline: The protobuf pipeline representation
Returns:
List of steps in execution order
"""
# Create a list of sorted steps
sorted_steps = []
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node
sorted_steps.append(
get_step_for_node(
pipeline_node, steps=list(pipeline.steps.values())
)
)
return sorted_steps
def run_step(
self,
step: "BaseStep",
run_name: str,
pb2_pipeline: Pb2Pipeline,
) -> Optional[data_types.ExecutionInfo]:
"""This sets up a component launcher and executes the given step.
Args:
step: The step to be executed
run_name: The unique run name
pb2_pipeline: Protobuf Pipeline instance
"""
# Substitute the runtime parameter to be a concrete run_id, it is
# important for this to be unique for each run.
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
)
# Extract the deployment_configs and use it to access the executor and
# custom driver spec
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
executor_spec = runner_utils.extract_executor_spec(
deployment_config, step.name
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, step.name
)
# At this point the active metadata store is queried for the
# metadata_connection
repo = Repository()
metadata_store = repo.active_stack.metadata_store
metadata_connection = metadata.Metadata(
metadata_store.get_tfx_metadata_config()
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
# The protobuf node for the current step is loaded here.
pipeline_node = self._get_node_with_step_name(
step_name=step.name, pb2_pipeline=pb2_pipeline
)
# Create the tfx launcher responsible for executing the step.
component_launcher = launcher.Launcher(
pipeline_node=pipeline_node,
mlmd_connection=metadata_connection,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
# In some stack configurations, some stack components (like experiment
# trackers) will run some code before and after the actual step run.
# This is where the step actually gets executed using the
# component_launcher
repo.active_stack.prepare_step_run()
execution_info = self._execute_step(component_launcher)
repo.active_stack.cleanup_step_run()
return execution_info
@staticmethod
def _execute_step(
tfx_launcher: launcher.Launcher,
) -> Optional[data_types.ExecutionInfo]:
"""Executes a tfx component.
Args:
tfx_launcher: A tfx launcher to execute the component.
Returns:
Optional execution info returned by the launcher.
"""
step_name_param = (
INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME
)
pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
start_time = time.time()
logger.info(f"Step `{pipeline_step_name}` has started.")
try:
execution_info = tfx_launcher.launch()
if execution_info and get_cache_status(execution_info):
if execution_info.exec_properties:
step_name = json.loads(
execution_info.exec_properties[step_name_param]
)
logger.info(
f"Using cached version of `{pipeline_step_name}` "
f"[`{step_name}`].",
)
else:
logger.error(
f"No execution properties found for step "
f"`{pipeline_step_name}`."
)
except RuntimeError as e:
if "execution has already succeeded" in str(e):
# Hacky workaround to catch the error that a pipeline run with
# this name already exists. Raise an error with a more
# descriptive
# message instead.
raise DuplicateRunNameError()
else:
raise
run_duration = time.time() - start_time
logger.info(
f"Step `{pipeline_step_name}` has finished in "
f"{string_utils.get_human_readable_time(run_duration)}."
)
return execution_info
def get_upstream_step_names(
self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
) -> List[str]:
"""Given a step, use the associated pb2 node to find the names of all
upstream nodes.
Args:
step: Instance of a Pipeline Step
pb2_pipeline: Protobuf Pipeline instance
Returns:
List of step names from direct upstream steps
"""
node = self._get_node_with_step_name(step.name, pb2_pipeline)
upstream_steps = []
for upstream_node in node.upstream_nodes:
upstream_steps.append(upstream_node)
return upstream_steps
@staticmethod
def _get_node_with_step_name(
step_name: str, pb2_pipeline: Pb2Pipeline
) -> PipelineNode:
"""Given the name of a step, return the node with that name from the
pb2_pipeline.
Args:
step_name: Name of the step
pb2_pipeline: pb2 pipeline containing nodes
Returns:
PipelineNode instance
"""
for node in pb2_pipeline.nodes:
if (
node.WhichOneof("node") == "pipeline_node"
and node.pipeline_node.node_info.id == step_name
):
return node.pipeline_node
raise KeyError(
f"Step {step_name} not found in Pipeline "
f"{pb2_pipeline.pipeline_info.id}"
)
@staticmethod
def _configure_node_context(
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Iterates through each node of a pb2_pipeline and attaches important
contexts to the nodes; namely pipeline.requirements, stack
information and the runtime configuration.
Args:
pipeline: Zenml Pipeline instance
pb2_pipeline: Protobuf Pipeline instance
stack: The stack the pipeline was run on
runtime_configuration: The Runtime configuration of the current run
"""
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node
# Add pipeline requirements to the step context
requirements = " ".join(sorted(pipeline.requirements))
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
name=str(hash(requirements)),
properties={"pipeline_requirements": requirements},
)
# Add the zenml stack to the step context
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.STACK.value,
name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
properties=stack.dict(),
)
# Add all pydantic objects from runtime_configuration to the context
context_utils.add_runtime_configuration_to_node(
pipeline_node, runtime_configuration
)
get_upstream_step_names(self, step, pb2_pipeline)
Given a step, use the associated pb2 node to find the names of all upstream nodes.
Parameters:

Name | Type | Description | Default
---|---|---|---
step | BaseStep | Instance of a Pipeline Step | required
pb2_pipeline | Pipeline | Protobuf Pipeline instance | required

Returns:

Type | Description
---|---
List[str] | List of step names from direct upstream steps
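For illustration, a hedged fragment from inside a hypothetical `prepare_or_run_pipeline()` implementation, showing how the upstream step names could be collected to wire up a DAG (`sorted_steps` and `pb2_pipeline` are the arguments passed to that method):

```python
# Fragment from a hypothetical `prepare_or_run_pipeline()` body:
upstream_names = {
    step.name: self.get_upstream_step_names(step=step, pb2_pipeline=pb2_pipeline)
    for step in sorted_steps
}
# e.g. {"trainer": ["importer", "preprocessor"], ...} -- each step maps to
# the names of the steps that must finish before it can start.
```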
Source code in zenml/orchestrators/base_orchestrator.py
def get_upstream_step_names(
self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
) -> List[str]:
"""Given a step, use the associated pb2 node to find the names of all
upstream nodes.
Args:
step: Instance of a Pipeline Step
pb2_pipeline: Protobuf Pipeline instance
Returns:
List of step names from direct upstream steps
"""
node = self._get_node_with_step_name(step.name, pb2_pipeline)
upstream_steps = []
for upstream_node in node.upstream_nodes:
upstream_steps.append(upstream_node)
return upstream_steps
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
This method needs to be implemented by the respective orchestrator. Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through each step and finally call `self.run_step()` to execute each step.

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and either deploy it directly or return it.

Regardless of the implementation details, the orchestrator will need a way to trigger each step in the target environment. For this, the `run_step()` method should be used.

In case the orchestrator is using Docker containers for orchestration of each step, the `zenml.entrypoints.step_entrypoint` module can be used as a generalized entrypoint that sets up all the necessary prerequisites, parses input parameters and finally executes the step using the `run_step()` method.

If the orchestrator needs to know the upstream steps for a specific step to build a DAG, it can use the `get_upstream_step_names()` method to get them.
Parameters:

Name | Type | Description | Default
---|---|---|---
sorted_steps | List[BaseStep] | List of sorted steps | required
pipeline | BasePipeline | Zenml Pipeline instance | required
pb2_pipeline | Pipeline | Protobuf Pipeline instance | required
stack | Stack | The stack the pipeline was run on | required
runtime_configuration | RuntimeConfiguration | The Runtime configuration of the current run | required
Returns:

Type | Description
---|---
Any | The optional return value from this method will be returned by the `pipeline_instance.run()` call when someone is running a pipeline.
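To make the advanced case more concrete, below is a hedged sketch of an orchestrator that builds a trivially simple intermediate representation (a mapping from each step name to its upstream step names) and returns it instead of executing anything. The class and flavor names are hypothetical; a real orchestrator would translate such a representation into, say, an Airflow DAG and deploy it.

```python
from typing import Any, ClassVar, Dict, List

from zenml.orchestrators.base_orchestrator import BaseOrchestrator


class DagDescribingOrchestrator(BaseOrchestrator):
    """Hypothetical orchestrator that only builds a DAG description."""

    FLAVOR: ClassVar[str] = "dag_describer"  # illustrative flavor name

    def prepare_or_run_pipeline(
        self,
        sorted_steps,
        pipeline,
        pb2_pipeline,
        stack,
        runtime_configuration,
    ) -> Any:
        # Intermediate representation: step name -> names of upstream steps.
        dag: Dict[str, List[str]] = {}
        for step in sorted_steps:
            dag[step.name] = self.get_upstream_step_names(
                step=step, pb2_pipeline=pb2_pipeline
            )
        # A real implementation would now deploy this representation to the
        # target environment, where every node eventually calls `run_step()`.
        # Whatever is returned here is handed back by `pipeline_instance.run()`.
        return dag
```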
Source code in zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""
This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly
different operations.
Simple Case:
------------
The Steps are run directly from within the same environment in which
the orchestrator code is executed. In this case you will need to
deal with implementation-specific runtime configurations (like the
schedule) and then iterate through each step and finally call
`self.run_step()` to execute each step.
Advanced Case:
--------------
Most orchestrators will not run the steps directly. Instead, they
build some intermediate representation of the pipeline that is then
used to create and run the pipeline and its steps on the target
environment. For such orchestrators this method will have to build
this representation and either deploy it directly or return it.
Regardless of the implementation details, the orchestrator will need
a way to trigger each step in the target environment. For this,
the `run_step()` method should be used.
In case the orchestrator is using docker containers for orchestration
of each step, the `zenml.entrypoints.step_entrypoint` module can be
used as a generalized entrypoint that sets up all the necessary
prerequisites, parses input parameters and finally executes the step
using the `run_step()` method.
If the orchestrator needs to know the upstream steps for a specific
step to build a DAG, it can use the `get_upstream_step_names()` method
to get them.
Args:
sorted_steps: List of sorted steps
pipeline: Zenml Pipeline instance
pb2_pipeline: Protobuf Pipeline instance
stack: The stack the pipeline was run on
runtime_configuration: The Runtime configuration of the current run
Returns:
The optional return value from this method will be returned by the
`pipeline_instance.run()` call when someone is running a pipeline.
"""
run(self, pipeline, stack, runtime_configuration)
Runs a pipeline. To do this, a protobuf pipeline is created, the context of the individual steps is expanded to include relevant data, the steps are sorted into execution order and the implementation-specific `prepare_or_run_pipeline()` method is called.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline | BasePipeline | The pipeline to run. | required
stack | Stack | The stack on which the pipeline is run. | required
runtime_configuration | RuntimeConfiguration | Runtime configuration of the pipeline run. | required
Returns:

Type | Description
---|---
Any | The result of the call to `prepare_or_run_pipeline()`.
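For context, the sketch below shows how `run()` is typically reached from user code, assuming the decorator-based pipeline API; the step and pipeline definitions are purely illustrative. Whatever `prepare_or_run_pipeline()` returns is propagated back to the `.run()` call.

```python
from zenml.pipelines import pipeline
from zenml.steps import step


@step
def trainer() -> int:
    """Trivial placeholder step."""
    return 42


@pipeline
def my_pipeline(trainer_step):
    trainer_step()


# Calling `.run()` hands the pipeline to the active stack's orchestrator,
# which invokes `BaseOrchestrator.run()` and, in turn,
# `prepare_or_run_pipeline()`; its return value surfaces here.
result = my_pipeline(trainer_step=trainer()).run()
```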
Source code in zenml/orchestrators/base_orchestrator.py
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Runs a pipeline. To do this, a protobuf pipeline is created, the
context of the individual steps is expanded to include relevant data,
the steps are sorted into execution order and the implementation
specific `prepare_or_run_pipeline()` method is called.
Args:
pipeline: The pipeline to run.
stack: The stack on which the pipeline is run.
runtime_configuration: Runtime configuration of the pipeline run.
Return:
The result of the call to `prepare_or_run_pipeline()`.
"""
# Create the protobuf pipeline which will be needed for various reasons
# in the following steps
pb2_pipeline: Pb2Pipeline = Compiler().compile(
create_tfx_pipeline(pipeline, stack=stack)
)
self._configure_node_context(
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
sorted_steps = self._get_sorted_steps(
pipeline=pipeline, pb2_pipeline=pb2_pipeline
)
result = self.prepare_or_run_pipeline(
sorted_steps=sorted_steps,
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
return result
run_step(self, step, run_name, pb2_pipeline)
This sets up a component launcher and executes the given step.
Parameters:

Name | Type | Description | Default
---|---|---|---
step | BaseStep | The step to be executed | required
run_name | str | The unique run name | required
pb2_pipeline | Pipeline | Protobuf Pipeline instance | required
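As a hedged illustration, here is a fragment from inside a hypothetical `prepare_or_run_pipeline()` body that executes each step via `run_step()` and inspects the returned execution info (`get_cache_status` is documented under `zenml.orchestrators.utils` below):

```python
from zenml.logger import get_logger
from zenml.orchestrators.utils import get_cache_status

logger = get_logger(__name__)

# Fragment from a hypothetical `prepare_or_run_pipeline()` body:
for step in sorted_steps:
    execution_info = self.run_step(
        step=step,
        run_name=runtime_configuration.run_name,
        pb2_pipeline=pb2_pipeline,
    )
    if execution_info is not None and get_cache_status(execution_info):
        logger.info(f"Step `{step.name}` used a cached result.")
```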
Source code in zenml/orchestrators/base_orchestrator.py
def run_step(
self,
step: "BaseStep",
run_name: str,
pb2_pipeline: Pb2Pipeline,
) -> Optional[data_types.ExecutionInfo]:
"""This sets up a component launcher and executes the given step.
Args:
step: The step to be executed
run_name: The unique run name
pb2_pipeline: Protobuf Pipeline instance
"""
# Substitute the runtime parameter to be a concrete run_id, it is
# important for this to be unique for each run.
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
)
# Extract the deployment_configs and use it to access the executor and
# custom driver spec
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
executor_spec = runner_utils.extract_executor_spec(
deployment_config, step.name
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, step.name
)
# At this point the active metadata store is queried for the
# metadata_connection
repo = Repository()
metadata_store = repo.active_stack.metadata_store
metadata_connection = metadata.Metadata(
metadata_store.get_tfx_metadata_config()
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
# The protobuf node for the current step is loaded here.
pipeline_node = self._get_node_with_step_name(
step_name=step.name, pb2_pipeline=pb2_pipeline
)
# Create the tfx launcher responsible for executing the step.
component_launcher = launcher.Launcher(
pipeline_node=pipeline_node,
mlmd_connection=metadata_connection,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
# In some stack configurations, some stack components (like experiment
# trackers) will run some code before and after the actual step run.
# This is where the step actually gets executed using the
# component_launcher
repo.active_stack.prepare_step_run()
execution_info = self._execute_step(component_launcher)
repo.active_stack.cleanup_step_run()
return execution_info
context_utils
add_context_to_node(pipeline_node, type_, name, properties)
Add a new context to a TFX protobuf pipeline node.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_node | pipeline_pb2.PipelineNode | A tfx protobuf pipeline node | required
type_ | str | The type name for the context to be added | required
name | str | Unique key for the context | required
properties | Dict[str, str] | Dictionary of strings as properties of the context | required
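A hedged usage sketch; the context type, name and properties are made up, and `pb2_pipeline` is assumed to be an already compiled `Pb2Pipeline`:

```python
from zenml.orchestrators.context_utils import add_context_to_node

# Attach a custom metadata context to every node of the compiled pipeline.
for node in pb2_pipeline.nodes:
    add_context_to_node(
        node.pipeline_node,
        type_="my_custom_context",  # made-up context type name
        name="experiment-42",       # unique key for this context
        properties={"team": "ml-platform", "priority": "low"},
    )
```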
Source code in zenml/orchestrators/context_utils.py
def add_context_to_node(
pipeline_node: "pipeline_pb2.PipelineNode",
type_: str,
name: str,
properties: Dict[str, str],
) -> None:
"""
Add a new context to a TFX protobuf pipeline node.
Args:
pipeline_node: A tfx protobuf pipeline node
type_: The type name for the context to be added
name: Unique key for the context
properties: dictionary of strings as properties of the context
"""
# Add a new context to the pipeline
context: "pipeline_pb2.ContextSpec" = pipeline_node.contexts.contexts.add()
# Adding the type of context
context.type.name = type_
# Setting the name of the context
context.name.field_value.string_value = name
# Setting the properties of the context depending on attribute type
for key, value in properties.items():
c_property = context.properties[key]
c_property.field_value.string_value = value
add_runtime_configuration_to_node(pipeline_node, runtime_config)
Add the runtime configuration of a pipeline run to a protobuf pipeline node.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_node | pipeline_pb2.PipelineNode | a tfx protobuf pipeline node | required
runtime_config | RuntimeConfiguration | a ZenML RuntimeConfiguration | required
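A hedged sketch of how this is used, mirroring the call in `BaseOrchestrator._configure_node_context()` above; the `RuntimeConfiguration` import path and constructor arguments are assumptions:

```python
from zenml.orchestrators.context_utils import add_runtime_configuration_to_node
from zenml.runtime_configuration import RuntimeConfiguration  # path assumed

runtime_config = RuntimeConfiguration(run_name="my_run")  # kwargs assumed

# Any pydantic objects stored in the runtime configuration end up as
# metadata contexts on each node of the compiled pipeline.
for node in pb2_pipeline.nodes:
    add_runtime_configuration_to_node(node.pipeline_node, runtime_config)
```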
Source code in zenml/orchestrators/context_utils.py
def add_runtime_configuration_to_node(
pipeline_node: "pipeline_pb2.PipelineNode",
runtime_config: RuntimeConfiguration,
) -> None:
"""
Add the runtime configuration of a pipeline run to a protobuf pipeline node.
Args:
pipeline_node: a tfx protobuf pipeline node
runtime_config: a ZenML RuntimeConfiguration
"""
skip_errors: bool = runtime_config.get(
"ignore_unserializable_fields", False
)
# Determine the name of the context
def _name(obj: "BaseModel") -> str:
"""Compute a unique context name for a pydantic BaseModel."""
try:
return str(hash(obj.json(sort_keys=True)))
except TypeError as e:
class_name = obj.__class__.__name__
logging.info(
"Cannot convert %s to json, generating uuid instead. Error: %s",
class_name,
e,
)
return f"{class_name}_{uuid.uuid1()}"
# iterate over all attributes of runtime context, serializing all pydantic
# objects to node context.
for key, obj in runtime_config.items():
if isinstance(obj, BaseModel):
logger.debug("Adding %s to context", key)
add_context_to_node(
pipeline_node,
type_=obj.__repr_name__().lower(),
name=_name(obj),
properties=serialize_pydantic_object(
obj, skip_errors=skip_errors
),
)
serialize_pydantic_object(obj, *, skip_errors=False)
Convert a pydantic object to a dict of strings
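A hedged example; `TrainerConfig` is a made-up pydantic model used purely to show the itemwise JSON serialization:

```python
from pydantic import BaseModel

from zenml.orchestrators.context_utils import serialize_pydantic_object


class TrainerConfig(BaseModel):
    """Hypothetical configuration object used only for illustration."""

    epochs: int = 10
    layer_sizes: list = [64, 32]


# Every attribute is serialized individually to a JSON string.
print(serialize_pydantic_object(TrainerConfig()))
# Expected output (assumption): {'epochs': '10', 'layer_sizes': '[64, 32]'}
```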
Source code in zenml/orchestrators/context_utils.py
def serialize_pydantic_object(
obj: BaseModel, *, skip_errors: bool = False
) -> Dict[str, str]:
"""Convert a pydantic object to a dict of strings"""
class PydanticEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
try:
return cast(Callable[[Any], str], obj.__json_encoder__)(o)
except TypeError:
return super().default(o)
def _inner_generator(
dictionary: Dict[str, Any]
) -> Iterator[Tuple[str, str]]:
"""Itemwise serialize each element in a dictionary."""
for key, item in dictionary.items():
try:
yield key, json.dumps(item, cls=PydanticEncoder)
except TypeError as e:
if skip_errors:
logging.info(
"Skipping adding field '%s' to metadata context as "
"it cannot be serialized due to %s.",
key,
e,
)
else:
raise TypeError(
f"Invalid type {type(item)} for key {key} can not be "
"serialized."
) from e
return {key: value for key, value in _inner_generator(obj.dict())}
local
special
local_orchestrator
LocalOrchestrator (BaseOrchestrator)
pydantic-model
Orchestrator responsible for running pipelines locally. This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
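A minimal sketch, assuming stack components in this version can be instantiated with just a name; in practice the local orchestrator is already part of ZenML's default stack, so pipelines run with it out of the box.

```python
from zenml.orchestrators.local.local_orchestrator import LocalOrchestrator

# Constructor arguments beyond `name` are assumed to be optional here.
orchestrator = LocalOrchestrator(name="local_orchestrator")
print(orchestrator.FLAVOR)  # "local"
```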
Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines locally. This orchestrator
does not allow for concurrent execution of steps and also does not support
running on a schedule."""
FLAVOR: ClassVar[str] = "local"
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method iterates through all steps and executes them sequentially."""
if runtime_configuration.schedule:
logger.warning(
"Local Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
assert runtime_configuration.run_name, "Run name must be set"
# Run each step
for step in sorted_steps:
self.run_step(
step=step,
run_name=runtime_configuration.run_name,
pb2_pipeline=pb2_pipeline,
)
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
This method iterates through all steps and executes them sequentially.
Source code in zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method iterates through all steps and executes them sequentially."""
if runtime_configuration.schedule:
logger.warning(
"Local Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
assert runtime_configuration.run_name, "Run name must be set"
# Run each step
for step in sorted_steps:
self.run_step(
step=step,
run_name=runtime_configuration.run_name,
pb2_pipeline=pb2_pipeline,
)
utils
create_tfx_pipeline(zenml_pipeline, stack)
Creates a tfx pipeline from a ZenML pipeline.
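A hedged usage sketch; `pipeline_instance` is assumed to be an existing `BasePipeline`, and the active stack is used to resolve the artifact and metadata stores:

```python
from zenml.orchestrators.utils import create_tfx_pipeline
from zenml.repository import Repository

stack = Repository().active_stack
tfx_pipeline = create_tfx_pipeline(pipeline_instance, stack=stack)

# The result is a `tfx.orchestration.pipeline.Pipeline` whose components
# correspond one-to-one to the ZenML steps of `pipeline_instance`.
print(len(tfx_pipeline.components))
```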
Source code in zenml/orchestrators/utils.py
def create_tfx_pipeline(
zenml_pipeline: "BasePipeline", stack: "Stack"
) -> tfx_pipeline.Pipeline:
"""Creates a tfx pipeline from a ZenML pipeline."""
# Connect the inputs/outputs of all steps in the pipeline
zenml_pipeline.connect(**zenml_pipeline.steps)
tfx_components = [step.component for step in zenml_pipeline.steps.values()]
artifact_store = stack.artifact_store
metadata_store = stack.metadata_store
return tfx_pipeline.Pipeline(
pipeline_name=zenml_pipeline.name,
components=tfx_components, # type: ignore[arg-type]
pipeline_root=artifact_store.path,
metadata_connection_config=metadata_store.get_tfx_metadata_config(),
enable_cache=zenml_pipeline.enable_cache,
)
get_cache_status(execution_info)
Returns the caching status of a step.
Parameters:

Name | Type | Description | Default
---|---|---|---
execution_info | ExecutionInfo | The execution info of a `tfx` step. | required

Exceptions:

Type | Description
---|---
AttributeError | If the execution info is `None`.
KeyError | If no pipeline info is found in the `execution_info`.

Returns:

Type | Description
---|---
bool | The caching status of a `tfx` step as a boolean value.
Source code in zenml/orchestrators/utils.py
def get_cache_status(
execution_info: data_types.ExecutionInfo,
) -> bool:
"""Returns the caching status of a step.
Args:
execution_info: The execution info of a `tfx` step.
Raises:
AttributeError: If the execution info is `None`.
KeyError: If no pipeline info is found in the `execution_info`.
Returns:
The caching status of a `tfx` step as a boolean value.
"""
if execution_info is None:
logger.warning("No execution info found when checking cache status.")
return False
status = False
repository = Repository()
# TODO [ENG-706]: Get the current running stack instead of just the active
# stack
active_stack = repository.active_stack
if not active_stack:
raise RuntimeError(
"No active stack is configured for the repository. Run "
"`zenml stack set STACK_NAME` to update the active stack."
)
metadata_store = active_stack.metadata_store
step_name_param = (
INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME
)
step_name = json.loads(execution_info.exec_properties[step_name_param])
if execution_info.pipeline_info:
pipeline_name = execution_info.pipeline_info.id
else:
raise KeyError(f"No pipeline info found for step `{step_name}`.")
pipeline_run_name = cast(str, execution_info.pipeline_run_id)
pipeline = metadata_store.get_pipeline(pipeline_name)
if pipeline is None:
logger.error(f"Pipeline {pipeline_name} not found in Metadata Store.")
else:
status = (
pipeline.get_run(pipeline_run_name).get_step(step_name).is_cached
)
return status
get_step_for_node(node, steps)
Finds the matching step for a tfx pipeline node.
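A hedged sketch mirroring what `BaseOrchestrator._get_sorted_steps()` does with this helper; `pipeline_instance` and `pb2_pipeline` are assumed to exist already:

```python
from zenml.orchestrators.utils import get_step_for_node

steps = list(pipeline_instance.steps.values())
# Map every node of the compiled protobuf pipeline back to its ZenML step.
for node in pb2_pipeline.nodes:
    step = get_step_for_node(node.pipeline_node, steps=steps)
    print(step.name)
```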
Source code in zenml/orchestrators/utils.py
def get_step_for_node(node: PipelineNode, steps: List[BaseStep]) -> BaseStep:
"""Finds the matching step for a tfx pipeline node."""
step_name = node.node_info.id
try:
return next(step for step in steps if step.name == step_name)
except StopIteration:
raise RuntimeError(f"Unable to find step with name '{step_name}'.")