
Step Operators

zenml.step_operators

Step operators allow you to run steps on custom infrastructure.

While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where an individual step runs. This is useful when a single step needs different infrastructure from the rest of the pipeline, for example a trainer step that should run in a separate environment equipped with a GPU.
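To make the distinction concrete, here is a toy model in plain Python. It does not use ZenML: `run_pipeline`, `OPERATORS`, and the step names are hypothetical stand-ins. The loop plays the orchestrator's role and hands a step to a named operator only when one is assigned.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical registry: operator name -> function that runs a step elsewhere.
OPERATORS: Dict[str, Callable[[str], str]] = {
    "gpu_operator": lambda step_name: f"{step_name} ran on gpu_operator",
}


def run_pipeline(steps: List[Tuple[str, Optional[str]]]) -> List[str]:
    """Run steps in order; delegate a step only if it names a step operator."""
    log = []
    for step_name, operator_name in steps:
        if operator_name is None:
            # No step operator: the step runs wherever the orchestrator runs.
            log.append(f"{step_name} ran in the orchestrator environment")
        else:
            # Step operator assigned: only this step runs on its infrastructure.
            log.append(OPERATORS[operator_name](step_name))
    return log


log = run_pipeline([("loader", None), ("trainer", "gpu_operator")])
print(log)
```

In ZenML itself the assignment is made on the step, for example through the `step_operator` argument of the `@step` decorator.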

Attributes

__all__ = ['BaseStepOperator', 'BaseStepOperatorFlavor', 'BaseStepOperatorConfig'] (module attribute)

Classes

BaseStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: StackComponent, ABC

Base class for all ZenML step operators.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: BaseStepOperatorConfig (property)

Returns the config of the step operator.

Returns:
    BaseStepOperatorConfig: The config of the step operator.

entrypoint_config_class: Type[StepOperatorEntrypointConfiguration] (property)

Returns the entrypoint configuration class for this step operator.

Concrete step operator implementations may override this property to return a custom entrypoint configuration class if they need to customize the entrypoint configuration.

Returns:
    Type[StepOperatorEntrypointConfiguration]: The entrypoint configuration class for this step operator.
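The override pattern described for `entrypoint_config_class` can be sketched without ZenML installed. All classes below are hypothetical stand-ins that only mirror the shape of the real ones; they are not ZenML's implementations.

```python
from typing import Type


class EntrypointConfigSketch:
    """Stand-in for StepOperatorEntrypointConfiguration (not the ZenML class)."""


class CustomEntrypointConfig(EntrypointConfigSketch):
    """Hypothetical customized entrypoint configuration."""


class OperatorSketch:
    """Stand-in for BaseStepOperator (not the ZenML class)."""

    @property
    def entrypoint_config_class(self) -> Type[EntrypointConfigSketch]:
        # Default: use the standard entrypoint configuration.
        return EntrypointConfigSketch


class CustomOperator(OperatorSketch):
    @property
    def entrypoint_config_class(self) -> Type[EntrypointConfigSketch]:
        # Override: point the operator at the customized configuration.
        return CustomEntrypointConfig


print(CustomOperator().entrypoint_config_class.__name__)
```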

Functions
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None (abstract method)

Abstract method to execute a step.

Subclasses must implement this method and launch a synchronous job that executes the entrypoint_command.

Parameters:
    info (StepRunInfo, required): Information about the step run.
    entrypoint_command (List[str], required): Command that executes the step.
    environment (Dict[str, str], required): Environment variables to set in the step operator environment.

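As a minimal sketch of what a synchronous `launch` could look like, the example below runs the command in a local subprocess and blocks until it exits. `StepOperatorSketch` is a hypothetical stand-in for `BaseStepOperator` so that the snippet runs without ZenML; a real operator would subclass `BaseStepOperator` and would typically submit the command to remote infrastructure instead.

```python
import os
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class StepOperatorSketch(ABC):
    """Stand-in for BaseStepOperator (not the ZenML class)."""

    @abstractmethod
    def launch(
        self,
        info: Any,
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Run the command and return only once it has finished."""


class LocalSubprocessOperator(StepOperatorSketch):
    """Hypothetical operator that runs the step in a local subprocess."""

    def launch(
        self,
        info: Any,
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        # Layer the step's variables over the current process environment.
        env = {**os.environ, **environment}
        # subprocess.run blocks until the process exits, which makes the job
        # synchronous; check=True raises CalledProcessError on a non-zero exit.
        subprocess.run(entrypoint_command, env=env, check=True)


# Demo command: exits with 0 only if the environment variable was passed on.
demo_command = [
    sys.executable,
    "-c",
    "import os, sys; sys.exit(0 if os.environ.get('DEMO_FLAG') == '1' else 1)",
]
LocalSubprocessOperator().launch(None, demo_command, {"DEMO_FLAG": "1"})
```

Raising on a non-zero exit code is a design choice of this sketch: it lets the caller notice a failed step instead of silently continuing.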
Source code in src/zenml/step_operators/base_step_operator.py
@abstractmethod
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Abstract method to execute a step.

    Subclasses must implement this method and launch a **synchronous**
    job that executes the `entrypoint_command`.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """

BaseStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Base config for step operators.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
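The rule enforced by this constructor can be illustrated with a small standalone sketch. The regular expression is an assumption based on the `{{secret_name.key}}` form mentioned in the docstring, and `looks_like_secret_reference` and `check_config_values` are hypothetical helpers; ZenML's own `secret_utils` and `pydantic_utils` functions may behave differently.

```python
import re
from typing import Any, Dict, Set

# Assumed pattern for the `{{secret_name.key}}` form; the real check in
# ZenML's secret_utils may be stricter or looser than this.
_SECRET_REFERENCE = re.compile(r"\{\{\s*[^\s.{}]+\.[^\s.{}]+\s*\}\}")


def looks_like_secret_reference(value: Any) -> bool:
    """Return True if the value is a string shaped like `{{name.key}}`."""
    return isinstance(value, str) and _SECRET_REFERENCE.fullmatch(value) is not None


def check_config_values(values: Dict[str, Any], validated_fields: Set[str]) -> None:
    """Reject secret references on fields that have custom validators."""
    for key, value in values.items():
        if value is None or not looks_like_secret_reference(value):
            # Unset and plain-text values go through normal validation.
            continue
        if key in validated_fields:
            raise ValueError(
                f"Attribute `{key}` has a custom validator and cannot be "
                "passed as a secret reference."
            )


# Accepted: the secret reference sits on a field without a custom validator.
check_config_values(
    {"token": "{{cloud.token}}", "region": "eu-west-1"},
    validated_fields={"region"},
)
```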

BaseStepOperatorFlavor

Bases: Flavor

Base class for all ZenML step operator flavors.

Attributes
config_class: Type[BaseStepOperatorConfig] (property)

Returns the config class for this flavor.

Returns:
    Type[BaseStepOperatorConfig]: The config class for this flavor.

implementation_class: Type[BaseStepOperator] (abstract property)

Returns the implementation class for this flavor.

Returns:
    Type[BaseStepOperator]: The implementation class for this flavor.

type: StackComponentType (property)

Returns the flavor type.

Returns:
    StackComponentType: The type of the flavor.
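How a flavor ties a config class to an implementation class can be sketched with plain Python stand-ins. None of the classes below are ZenML's; the `"step_operator"` string, the `gpu_count` field, and all class names are assumptions made for illustration. A real flavor would subclass `BaseStepOperatorFlavor` and may have to implement further members inherited from `Flavor`.

```python
from abc import ABC, abstractmethod
from typing import Type


class ConfigSketch:
    """Stand-in for BaseStepOperatorConfig (not the ZenML class)."""


class OperatorSketch:
    """Stand-in for BaseStepOperator (not the ZenML class)."""


class FlavorSketch(ABC):
    """Stand-in for BaseStepOperatorFlavor (not the ZenML class)."""

    @property
    def type(self) -> str:
        # Assumed label; the real property returns a StackComponentType.
        return "step_operator"

    @property
    def config_class(self) -> Type[ConfigSketch]:
        # Default config class; concrete flavors may override it.
        return ConfigSketch

    @property
    @abstractmethod
    def implementation_class(self) -> Type[OperatorSketch]:
        """Concrete flavors must say which operator class they create."""


class MyConfig(ConfigSketch):
    gpu_count: int = 1  # hypothetical setting


class MyOperator(OperatorSketch):
    """Hypothetical concrete operator."""


class MyFlavor(FlavorSketch):
    @property
    def config_class(self) -> Type[ConfigSketch]:
        return MyConfig

    @property
    def implementation_class(self) -> Type[OperatorSketch]:
        return MyOperator


flavor = MyFlavor()
print(flavor.type, flavor.config_class.__name__, flavor.implementation_class.__name__)
```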

Modules

base_step_operator

Base class for ZenML step operators.

This module defines BaseStepOperator, BaseStepOperatorConfig, and BaseStepOperatorFlavor, which are documented under Classes above.

step_operator_entrypoint_configuration

Abstract base class for entrypoint configurations that run a single step.

Classes
StepOperatorEntrypointConfiguration(arguments: List[str])

Bases: StepEntrypointConfiguration

Base class for step operator entrypoint configurations.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Initializes the entrypoint configuration.

    Args:
        arguments: Command line arguments to configure this object.
    """
    self.entrypoint_args = self._parse_arguments(arguments)
Functions
get_entrypoint_arguments(**kwargs: Any) -> List[str] (class method)

Gets all arguments that the entrypoint command should be called with.

Parameters:
    **kwargs (Any, default {}): Keyword arguments; must include the step run ID.

Returns:
    List[str]: The superclass arguments as well as arguments for the step run ID.

Source code in src/zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        **kwargs: Kwargs, must include the step run id.

    Returns:
        The superclass arguments as well as arguments for the step run id.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{STEP_RUN_ID_OPTION}",
        kwargs[STEP_RUN_ID_OPTION],
    ]
get_entrypoint_options() -> Set[str] (class method)

Gets all options required for running with this configuration.

Returns:
    Set[str]: The superclass options as well as an option for the step run ID.

Source code in src/zenml/step_operators/step_operator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the step run id.
    """
    return super().get_entrypoint_options() | {
        STEP_RUN_ID_OPTION,
    }
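The two class methods above extend what the parent class returns rather than replacing it. The sketch below reproduces that pattern with stand-in classes so it runs without ZenML. `ParentConfigSketch` and its single `step_name` option are hypothetical, and the string value given to `STEP_RUN_ID_OPTION` is an assumption about the real constant.

```python
from typing import Any, List, Set

STEP_NAME_OPTION = "step_name"      # hypothetical option of the parent class
STEP_RUN_ID_OPTION = "step_run_id"  # assumed value of the real constant


class ParentConfigSketch:
    """Stand-in for StepEntrypointConfiguration (not the ZenML class)."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        return {STEP_NAME_OPTION}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        return [f"--{STEP_NAME_OPTION}", kwargs[STEP_NAME_OPTION]]


class StepOperatorConfigSketch(ParentConfigSketch):
    """Mirrors the two overrides shown above."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        # Set union: keep the parent's options and add the step run ID.
        return super().get_entrypoint_options() | {STEP_RUN_ID_OPTION}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        # List concatenation: parent arguments first, then the step run ID.
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{STEP_RUN_ID_OPTION}",
            kwargs[STEP_RUN_ID_OPTION],
        ]


args = StepOperatorConfigSketch.get_entrypoint_arguments(
    step_name="trainer", step_run_id="1234"
)
print(args)  # ['--step_name', 'trainer', '--step_run_id', '1234']
```

Because the step run ID is read with `kwargs[STEP_RUN_ID_OPTION]`, omitting it raises a `KeyError`, which matches the documented requirement that the keyword arguments include it.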