Step Operators
zenml.step_operators
special
Step operators allow you to run steps on custom infrastructure.
While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where an individual step runs. This can be useful in a variety of scenarios. For example, one step within a pipeline (such as a trainer step) may need to run in a separate environment equipped with a GPU.
base_step_operator
Base class for ZenML step operators.
BaseStepOperator (StackComponent, ABC)
Base class for all ZenML step operators.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperator(StackComponent, ABC):
    """Abstract foundation that every ZenML step operator derives from."""

    @property
    def config(self) -> BaseStepOperatorConfig:
        """Gives access to the configuration of this step operator.

        Returns:
            The stored configuration, typed as a `BaseStepOperatorConfig`.
        """
        # `cast` only narrows the static type; the object itself is
        # returned unchanged.
        operator_config = cast(BaseStepOperatorConfig, self._config)
        return operator_config

    @abstractmethod
    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Executes a step; concrete step operators must override this.

        An implementation is expected to start a **synchronous** job that
        runs the given `entrypoint_command`.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
config: BaseStepOperatorConfig
property
readonly
Returns the config of the step operator.
Returns:
| Type | Description |
|---|---|
| BaseStepOperatorConfig | The config of the step operator. |
launch(self, info, entrypoint_command, environment)
Abstract method to execute a step.
Subclasses must implement this method and launch a **synchronous** job that executes the `entrypoint_command`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| info | StepRunInfo | Information about the step run. | required |
| entrypoint_command | List[str] | Command that executes the step. | required |
| environment | Dict[str, str] | Environment variables to set in the step operator environment. | required |
Source code in zenml/step_operators/base_step_operator.py
@abstractmethod
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Executes a step; concrete step operators must override this.

    An implementation is expected to start a **synchronous** job that
    runs the given `entrypoint_command`.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
BaseStepOperatorConfig (StackComponentConfig)
pydantic-model
Base config for step operators.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorConfig(StackComponentConfig):
    """Configuration shared by all step operators."""

    @root_validator(pre=True)
    def _deprecations(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Strips deprecated fields from the incoming values.

        Args:
            values: The values to validate.

        Returns:
            The values with the deprecated `base_image` entry removed.
        """
        # A missing key pops to `None`, so an absent entry and an empty one
        # both skip the warning below.
        deprecated_image = values.pop("base_image", None)
        if not deprecated_image:
            return values

        logger.warning(
            "The 'base_image' field has been deprecated. To use a "
            "custom base container image with your "
            "step operators, please use the DockerSettings in your "
            "pipeline (see https://docs.zenml.io/user-guide/advanced-guide/containerize-your-pipeline)."
        )
        return values
BaseStepOperatorFlavor (Flavor)
Base class for all ZenML step operator flavors.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorFlavor(Flavor):
    """Abstract foundation that every ZenML step operator flavor derives from."""

    @property
    def type(self) -> StackComponentType:
        """Identifies which kind of stack component this flavor belongs to.

        Returns:
            `StackComponentType.STEP_OPERATOR`.
        """
        flavor_type = StackComponentType.STEP_OPERATOR
        return flavor_type

    @property
    def config_class(self) -> Type[BaseStepOperatorConfig]:
        """Names the configuration class used by this flavor.

        Returns:
            The `BaseStepOperatorConfig` class.
        """
        return BaseStepOperatorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseStepOperator]:
        """Names the step operator class that implements this flavor.

        Declared abstract, so this base class provides no implementation.

        Returns:
            The implementation class for this flavor.
        """
config_class: Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig]
property
readonly
Returns the config class for this flavor.
Returns:
| Type | Description |
|---|---|
| Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig] | The config class for this flavor. |
implementation_class: Type[zenml.step_operators.base_step_operator.BaseStepOperator]
property
readonly
Returns the implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| Type[zenml.step_operators.base_step_operator.BaseStepOperator] | The implementation class for this flavor. |
type: StackComponentType
property
readonly
Returns the flavor type.
Returns:
| Type | Description |
|---|---|
| StackComponentType | The type of the flavor. |
step_operator_entrypoint_configuration
Abstract base class for entrypoint configurations that run a single step.
StepOperatorEntrypointConfiguration (StepEntrypointConfiguration)
Base class for step operator entrypoint configurations.
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
class StepOperatorEntrypointConfiguration(StepEntrypointConfiguration):
    """Entrypoint configuration used when a step runs via a step operator."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Collects every option needed to run with this configuration.

        Returns:
            The options of the superclass plus the step run id option.
        """
        inherited_options = super().get_entrypoint_options()
        return inherited_options | {STEP_RUN_ID_OPTION}

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Builds the argument list the entrypoint command is called with.

        Args:
            **kwargs: Kwargs, must include the step run id.

        Returns:
            The arguments of the superclass followed by the step run id
            flag and its value.
        """
        inherited_arguments = super().get_entrypoint_arguments(**kwargs)
        step_run_id_arguments = [
            f"--{STEP_RUN_ID_OPTION}",
            kwargs[STEP_RUN_ID_OPTION],
        ]
        return inherited_arguments + step_run_id_arguments

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentBaseModel",
    ) -> None:
        """Executes one step through a `StepRunner`.

        The step run id is read from the entrypoint arguments and used to
        look up the step run and its pipeline run. Inputs are then resolved
        and output artifact URIs prepared before the runner is invoked.

        Args:
            step: The step to run.
            deployment: The deployment configuration.
        """
        # Look up the records this run is attached to.
        run_id_of_step = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
        step_run = Client().zen_store.get_run_step(run_id_of_step)
        pipeline_run = Client().get_pipeline_run(step_run.pipeline_run_id)

        run_info = StepRunInfo(
            config=step.config,
            pipeline=deployment.pipeline_configuration,
            run_name=pipeline_run.name,
            pipeline_step_name=self.entrypoint_args[STEP_NAME_OPTION],
            run_id=pipeline_run.id,
            step_run_id=run_id_of_step,
        )

        active_stack = Client().active_stack

        # Only the first element of the resolved pair is used here.
        resolved_inputs, _ = input_utils.resolve_step_inputs(
            step=step, run_id=pipeline_run.id
        )
        prepared_output_uris = output_utils.prepare_output_artifact_uris(
            step_run=step_run, stack=active_stack, step=step
        )

        StepRunner(step=step, stack=active_stack).run(
            input_artifacts=resolved_inputs,
            output_artifact_uris=prepared_output_uris,
            step_run_info=run_info,
        )
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `**kwargs` | Any | Kwargs, must include the step run id. | {} |
Returns:
| Type | Description |
|---|---|
| List[str] | The superclass arguments as well as arguments for the step run id. |
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Builds the argument list the entrypoint command is called with.

    Args:
        **kwargs: Kwargs, must include the step run id.

    Returns:
        The arguments of the superclass followed by the step run id flag
        and its value.
    """
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    step_run_id_arguments = [
        f"--{STEP_RUN_ID_OPTION}",
        kwargs[STEP_RUN_ID_OPTION],
    ]
    return inherited_arguments + step_run_id_arguments
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
Returns:
| Type | Description |
|---|---|
| Set[str] | The superclass options as well as an option for the step run id. |
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Collects every option needed to run with this configuration.

    Returns:
        The options of the superclass plus the step run id option.
    """
    inherited_options = super().get_entrypoint_options()
    return inherited_options | {STEP_RUN_ID_OPTION}