Skip to content

Step Operators

zenml.step_operators special

Step operators allow you to run steps on custom infrastructure.

While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where an individual step runs. This can be useful in a variety of scenarios. An example could be if one step within a pipeline should run on a separate environment equipped with a GPU (like a trainer step).

base_step_operator

Base class for ZenML step operators.

BaseStepOperator (StackComponent, ABC)

Base class for all ZenML step operators.

Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperator(StackComponent, ABC):
    """Base class for all ZenML step operators."""

    @property
    def config(self) -> BaseStepOperatorConfig:
        """Returns the config of the step operator.

        Returns:
            The config of the step operator.
        """
        return cast(BaseStepOperatorConfig, self._config)

    @property
    def entrypoint_config_class(
        self,
    ) -> Type[StepOperatorEntrypointConfiguration]:
        """Returns the entrypoint configuration class for this step operator.

        Concrete step operator implementations may override this property
        to return a custom entrypoint configuration class if they need to
        customize the entrypoint configuration.

        Returns:
            The entrypoint configuration class for this step operator.
        """
        return StepOperatorEntrypointConfiguration

    @abstractmethod
    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Abstract method to execute a step.

        Subclasses must implement this method and launch a **synchronous**
        job that executes the `entrypoint_command`.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
config: BaseStepOperatorConfig property readonly

Returns the config of the step operator.

Returns:

Type Description
BaseStepOperatorConfig

The config of the step operator.

entrypoint_config_class: Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration] property readonly

Returns the entrypoint configuration class for this step operator.

Concrete step operator implementations may override this property to return a custom entrypoint configuration class if they need to customize the entrypoint configuration.

Returns:

Type Description
Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration]

The entrypoint configuration class for this step operator.

launch(self, info, entrypoint_command, environment)

Abstract method to execute a step.

Subclasses must implement this method and launch a synchronous job that executes the entrypoint_command.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in zenml/step_operators/base_step_operator.py
@abstractmethod
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Abstract method to execute a step.

    Subclasses must implement this method and launch a **synchronous**
    job that executes the `entrypoint_command`.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """

BaseStepOperatorConfig (StackComponentConfig) pydantic-model

Base config for step operators.

Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorConfig(StackComponentConfig):
    """Base config for step operators."""

BaseStepOperatorFlavor (Flavor)

Base class for all ZenML step operator flavors.

Source code in zenml/step_operators/base_step_operator.py
class BaseStepOperatorFlavor(Flavor):
    """Base class for all ZenML step operator flavors."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The type of the flavor.
        """
        return StackComponentType.STEP_OPERATOR

    @property
    def config_class(self) -> Type[BaseStepOperatorConfig]:
        """Returns the config class for this flavor.

        Returns:
            The config class for this flavor.
        """
        return BaseStepOperatorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseStepOperator]:
        """Returns the implementation class for this flavor.

        Returns:
            The implementation class for this flavor.
        """
config_class: Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig] property readonly

Returns the config class for this flavor.

Returns:

Type Description
Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig]

The config class for this flavor.

implementation_class: Type[zenml.step_operators.base_step_operator.BaseStepOperator] property readonly

Returns the implementation class for this flavor.

Returns:

Type Description
Type[zenml.step_operators.base_step_operator.BaseStepOperator]

The implementation class for this flavor.

type: StackComponentType property readonly

Returns the flavor type.

Returns:

Type Description
StackComponentType

The type of the flavor.

step_operator_entrypoint_configuration

Abstract base class for entrypoint configurations that run a single step.

StepOperatorEntrypointConfiguration (StepEntrypointConfiguration)

Base class for step operator entrypoint configurations.

Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
class StepOperatorEntrypointConfiguration(StepEntrypointConfiguration):
    """Base class for step operator entrypoint configurations."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            The superclass options as well as an option for the step run id.
        """
        return super().get_entrypoint_options() | {
            STEP_RUN_ID_OPTION,
        }

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            **kwargs: Kwargs, must include the step run id.

        Returns:
            The superclass arguments as well as arguments for the step run id.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{STEP_RUN_ID_OPTION}",
            kwargs[STEP_RUN_ID_OPTION],
        ]

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentResponse",
    ) -> None:
        """Runs a single step.

        Args:
            step: The step to run.
            deployment: The deployment configuration.
        """
        step_run_id = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
        step_run = Client().zen_store.get_run_step(step_run_id)
        pipeline_run = Client().get_pipeline_run(step_run.pipeline_run_id)

        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=deployment.pipeline_configuration,
            run_name=pipeline_run.name,
            pipeline_step_name=self.entrypoint_args[STEP_NAME_OPTION],
            run_id=pipeline_run.id,
            step_run_id=step_run_id,
        )

        stack = Client().active_stack
        input_artifacts, _ = input_utils.resolve_step_inputs(
            step=step, run_id=pipeline_run.id
        )
        output_artifact_uris = output_utils.prepare_output_artifact_uris(
            step_run=step_run, stack=stack, step=step
        )

        step_runner = StepRunner(step=step, stack=stack)
        step_runner.run(
            pipeline_run=pipeline_run,
            step_run=step_run,
            input_artifacts=input_artifacts,
            output_artifact_uris=output_artifact_uris,
            step_run_info=step_run_info,
        )
get_entrypoint_arguments(**kwargs) classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step run id.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the step run id.

Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        **kwargs: Kwargs, must include the step run id.

    Returns:
        The superclass arguments as well as arguments for the step run id.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{STEP_RUN_ID_OPTION}",
        kwargs[STEP_RUN_ID_OPTION],
    ]
get_entrypoint_options() classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the step run id.

Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the step run id.
    """
    return super().get_entrypoint_options() | {
        STEP_RUN_ID_OPTION,
    }