Step Operators
        zenml.step_operators
  
      special
  
    Step operators allow you to run steps on custom infrastructure.
While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where an individual step runs. This can be useful in a variety of scenarios. An example could be if one step within a pipeline should run on a separate environment equipped with a GPU (like a trainer step).
        base_step_operator
    Base class for ZenML step operators.
        
BaseStepOperator            (StackComponent, ABC)
        
    Base class for all ZenML step operators.
Source code in zenml/step_operators/base_step_operator.py
          class BaseStepOperator(StackComponent, ABC):
    """Base class for all ZenML step operators."""
    @property
    def config(self) -> BaseStepOperatorConfig:
        """Returns the config of the step operator.
        Returns:
            The config of the step operator.
        """
        return cast(BaseStepOperatorConfig, self._config)
    @property
    def entrypoint_config_class(
        self,
    ) -> Type[StepOperatorEntrypointConfiguration]:
        """Returns the entrypoint configuration class for this step operator.
        Concrete step operator implementations may override this property
        to return a custom entrypoint configuration class if they need to
        customize the entrypoint configuration.
        Returns:
            The entrypoint configuration class for this step operator.
        """
        return StepOperatorEntrypointConfiguration
    @abstractmethod
    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Abstract method to execute a step.
        Subclasses must implement this method and launch a **synchronous**
        job that executes the `entrypoint_command`.
        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
config: BaseStepOperatorConfig
  
      property
      readonly
  
    Returns the config of the step operator.
Returns:
| Type | Description | 
|---|---|
| BaseStepOperatorConfig | The config of the step operator. | 
entrypoint_config_class: Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration]
  
      property
      readonly
  
    Returns the entrypoint configuration class for this step operator.
Concrete step operator implementations may override this property to return a custom entrypoint configuration class if they need to customize the entrypoint configuration.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.step_operators.step_operator_entrypoint_configuration.StepOperatorEntrypointConfiguration] | The entrypoint configuration class for this step operator. | 
launch(self, info, entrypoint_command, environment)
    Abstract method to execute a step.
Subclasses must implement this method and launch a synchronous
job that executes the entrypoint_command.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| info | StepRunInfo | Information about the step run. | required | 
| entrypoint_command | List[str] | Command that executes the step. | required | 
| environment | Dict[str, str] | Environment variables to set in the step operator environment. | required | 
Source code in zenml/step_operators/base_step_operator.py
          @abstractmethod
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Abstract method to execute a step.
    Subclasses must implement this method and launch a **synchronous**
    job that executes the `entrypoint_command`.
    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
        
BaseStepOperatorConfig            (StackComponentConfig)
        
    Base config for step operators.
Source code in zenml/step_operators/base_step_operator.py
          class BaseStepOperatorConfig(StackComponentConfig):
    """Base config for step operators."""
        
BaseStepOperatorFlavor            (Flavor)
        
    Base class for all ZenML step operator flavors.
Source code in zenml/step_operators/base_step_operator.py
          class BaseStepOperatorFlavor(Flavor):
    """Base class for all ZenML step operator flavors."""
    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.
        Returns:
            The type of the flavor.
        """
        return StackComponentType.STEP_OPERATOR
    @property
    def config_class(self) -> Type[BaseStepOperatorConfig]:
        """Returns the config class for this flavor.
        Returns:
            The config class for this flavor.
        """
        return BaseStepOperatorConfig
    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseStepOperator]:
        """Returns the implementation class for this flavor.
        Returns:
            The implementation class for this flavor.
        """
config_class: Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig]
  
      property
      readonly
  
    Returns the config class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.step_operators.base_step_operator.BaseStepOperatorConfig] | The config class for this flavor. | 
implementation_class: Type[zenml.step_operators.base_step_operator.BaseStepOperator]
  
      property
      readonly
  
    Returns the implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.step_operators.base_step_operator.BaseStepOperator] | The implementation class for this flavor. | 
type: StackComponentType
  
      property
      readonly
  
    Returns the flavor type.
Returns:
| Type | Description | 
|---|---|
| StackComponentType | The type of the flavor. | 
        step_operator_entrypoint_configuration
    Abstract base class for entrypoint configurations that run a single step.
        
StepOperatorEntrypointConfiguration            (StepEntrypointConfiguration)
        
    Base class for step operator entrypoint configurations.
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
          class StepOperatorEntrypointConfiguration(StepEntrypointConfiguration):
    """Base class for step operator entrypoint configurations."""
    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.
        Returns:
            The superclass options as well as an option for the step run id.
        """
        return super().get_entrypoint_options() | {
            STEP_RUN_ID_OPTION,
        }
    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.
        Args:
            **kwargs: Kwargs, must include the step run id.
        Returns:
            The superclass arguments as well as arguments for the step run id.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{STEP_RUN_ID_OPTION}",
            kwargs[STEP_RUN_ID_OPTION],
        ]
    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentResponse",
    ) -> None:
        """Runs a single step.
        Args:
            step: The step to run.
            deployment: The deployment configuration.
        """
        step_run_id = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
        step_run = Client().zen_store.get_run_step(step_run_id)
        pipeline_run = Client().get_pipeline_run(step_run.pipeline_run_id)
        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=deployment.pipeline_configuration,
            run_name=pipeline_run.name,
            pipeline_step_name=self.entrypoint_args[STEP_NAME_OPTION],
            run_id=pipeline_run.id,
            step_run_id=step_run_id,
            force_write_logs=lambda: None,
        )
        stack = Client().active_stack
        input_artifacts, _ = input_utils.resolve_step_inputs(
            step=step, run_id=pipeline_run.id
        )
        output_artifact_uris = output_utils.prepare_output_artifact_uris(
            step_run=step_run, stack=stack, step=step
        )
        step_runner = StepRunner(step=step, stack=stack)
        step_runner.run(
            pipeline_run=pipeline_run,
            step_run=step_run,
            input_artifacts=input_artifacts,
            output_artifact_uris=output_artifact_uris,
            step_run_info=step_run_info,
        )
get_entrypoint_arguments(**kwargs)
  
      classmethod
  
    Gets all arguments that the entrypoint command should be called with.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| **kwargs | Any | Kwargs, must include the step run id. | {} | 
Returns:
| Type | Description | 
|---|---|
| List[str] | The superclass arguments as well as arguments for the step run id. | 
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
          @classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.
    Args:
        **kwargs: Kwargs, must include the step run id.
    Returns:
        The superclass arguments as well as arguments for the step run id.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{STEP_RUN_ID_OPTION}",
        kwargs[STEP_RUN_ID_OPTION],
    ]
get_entrypoint_options()
  
      classmethod
  
    Gets all options required for running with this configuration.
Returns:
| Type | Description | 
|---|---|
| Set[str] | The superclass options as well as an option for the step run id. | 
Source code in zenml/step_operators/step_operator_entrypoint_configuration.py
          @classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.
    Returns:
        The superclass options as well as an option for the step run id.
    """
    return super().get_entrypoint_options() | {
        STEP_RUN_ID_OPTION,
    }