Step Operators
zenml.step_operators
special
Step operators allow you to run steps on custom infrastructure.
While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where an individual step runs. This is useful in a variety of scenarios: for example, when one step within a pipeline (such as a trainer step) should run in a separate environment equipped with a GPU.
base_step_operator
Base class for ZenML step operators.
BaseStepOperator (StackComponent, ABC)
Base class for all ZenML step operators.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperator(StackComponent, ABC):
    """Common base that every ZenML step operator derives from."""

    @property
    def config(self) -> BaseStepOperatorConfig:
        """Gives access to the configuration of this step operator.

        Returns:
            The component's stored config, typed as a step operator config.
        """
        # `cast` only narrows the static type; the object is returned as-is.
        operator_config = cast(BaseStepOperatorConfig, self._config)
        return operator_config

    @property
    def entrypoint_config_class(
        self,
    ) -> Type[StepOperatorEntrypointConfiguration]:
        """Tells which entrypoint configuration class this operator uses.

        Concrete step operators that need a customized entrypoint
        configuration may override this property and return their own
        class instead.

        Returns:
            The entrypoint configuration class for this step operator.
        """
        return StepOperatorEntrypointConfiguration

    @abstractmethod
    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Executes a step; must be provided by each subclass.

        Implementations are expected to start a **synchronous** job that
        runs the given `entrypoint_command`.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
config: BaseStepOperatorConfig
property
readonly
Returns the config of the step operator.
Returns:
Type | Description |
---|---|
BaseStepOperatorConfig |
The config of the step operator. |
entrypoint_config_class: Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration]
property
readonly
Returns the entrypoint configuration class for this step operator.
Concrete step operator implementations may override this property to return a custom entrypoint configuration class if they need to customize the entrypoint configuration.
Returns:
Type | Description |
---|---|
Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration] |
The entrypoint configuration class for this step operator. |
launch(self, info, entrypoint_command, environment)
Abstract method to execute a step.
Subclasses must implement this method and launch a synchronous
job that executes the `entrypoint_command`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Information about the step run. |
required |
entrypoint_command |
List[str] |
Command that executes the step. |
required |
environment |
Dict[str, str] |
Environment variables to set in the step operator environment. |
required |
Source code in zenml/step_operators/base_step_operator.py
@abstractmethod
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Executes a step; must be provided by each subclass.

    Implementations are expected to start a **synchronous** job that
    runs the given `entrypoint_command`.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
BaseStepOperatorConfig (StackComponentConfig)
Base config for step operators.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorConfig(StackComponentConfig):
    """Configuration base class shared by all step operators."""
BaseStepOperatorFlavor (Flavor)
Base class for all ZenML step operator flavors.
Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorFlavor(Flavor):
    """Common base that every ZenML step operator flavor derives from."""

    @property
    def type(self) -> StackComponentType:
        """Identifies the kind of stack component this flavor belongs to.

        Returns:
            The step operator stack component type.
        """
        component_type = StackComponentType.STEP_OPERATOR
        return component_type

    @property
    def config_class(self) -> Type[BaseStepOperatorConfig]:
        """Names the config class that goes with this flavor.

        Returns:
            The config class for this flavor.
        """
        return BaseStepOperatorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseStepOperator]:
        """Names the step operator class that goes with this flavor.

        Every concrete flavor has to provide this property.

        Returns:
            The implementation class for this flavor.
        """
config_class: Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig]
property
readonly
Returns the config class for this flavor.
Returns:
Type | Description |
---|---|
Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig] |
The config class for this flavor. |
implementation_class: Type[zenml.step_operators.base_step_operator.BaseStepOperator]
property
readonly
Returns the implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[zenml.step_operators.base_step_operator.BaseStepOperator] |
The implementation class for this flavor. |
type: StackComponentType
property
readonly
Returns the flavor type.
Returns:
Type | Description |
---|---|
StackComponentType |
The type of the flavor. |
step_operator_entrypoint_configuration
Abstract base class for entrypoint configurations that run a single step.
StepOperatorEntrypointConfiguration (StepEntrypointConfiguration)
Base class for step operator entrypoint configurations.
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
class StepOperatorEntrypointConfiguration(StepEntrypointConfiguration):
    """Entrypoint configuration base used when a step operator runs a step."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Collects every option needed to run with this configuration.

        Returns:
            The options of the superclass plus the step run ID option.
        """
        inherited_options = super().get_entrypoint_options()
        return inherited_options.union({STEP_RUN_ID_OPTION})

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Builds the argument list the entrypoint command is called with.

        Args:
            **kwargs: Kwargs, must include the step run id.

        Returns:
            The arguments of the superclass followed by the step run ID
            flag and its value.
        """
        inherited_arguments = super().get_entrypoint_arguments(**kwargs)
        return [
            *inherited_arguments,
            f"--{STEP_RUN_ID_OPTION}",
            kwargs[STEP_RUN_ID_OPTION],
        ]

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentResponse",
    ) -> None:
        """Runs a single step.

        The step run is looked up by the ID passed through the entrypoint
        arguments, its pipeline run is fetched, and the step is then
        executed by a `StepRunner` on the active stack.

        Args:
            step: The step to run.
            deployment: The deployment configuration.
        """
        # The step run ID arrives as a string entrypoint argument.
        run_step_id = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
        run_step = Client().zen_store.get_run_step(run_step_id)
        parent_run = Client().get_pipeline_run(run_step.pipeline_run_id)

        run_info = StepRunInfo(
            config=step.config,
            pipeline=deployment.pipeline_configuration,
            run_name=parent_run.name,
            pipeline_step_name=self.entrypoint_args[STEP_NAME_OPTION],
            run_id=parent_run.id,
            step_run_id=run_step_id,
            force_write_logs=lambda: None,
        )

        active_stack = Client().active_stack

        # Only the first element of the resolved inputs is used here.
        resolved_inputs, _unused = input_utils.resolve_step_inputs(
            step=step, run_id=parent_run.id
        )
        artifact_uris = output_utils.prepare_output_artifact_uris(
            step_run=run_step, stack=active_stack, step=step
        )

        runner = StepRunner(step=step, stack=active_stack)
        runner.run(
            pipeline_run=parent_run,
            step_run=run_step,
            input_artifacts=resolved_inputs,
            output_artifact_uris=artifact_uris,
            step_run_info=run_info,
        )
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Kwargs, must include the step run id. |
{} |
Returns:
Type | Description |
---|---|
List[str] |
The superclass arguments as well as arguments for the step run id. |
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Builds the argument list the entrypoint command is called with.

    Args:
        **kwargs: Kwargs, must include the step run id.

    Returns:
        The arguments of the superclass followed by the step run ID
        flag and its value.
    """
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    return [
        *inherited_arguments,
        f"--{STEP_RUN_ID_OPTION}",
        kwargs[STEP_RUN_ID_OPTION],
    ]
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
Returns:
Type | Description |
---|---|
Set[str] |
The superclass options as well as an option for the step run id. |
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Collects every option needed to run with this configuration.

    Returns:
        The options of the superclass plus the step run ID option.
    """
    inherited_options = super().get_entrypoint_options()
    return inherited_options.union({STEP_RUN_ID_OPTION})