Skip to content

Steps

zenml.steps

Initializer for ZenML steps.

A step is a single piece or stage of a ZenML pipeline. Think of each step as being one of the nodes of a Directed Acyclic Graph (or DAG). Steps are responsible for one aspect of processing or interacting with the data / artifacts in the pipeline.

Conceptually, a Step is a discrete and independent part of a pipeline that is responsible for one particular aspect of data manipulation inside a ZenML pipeline.

Steps can be subclassed from the BaseStep class, or used via our @step decorator.

Attributes

__all__ = ['BaseStep', 'ResourceSettings', 'StepContext', 'step', 'get_step_context'] module-attribute

Classes

BaseStep(name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None)

Abstract base class for all ZenML steps.

Initializes a step.

Parameters:

Name Type Description Default
name Optional[str]

The name of the step.

None
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

Enable step logs for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model version in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name template.

None
Source code in src/zenml/steps/base_step.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def __init__(
    self,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> None:
    """Sets up a new step instance.

    The entrypoint is passed through `validate_entrypoint_function`, the
    requested toggles are reported at debug level, a
    `PartialStepConfiguration` is created from the name and the `enable_*`
    flags, and all remaining options are forwarded to `configure`.

    Args:
        name: The name of the step. Falls back to the class name if empty
            or not given.
        enable_cache: If caching should be enabled for this step.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this step.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this step.
        enable_step_logs: Enable step logs for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Configuration of the model version in the Model Control
            Plane.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders to use in the name template.
    """
    from zenml.config.step_configurations import PartialStepConfiguration

    self.entrypoint_definition = validate_entrypoint_function(
        self.entrypoint,
        reserved_arguments=["after", "id"],
    )

    if not name:
        name = self.__class__.__name__

    toggle_reports = (
        ("Step `%s`: Caching %s.", enable_cache),
        ("Step `%s`: Artifact metadata %s.", enable_artifact_metadata),
        (
            "Step `%s`: Artifact visualization %s.",
            enable_artifact_visualization,
        ),
        ("Step `%s`: logs %s.", enable_step_logs),
    )
    for template, toggle in toggle_reports:
        # Anything other than an explicit `False` (including `None`) is
        # reported as enabled.
        state = "disabled" if toggle is False else "enabled"
        logger.debug(template, name, state)

    if model is not None:
        model_context = {"model": model.name, "version": model.version}
        logger.debug(
            "Step `%s`: Is in Model context %s.",
            name,
            model_context,
        )

    # Only the name and the enable_* flags go into the initial
    # configuration; every other option is applied through `configure`.
    initial_configuration = PartialStepConfiguration(
        name=name,
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
    )
    self._configuration = initial_configuration

    self.configure(
        experiment_tracker=experiment_tracker,
        step_operator=step_operator,
        output_materializers=output_materializers,
        parameters=parameters,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model=model,
        retry=retry,
        substitutions=substitutions,
    )

    notebook_utils.try_to_save_notebook_cell_code(self.source_object)
Attributes
caching_parameters: Dict[str, Any] property

Caching parameters for this step.

Returns:

Type Description
Dict[str, Any]

A dictionary containing the caching parameters

configuration: PartialStepConfiguration property

The configuration of the step.

Returns:

Type Description
PartialStepConfiguration

The configuration of the step.

docstring: Optional[str] property

The docstring of this step.

Returns:

Type Description
Optional[str]

The docstring of this step.

enable_cache: Optional[bool] property

If caching is enabled for the step.

Returns:

Type Description
Optional[bool]

If caching is enabled for the step.

name: str property

The name of the step.

Returns:

Type Description
str

The name of the step.

source_code: str property

The source code of this step.

Returns:

Type Description
str

The source code of this step.

source_object: Any property

The source object of this step.

Returns:

Type Description
Any

The source object of this step.

Functions
call_entrypoint(*args: Any, **kwargs: Any) -> Any

Calls the entrypoint function of the step.

Parameters:

Name Type Description Default
*args Any

Entrypoint function arguments.

()
**kwargs Any

Entrypoint function keyword arguments.

{}

Returns:

Type Description
Any

The return value of the entrypoint function.

Raises:

Type Description
StepInterfaceError

If the arguments to the entrypoint function are invalid.

Source code in src/zenml/steps/base_step.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def call_entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Validates the given arguments and invokes the step entrypoint.

    The arguments are first run through
    `pydantic_utils.validate_function_args` (with arbitrary types allowed);
    the entrypoint is then called with the validated keyword arguments.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The return value of the entrypoint function.

    Raises:
        StepInterfaceError: If the arguments to the entrypoint function are
            invalid.
    """
    try:
        entrypoint_kwargs = pydantic_utils.validate_function_args(
            self.entrypoint,
            ConfigDict(arbitrary_types_allowed=True),
            *args,
            **kwargs,
        )
    except ValidationError as validation_error:
        # Chain the pydantic error so its details stay visible to the user.
        raise StepInterfaceError(
            "Invalid step function entrypoint arguments. Check out the "
            "pydantic error above for more details."
        ) from validation_error
    else:
        return self.entrypoint(**entrypoint_kwargs)
configure(enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, merge: bool = True) -> T

Configures the step.

Configuration merging example:

* merge==True:
    step.configure(extra={"key1": 1})
    step.configure(extra={"key2": 2}, merge=True)
    step.configuration.extra # {"key1": 1, "key2": 2}
* merge==False:
    step.configure(extra={"key1": 1})
    step.configure(extra={"key2": 2}, merge=False)
    step.configuration.extra # {"key2": 2}

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Model to use for this step.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name template.

None
merge bool

If True, will merge the given dictionary configurations like parameters and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True

Returns:

Type Description
T

The step instance that this method was called on.

Source code in src/zenml/steps/base_step.py
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    merge: bool = True,
) -> T:
    """Updates the configuration of this step in place.

    The given options are collected, `None` values are dropped via
    `dict_utils.remove_none_values`, and the remainder is applied as a
    `StepConfigurationUpdate`.

    Configuration merging example:
    * `merge==True`:
        step.configure(extra={"key1": 1})
        step.configure(extra={"key2": 2}, merge=True)
        step.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        step.configure(extra={"key1": 1})
        step.configure(extra={"key2": 2}, merge=False)
        step.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this step.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this step.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this step.
        enable_step_logs: If step logs should be enabled for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Model to use for this step.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders to use in the name template.
        merge: If `True`, will merge the given dictionary configurations
            like `parameters` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.

    Returns:
        The step instance that this method was called on.
    """
    from zenml.config.step_configurations import StepConfigurationUpdate
    from zenml.hooks.hook_validators import resolve_and_validate_hook

    def _to_source(candidate: Union[str, Source, Type[Any]]) -> Source:
        # Strings are parsed as import paths, sources pass through
        # untouched and anything else goes through `source_utils.resolve`.
        if isinstance(candidate, str):
            return Source.from_import_path(candidate)
        if isinstance(candidate, Source):
            return candidate
        return source_utils.resolve(candidate)

    def _to_source_tuple(spec: Any) -> Tuple[Source, ...]:
        # A string is a sequence too, but here it denotes a single entry.
        if isinstance(spec, Sequence) and not isinstance(spec, str):
            return tuple(_to_source(item) for item in spec)
        return (_to_source(spec),)

    output_names = set(self.entrypoint_definition.outputs)
    output_updates: Dict[str, Dict[str, Tuple[Source, ...]]] = defaultdict(
        dict
    )

    if output_materializers:
        if isinstance(output_materializers, Mapping):
            per_output = output_materializers
        else:
            # A single specification applies to every output of the step.
            shared_sources = _to_source_tuple(output_materializers)
            per_output = dict.fromkeys(output_names, shared_sources)

        for output_name, spec in per_output.items():
            output_updates[output_name]["materializer_source"] = (
                _to_source_tuple(spec)
            )

    # Hooks are only resolved and validated when one was actually passed.
    failure_hook_source = (
        resolve_and_validate_hook(on_failure) if on_failure else None
    )
    success_hook_source = (
        resolve_and_validate_hook(on_success) if on_success else None
    )

    requested_values = dict(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        experiment_tracker=experiment_tracker,
        step_operator=step_operator,
        parameters=parameters,
        settings=settings,
        # An empty mapping is turned into `None` so it gets dropped below.
        outputs=output_updates or None,
        extra=extra,
        failure_hook_source=failure_hook_source,
        success_hook_source=success_hook_source,
        model=model,
        retry=retry,
        substitutions=substitutions,
    )
    update = StepConfigurationUpdate(
        **dict_utils.remove_none_values(requested_values)
    )
    self._apply_configuration(update, merge=merge)
    return self
copy() -> BaseStep

Copies the step.

Returns:

Type Description
BaseStep

The step copy.

Source code in src/zenml/steps/base_step.py
803
804
805
806
807
808
809
def copy(self) -> "BaseStep":
    """Creates a deep copy of this step.

    Returns:
        The step copy.
    """
    duplicate = copy.deepcopy(self)
    return duplicate
entrypoint(*args: Any, **kwargs: Any) -> Any abstractmethod

Abstract method for core step logic.

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to the step.

()
**kwargs Any

Keyword arguments passed to the step.

{}

Returns:

Type Description
Any

The output of the step.

Source code in src/zenml/steps/base_step.py
214
215
216
217
218
219
220
221
222
223
224
@abstractmethod
def entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Core logic of the step, to be provided by concrete steps.

    This method is abstract and has no implementation of its own.

    Args:
        *args: Positional arguments passed to the step.
        **kwargs: Keyword arguments passed to the step.

    Returns:
        The output of the step.
    """
load_from_source(source: Union[Source, str]) -> BaseStep classmethod

Loads a step from source.

Parameters:

Name Type Description Default
source Union[Source, str]

The path to the step source.

required

Returns:

Type Description
BaseStep

The loaded step.

Raises:

Type Description
ValueError

If the source is not a valid step source.

Source code in src/zenml/steps/base_step.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@classmethod
def load_from_source(cls, source: Union[Source, str]) -> "BaseStep":
    """Loads a step from the given source.

    The source may point to either a step instance, which is returned as
    is, or a `BaseStep` subclass, which is instantiated without arguments.

    Args:
        source: The path to the step source.

    Returns:
        The loaded step.

    Raises:
        ValueError: If the source is not a valid step source.
    """
    loaded = source_utils.load(source)

    # Already a step instance: nothing to construct.
    if isinstance(loaded, BaseStep):
        return loaded

    # A step class: instantiate it with its default arguments.
    if isinstance(loaded, type) and issubclass(loaded, BaseStep):
        return loaded()

    raise ValueError("Invalid step source.")
resolve() -> Source

Resolves the step.

Returns:

Type Description
Source

The step source.

Source code in src/zenml/steps/base_step.py
248
249
250
251
252
253
254
def resolve(self) -> Source:
    """Resolves the class of this step into a source.

    Returns:
        The step source.
    """
    step_class = self.__class__
    return source_utils.resolve(step_class)
with_options(enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, merge: bool = True) -> BaseStep

Copies the step and applies the given configurations.

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Model to use for this step.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders for the step name.

None
merge bool

If True, will merge the given dictionary configurations like parameters and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the description of the configure method for an example.

True

Returns:

Type Description
BaseStep

The copied step instance.

Source code in src/zenml/steps/base_step.py
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def with_options(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    merge: bool = True,
) -> "BaseStep":
    """Returns a configured copy of this step.

    The step is copied via `copy()` and every option is forwarded to the
    copy's `configure` method; `configure` is not called on this step.

    Args:
        enable_cache: If caching should be enabled for this step.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this step.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this step.
        enable_step_logs: If step logs should be enabled for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Model to use for this step.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders for the step name.
        merge: If `True`, will merge the given dictionary configurations
            like `parameters` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the description of the
            `configure` method for an example.

    Returns:
        The copied step instance.
    """
    clone = self.copy()

    overrides = dict(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        experiment_tracker=experiment_tracker,
        step_operator=step_operator,
        parameters=parameters,
        output_materializers=output_materializers,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model=model,
        retry=retry,
        substitutions=substitutions,
        merge=merge,
    )
    # The copy itself is returned, whatever `configure` returns.
    clone.configure(**overrides)
    return clone

ResourceSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Hardware resource settings.

Attributes:

Name Type Description
cpu_count Optional[PositiveFloat]

The amount of CPU cores that should be configured.

gpu_count Optional[NonNegativeInt]

The amount of GPUs that should be configured.

memory Optional[str]

The amount of memory that should be configured.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Checks secret references in the keyword arguments, then initializes.

    Each keyword argument that names a model field is inspected before the
    parent initializer runs, so that secret references are not passed for
    fields that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, logs a warning when a
            secret field receives a plain-text value.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If a secret reference is passed for an attribute which
            explicitly disallows secret references or which requires
            custom pydantic validation.
    """
    model_class = self.__class__

    for attribute, candidate in kwargs.items():
        field_info = model_class.model_fields.get(attribute)
        if field_info is None:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if candidate is None:
            continue

        if secret_utils.is_secret_reference(candidate):
            if secret_utils.is_clear_text_field(field_info):
                raise ValueError(
                    f"Passing the `{attribute}` attribute as a secret reference is "
                    "not allowed."
                )

            if has_validators(
                pydantic_class=model_class, field_name=attribute
            ):
                raise ValueError(
                    f"Passing the attribute `{attribute}` as a secret reference is "
                    "not allowed as additional validation is required for "
                    "this attribute."
                )
            continue

        # Plain-text value: nothing to reject, at most a warning is emitted.
        if (
            secret_utils.is_secret_field(field_info)
            and warn_about_plain_text_secrets
        ):
            logger.warning(
                "You specified a plain-text value for the sensitive "
                f"attribute `{attribute}`. This is currently only a warning, "
                "but future versions of ZenML will require you to pass "
                "in sensitive information as secrets. Check out the "
                "documentation on how to configure values with secrets "
                "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
            )

    super().__init__(**kwargs)
Attributes
empty: bool property

Returns whether this object is "empty" (i.e. no values are configured).

Returns:

Type Description
bool

True if no values were configured, False otherwise.

Functions
get_memory(unit: Union[str, ByteUnit] = ByteUnit.GB) -> Optional[float]

Gets the memory configuration in a specific unit.

Parameters:

Name Type Description Default
unit Union[str, ByteUnit]

The unit to which the memory should be converted.

GB

Raises:

Type Description
ValueError

If the memory string is invalid.

Returns:

Type Description
Optional[float]

The memory configuration converted to the requested unit, or None

Optional[float]

if no memory was configured.

Source code in src/zenml/config/resource_settings.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_memory(
    self, unit: Union[str, ByteUnit] = ByteUnit.GB
) -> Optional[float]:
    """Converts the configured memory into the requested unit.

    Args:
        unit: The unit to which the memory should be converted.

    Raises:
        ValueError: If the memory string is invalid.

    Returns:
        The memory configuration converted to the requested unit, or None
        if no memory was configured.
    """
    if not self.memory:
        return None

    target_unit = ByteUnit(unit) if isinstance(unit, str) else unit
    configured = self.memory

    # Pick the first unit, in enum iteration order, whose suffix matches.
    source_unit = next(
        (
            candidate
            for candidate in ByteUnit
            if configured.endswith(candidate.value)
        ),
        None,
    )
    if source_unit is None:
        # Should never happen due to the regex validation
        raise ValueError(f"Unable to parse memory unit from '{configured}'.")

    # Everything in front of the unit suffix is the integer magnitude.
    magnitude = int(configured[: -len(source_unit.value)])
    return magnitude * source_unit.byte_value / target_unit.byte_value

StepContext(pipeline_run: PipelineRunResponse, step_run: StepRunResponse, output_materializers: Mapping[str, Sequence[Type[BaseMaterializer]]], output_artifact_uris: Mapping[str, str], output_artifact_configs: Mapping[str, Optional[ArtifactConfig]])

Provides additional context inside a step function.

This singleton class is used to access information about the current run, step run, or its outputs inside a step function.

Usage example:

from zenml.steps import get_step_context

@step
def my_trainer_step() -> Any:
    context = get_step_context()

    # get info about the current pipeline run
    current_pipeline_run = context.pipeline_run

    # get info about the current step run
    current_step_run = context.step_run

    # get info about the future output artifacts of this step
    output_artifact_uri = context.get_output_artifact_uri()

    ...

Initialize the context of the currently running step.

Parameters:

Name Type Description Default
pipeline_run PipelineRunResponse

The model of the current pipeline run.

required
step_run StepRunResponse

The model of the current step run.

required
output_materializers Mapping[str, Sequence[Type[BaseMaterializer]]]

The output materializers of the step that this context is used in.

required
output_artifact_uris Mapping[str, str]

The output artifact URIs of the step that this context is used in.

required
output_artifact_configs Mapping[str, Optional[ArtifactConfig]]

The outputs' ArtifactConfigs of the step that this context is used in.

required

Raises:

Type Description
StepContextError

If the keys of the output materializers and output artifacts do not match.

Source code in src/zenml/steps/step_context.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    pipeline_run: "PipelineRunResponse",
    step_run: "StepRunResponse",
    output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
    output_artifact_uris: Mapping[str, str],
    output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
) -> None:
    """Sets up the context of the currently running step.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        output_materializers: The output materializers of the step that
            this context is used in.
        output_artifact_uris: The output artifact URIs of the step that
            this context is used in.
        output_artifact_configs: The outputs' ArtifactConfigs of the step
            that this context is used in.

    Raises:
        StepContextError: If the keys of the output materializers and
            output artifact URIs do not match.
    """
    from zenml.client import Client

    # Re-fetch both models through the client; if the lookup raises a
    # `KeyError`, fall back to the models that were passed in.
    try:
        current_run = Client().get_pipeline_run(pipeline_run.id)
    except KeyError:
        current_run = pipeline_run
    self.pipeline_run = current_run

    try:
        current_step = Client().get_run_step(step_run.id)
    except KeyError:
        current_step = step_run
    self.step_run = current_step

    # The step's model version takes precedence over the pipeline run's.
    self.model_version = (
        current_step.model_version or current_run.model_version
    )
    self.step_name = self.step_run.name

    # set outputs
    if output_materializers.keys() != output_artifact_uris.keys():
        raise StepContextError(
            f"Mismatched keys in output materializers and output artifact "
            f"URIs for step `{self.step_name}`. Output materializer "
            f"keys: {set(output_materializers)}, output artifact URI "
            f"keys: {set(output_artifact_uris)}"
        )

    outputs = {}
    for output_name, materializers in output_materializers.items():
        outputs[output_name] = StepContextOutput(
            materializer_classes=materializers,
            artifact_uri=output_artifact_uris[output_name],
            artifact_config=output_artifact_configs[output_name],
        )
    self._outputs = outputs
    self._cleanup_registry = CallbackRegistry()
Attributes
inputs: Dict[str, StepRunInputResponse] property

Returns the input artifacts of the current step.

Returns:

Type Description
Dict[str, StepRunInputResponse]

The input artifacts of the current step.

model: Model property

Returns configured Model.

Order of resolution to search for Model is
  1. Model from the step context
  2. Model from the pipeline context

Returns:

Type Description
Model

The Model object associated with the current step.

Raises:

Type Description
StepContextError

If no Model object was specified for the step or pipeline.

pipeline: PipelineResponse property

Returns the current pipeline.

Returns:

Type Description
PipelineResponse

The current pipeline.

Raises:

Type Description
StepContextError

If the pipeline run does not have a pipeline.

Functions
add_output_metadata(metadata: Dict[str, MetadataType], output_name: Optional[str] = None) -> None

Adds metadata for a given step output.

Parameters:

Name Type Description Default
metadata Dict[str, MetadataType]

The metadata to add.

required
output_name Optional[str]

Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def add_output_metadata(
    self,
    metadata: Dict[str, "MetadataType"],
    output_name: Optional[str] = None,
) -> None:
    """Merges metadata into the metadata stored for a step output.

    Args:
        metadata: The metadata to add.
        output_name: Optional name of the output for which to add the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be added. If the
            step has multiple outputs, an exception will be raised.
    """
    target = self._get_output(output_name)
    stored = target.run_metadata
    if not stored:
        # Nothing recorded yet (or an empty mapping): start a fresh dict.
        stored = {}
        target.run_metadata = stored
    stored.update(**metadata)
add_output_tags(tags: List[str], output_name: Optional[str] = None) -> None

Adds tags for a given step output.

Parameters:

Name Type Description Default
tags List[str]

The tags to add.

required
output_name Optional[str]

Optional name of the output for which to add the tags. If no name is given and the step only has a single output, the tags of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def add_output_tags(
    self,
    tags: List[str],
    output_name: Optional[str] = None,
) -> None:
    """Appends tags to the tags stored for a step output.

    Args:
        tags: The tags to add.
        output_name: Optional name of the output for which to add the
            tags. If no name is given and the step only has a single
            output, the tags of this output will be added. If the
            step has multiple outputs, an exception will be raised.
    """
    target = self._get_output(output_name)
    # Reuse the stored list when it has entries, otherwise start a new one.
    current = target.tags if target.tags else []
    current += tags
    target.tags = current
get_output_artifact_uri(output_name: Optional[str] = None) -> str

Returns the artifact URI for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
str

Artifact URI for the given output.

Source code in src/zenml/steps/step_context.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def get_output_artifact_uri(
    self, output_name: Optional[str] = None
) -> str:
    """Looks up the artifact URI of a step output.

    Args:
        output_name: Optional name of the output for which to get the URI.
            If no name is given and the step only has a single output,
            the URI of this output will be returned. If the step has
            multiple outputs, an exception will be raised.

    Returns:
        Artifact URI for the given output.
    """
    target = self._get_output(output_name)
    return target.artifact_uri
get_output_materializer(output_name: Optional[str] = None, custom_materializer_class: Optional[Type[BaseMaterializer]] = None, data_type: Optional[Type[Any]] = None) -> BaseMaterializer

Returns a materializer for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised.

None
custom_materializer_class Optional[Type[BaseMaterializer]]

If given, this BaseMaterializer subclass will be initialized with the output artifact instead of the materializer that was registered for this step output.

None
data_type Optional[Type[Any]]

If the output annotation is of type Union and the step therefore has multiple materializers configured, you can provide a data type for the output which will be used to select the correct materializer. If not provided, the first materializer will be used.

None

Returns:

Type Description
BaseMaterializer

A materializer initialized with the output artifact for

BaseMaterializer

the given output.

Source code in src/zenml/steps/step_context.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def get_output_materializer(
    self,
    output_name: Optional[str] = None,
    custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
    data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
    """Builds a materializer for a given step output.

    Args:
        output_name: Optional name of the output for which to get the
            materializer. If no name is given and the step only has a
            single output, the materializer of this output will be
            returned. If the step has multiple outputs, an exception
            will be raised.
        custom_materializer_class: If given, this `BaseMaterializer`
            subclass will be initialized with the output artifact URI
            instead of the materializer that was registered for this
            step output.
        data_type: If the output annotation is of type `Union` and the step
            therefore has multiple materializers configured, you can provide
            a data type for the output which will be used to select the
            correct materializer. If not provided, the first materializer
            will be used.

    Returns:
        A materializer initialized with the artifact URI of the given
        output.
    """
    from zenml.utils import materializer_utils

    target = self._get_output(output_name)
    candidates = target.materializer_classes
    uri = target.artifact_uri

    if custom_materializer_class:
        # An explicitly requested class always wins.
        chosen = custom_materializer_class
    elif len(candidates) != 1 and data_type:
        # Several registered materializers: pick the one for the data type.
        chosen = materializer_utils.select_materializer(
            data_type=data_type, materializer_classes=candidates
        )
    else:
        chosen = candidates[0]

    return chosen(uri)
get_output_metadata(output_name: Optional[str] = None) -> Dict[str, MetadataType]

Returns the metadata for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
Dict[str, MetadataType]

Metadata for the given output.

Source code in src/zenml/steps/step_context.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def get_output_metadata(
    self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
    """Returns the metadata for a given step output.

    The result combines the metadata stored on the output with the metadata
    of the output's artifact config (if any). For keys present in both, the
    artifact config value is used. A new dictionary is built on every call,
    so neither the stored output metadata nor the artifact config metadata
    is modified by reading or by mutating the returned value.

    Args:
        output_name: Optional name of the output for which to get the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Metadata for the given output.
    """
    output = self._get_output(output_name)
    # Copy first: updating `output.run_metadata` in place would make this
    # getter permanently write the config metadata into the output's state.
    merged: Dict[str, "MetadataType"] = dict(output.run_metadata or {})
    if output.artifact_config:
        merged.update(output.artifact_config.run_metadata or {})
    return merged
get_output_tags(output_name: Optional[str] = None) -> List[str]

Returns the tags for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the tags. If no name is given and the step only has a single output, the tags of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
List[str]

Tags for the given output.

Source code in src/zenml/steps/step_context.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
    """Returns the tags for a given step output.

    The result contains the tags of the output's artifact config (if any)
    followed by the tags stored on the output itself, with duplicates
    removed. The order of first occurrence is kept so that the result is
    the same on every call and in every interpreter run.

    Args:
        output_name: Optional name of the output for which to get the
            tags. If no name is given and the step only has a single
            output, the tags of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Tags for the given output.
    """
    output = self._get_output(output_name)
    config_tags = (
        output.artifact_config.tags if output.artifact_config else None
    )
    combined = [*(config_tags or []), *(output.tags or [])]
    # `dict.fromkeys` de-duplicates while keeping insertion order; going
    # through a `set` would make the order depend on string hashing.
    return list(dict.fromkeys(combined))
remove_output_tags(tags: List[str], output_name: Optional[str] = None) -> None

Removes tags for a given step output.

Parameters:

Name Type Description Default
tags List[str]

The tags to remove.

required
output_name Optional[str]

Optional name of the output for which to remove the tags. If no name is given and the step only has a single output, the tags of this output will be removed. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def remove_output_tags(
    self,
    tags: List[str],
    output_name: Optional[str] = None,
) -> None:
    """Drops the given tags from the tags stored for a step output.

    Args:
        tags: The tags to remove.
        output_name: Optional name of the output for which to remove the
            tags. If no name is given and the step only has a single
            output, the tags of this output will be removed. If the
            step has multiple outputs, an exception will be raised.
    """
    target = self._get_output(output_name)
    if target.tags:
        # Rebuild the list, keeping only the tags that were not requested
        # for removal; an unset or empty tag list is left untouched.
        target.tags = [kept for kept in target.tags if kept not in tags]

Functions

get_step_context() -> StepContext

Get the context of the currently running step.

Returns:

Type Description
StepContext

The context of the currently running step.

Raises:

Type Description
RuntimeError

If no step is currently running.

Source code in src/zenml/steps/step_context.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_step_context() -> "StepContext":
    """Fetch the context of the step that is currently running.

    Returns:
        The context of the currently running step.

    Raises:
        RuntimeError: If no step is currently running.
    """
    if not StepContext._exists():
        raise RuntimeError(
            "The step context is only available inside a step function."
        )
    return StepContext()  # type: ignore

step(_func: Optional[F] = None, *, name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Dict[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None) -> Union[BaseStep, Callable[[F], BaseStep]]

step(_func: F) -> BaseStep
step(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional[
        OutputMaterializersSpecification
    ] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Callable[[F], BaseStep]

Decorator to create a ZenML step.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the step. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default.

None
enable_artifact_metadata Optional[bool]

Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default.

None
enable_artifact_visualization Optional[bool]

Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default.

None
enable_step_logs Optional[bool]

Specify whether step logs are enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Configuration of step retry in case of step failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders for the step name.

None

Returns:

Type Description
Union[BaseStep, Callable[[F], BaseStep]]

The step instance.

Source code in src/zenml/steps/step_decorator.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def step(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional["OutputMaterializersSpecification"] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
    """Decorator that turns a function into a ZenML step.

    Works both bare (`@step`) and with keyword options (`@step(...)`): when
    no function is passed in, the actual decorator is returned instead of a
    step instance.

    Args:
        _func: The decorated function.
        name: The name of the step. If left empty, the name of the decorated
            function will be used as a fallback.
        enable_cache: Specify whether caching is enabled for this step. If no
            value is passed, caching is enabled by default.
        enable_artifact_metadata: Specify whether metadata is enabled for this
            step. If no value is passed, metadata is enabled by default.
        enable_artifact_visualization: Specify whether visualization is enabled
            for this step. If no value is passed, visualization is enabled by
            default.
        enable_step_logs: Specify whether step logs are enabled for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Configuration of step retry in case of step failure.
        substitutions: Extra placeholders for the step name.

    Returns:
        The step instance, or the decorator producing it when called
        without a function.
    """

    def inner_decorator(func: "F") -> "BaseStep":
        from zenml.steps.decorated_step import _DecoratedStep

        # The generated class takes over the function's name, module and
        # docstring and exposes the function itself as its entrypoint.
        class_attributes = {
            "entrypoint": staticmethod(func),
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        step_class: Type["BaseStep"] = type(
            func.__name__, (_DecoratedStep,), class_attributes
        )

        return step_class(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            experiment_tracker=experiment_tracker,
            step_operator=step_operator,
            output_materializers=output_materializers,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
            retry=retry,
            substitutions=substitutions,
        )

    if _func is not None:
        return inner_decorator(_func)
    return inner_decorator

Modules

base_step

Base Step for ZenML.

Classes
BaseStep(name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None)

Abstract base class for all ZenML steps.

Initializes a step.

Parameters:

Name Type Description Default
name Optional[str]

The name of the step.

None
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

Enable step logs for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model version in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name template.

None
Source code in src/zenml/steps/base_step.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def __init__(
    self,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> None:
    """Creates a step and applies its initial configuration.

    The entrypoint is validated first, then a partial configuration holding
    the name and the enable flags is stored, and finally all remaining
    options are forwarded to `configure`.

    Args:
        name: The name of the step. The class name is used when this is
            empty or not given.
        enable_cache: Whether caching should be enabled for this step.
        enable_artifact_metadata: Whether artifact metadata should be
            enabled for this step.
        enable_artifact_visualization: Whether artifact visualization
            should be enabled for this step.
        enable_step_logs: Whether step logs should be enabled for this
            step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If given
            as a dict, the keys must be a subset of the output names of
            this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback to run if the step fails. Either a function
            taking a single `BaseException` argument, or a source path to
            such a function (e.g. `module.my_function`).
        on_success: Callback to run if the step succeeds. Either a function
            taking no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model version in the Model Control
            Plane.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders to use in the name template.
    """
    from zenml.config.step_configurations import PartialStepConfiguration

    # `after` and `id` are passed as reserved argument names for the
    # entrypoint validation.
    self.entrypoint_definition = validate_entrypoint_function(
        self.entrypoint,
        reserved_arguments=["after", "id"],
    )

    step_name = name or self.__class__.__name__

    def _state(flag: Optional[bool]) -> str:
        # Only an explicit `False` is reported as disabled; `None` (not
        # specified) is reported as enabled.
        return "disabled" if flag is False else "enabled"

    flag_messages = (
        ("Step `%s`: Caching %s.", enable_cache),
        ("Step `%s`: Artifact metadata %s.", enable_artifact_metadata),
        (
            "Step `%s`: Artifact visualization %s.",
            enable_artifact_visualization,
        ),
        ("Step `%s`: logs %s.", enable_step_logs),
    )
    for message, flag in flag_messages:
        logger.debug(message, step_name, _state(flag))

    if model is not None:
        model_context = {
            "model": model.name,
            "version": model.version,
        }
        logger.debug(
            "Step `%s`: Is in Model context %s.",
            step_name,
            model_context,
        )

    # The name and the enable flags go into the base configuration; all
    # other options are applied through `configure` below.
    self._configuration = PartialStepConfiguration(
        name=step_name,
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
    )
    self.configure(
        experiment_tracker=experiment_tracker,
        step_operator=step_operator,
        output_materializers=output_materializers,
        parameters=parameters,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model=model,
        retry=retry,
        substitutions=substitutions,
    )

    notebook_utils.try_to_save_notebook_cell_code(self.source_object)
Attributes
caching_parameters: Dict[str, Any] property

Caching parameters for this step.

Returns:

Type Description
Dict[str, Any]

A dictionary containing the caching parameters

configuration: PartialStepConfiguration property

The configuration of the step.

Returns:

Type Description
PartialStepConfiguration

The configuration of the step.

docstring: Optional[str] property

The docstring of this step.

Returns:

Type Description
Optional[str]

The docstring of this step.

enable_cache: Optional[bool] property

If caching is enabled for the step.

Returns:

Type Description
Optional[bool]

If caching is enabled for the step.

name: str property

The name of the step.

Returns:

Type Description
str

The name of the step.

source_code: str property

The source code of this step.

Returns:

Type Description
str

The source code of this step.

source_object: Any property

The source object of this step.

Returns:

Type Description
Any

The source object of this step.

Functions
call_entrypoint(*args: Any, **kwargs: Any) -> Any

Calls the entrypoint function of the step.

Parameters:

Name Type Description Default
*args Any

Entrypoint function arguments.

()
**kwargs Any

Entrypoint function keyword arguments.

{}

Returns:

Type Description
Any

The return value of the entrypoint function.

Raises:

Type Description
StepInterfaceError

If the arguments to the entrypoint function are invalid.

Source code in src/zenml/steps/base_step.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def call_entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Validates the given arguments and invokes the step entrypoint.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The return value of the entrypoint function.

    Raises:
        StepInterfaceError: If argument validation fails with a pydantic
            `ValidationError`.
    """
    try:
        call_kwargs = pydantic_utils.validate_function_args(
            self.entrypoint,
            ConfigDict(arbitrary_types_allowed=True),
            *args,
            **kwargs,
        )
    except ValidationError as error:
        # Chain the pydantic error so its details stay visible.
        raise StepInterfaceError(
            "Invalid step function entrypoint arguments. Check out the "
            "pydantic error above for more details."
        ) from error
    else:
        # The validated arguments are always passed by keyword.
        return self.entrypoint(**call_kwargs)
configure(enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, merge: bool = True) -> T

Configures the step.

Configuration merging example:

* merge==True:
    step.configure(extra={"key1": 1})
    step.configure(extra={"key2": 2}, merge=True)
    step.configuration.extra  # {"key1": 1, "key2": 2}
* merge==False:
    step.configure(extra={"key1": 1})
    step.configure(extra={"key2": 2}, merge=False)
    step.configuration.extra  # {"key2": 2}

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Model to use for this step.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name template.

None
merge bool

If True, will merge the given dictionary configurations like parameters and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True

Returns:

Type Description
T

The step instance that this method was called on.

Source code in src/zenml/steps/base_step.py
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    merge: bool = True,
) -> T:
    """Updates the configuration of this step in place.

    Options that are `None` are dropped and therefore leave the existing
    configuration untouched.

    Configuration merging example:
    * `merge==True`:
        step.configure(extra={"key1": 1})
        step.configure(extra={"key2": 2}, merge=True)
        step.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        step.configure(extra={"key1": 1})
        step.configure(extra={"key2": 2}, merge=False)
        step.configuration.extra # {"key2": 2}

    Args:
        enable_cache: Whether caching should be enabled for this step.
        enable_artifact_metadata: Whether artifact metadata should be
            enabled for this step.
        enable_artifact_visualization: Whether artifact visualization
            should be enabled for this step.
        enable_step_logs: Whether step logs should be enabled for this
            step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If given
            as a dict, the keys must be a subset of the output names of
            this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback to run if the step fails. Either a function
            taking a single `BaseException` argument, or a source path to
            such a function (e.g. `module.my_function`).
        on_success: Callback to run if the step succeeds. Either a function
            taking no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Model to use for this step.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders to use in the name template.
        merge: If `True`, dictionary configurations such as `parameters`
            and `settings` are merged with the existing ones. If `False`,
            the given configurations overwrite all existing ones. See the
            example above.

    Returns:
        The step instance that this method was called on.
    """
    from zenml.config.step_configurations import StepConfigurationUpdate
    from zenml.hooks.hook_validators import resolve_and_validate_hook

    def _to_source(candidate: Union[str, Source, Type[Any]]) -> Source:
        # Strings are treated as import paths, `Source` objects pass
        # through unchanged and anything else is resolved to a source.
        if isinstance(candidate, str):
            return Source.from_import_path(candidate)
        if isinstance(candidate, Source):
            return candidate
        return source_utils.resolve(candidate)

    def _to_sources(spec: Any) -> Tuple[Source, ...]:
        # A string is a sequence too, but it always denotes one source.
        is_single = isinstance(spec, str) or not isinstance(spec, Sequence)
        candidates = (spec,) if is_single else spec
        return tuple(_to_source(item) for item in candidates)

    outputs: Dict[str, Dict[str, Tuple[Source, ...]]] = defaultdict(dict)
    allowed_output_names = set(self.entrypoint_definition.outputs)

    if output_materializers:
        if isinstance(output_materializers, Mapping):
            materializer_map = output_materializers
        else:
            # A single specification applies to every output of the step.
            shared_sources = _to_sources(output_materializers)
            materializer_map = dict.fromkeys(
                allowed_output_names, shared_sources
            )

        for output_name, spec in materializer_map.items():
            outputs[output_name]["materializer_source"] = _to_sources(spec)

    # Hooks are resolved and validated only when one was provided.
    failure_hook_source = (
        resolve_and_validate_hook(on_failure) if on_failure else None
    )
    success_hook_source = (
        resolve_and_validate_hook(on_success) if on_success else None
    )

    candidate_values = {
        "enable_cache": enable_cache,
        "enable_artifact_metadata": enable_artifact_metadata,
        "enable_artifact_visualization": enable_artifact_visualization,
        "enable_step_logs": enable_step_logs,
        "experiment_tracker": experiment_tracker,
        "step_operator": step_operator,
        "parameters": parameters,
        "settings": settings,
        # An empty outputs mapping is turned into `None` so it gets dropped.
        "outputs": outputs or None,
        "extra": extra,
        "failure_hook_source": failure_hook_source,
        "success_hook_source": success_hook_source,
        "model": model,
        "retry": retry,
        "substitutions": substitutions,
    }
    update = StepConfigurationUpdate(
        **dict_utils.remove_none_values(candidate_values)
    )
    self._apply_configuration(update, merge=merge)
    return self
copy() -> BaseStep

Copies the step.

Returns:

Type Description
BaseStep

The step copy.

Source code in src/zenml/steps/base_step.py
803
804
805
806
807
808
809
def copy(self) -> "BaseStep":
    """Creates a deep copy of this step.

    Returns:
        The copied step, produced with `copy.deepcopy`.
    """
    duplicate = copy.deepcopy(self)
    return duplicate
entrypoint(*args: Any, **kwargs: Any) -> Any abstractmethod

Abstract method for core step logic.

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to the step.

()
**kwargs Any

Keyword arguments passed to the step.

{}

Returns:

Type Description
Any

The output of the step.

Source code in src/zenml/steps/base_step.py
214
215
216
217
218
219
220
221
222
223
224
@abstractmethod
def entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Abstract method holding the core logic of the step.

    Concrete step classes must provide an implementation; this declaration
    itself contains no logic.

    Args:
        *args: Positional arguments passed to the step.
        **kwargs: Keyword arguments passed to the step.

    Returns:
        The output of the step.
    """
load_from_source(source: Union[Source, str]) -> BaseStep classmethod

Loads a step from source.

Parameters:

Name Type Description Default
source Union[Source, str]

The path to the step source.

required

Returns:

Type Description
BaseStep

The loaded step.

Raises:

Type Description
ValueError

If the source is not a valid step source.

Source code in src/zenml/steps/base_step.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@classmethod
def load_from_source(cls, source: Union[Source, str]) -> "BaseStep":
    """Loads a step from its source.

    Args:
        source: The path to the step source.

    Returns:
        The loaded object itself if it is a step instance, or a new
        instance created without arguments if it is a step class.

    Raises:
        ValueError: If the loaded object is neither a `BaseStep` instance
            nor a `BaseStep` subclass.
    """
    loaded = source_utils.load(source)

    # The source points directly at a step instance.
    if isinstance(loaded, BaseStep):
        return loaded

    # The source points at a step class: instantiate it with defaults.
    if isinstance(loaded, type) and issubclass(loaded, BaseStep):
        return loaded()

    raise ValueError("Invalid step source.")
resolve() -> Source

Resolves the step.

Returns:

Type Description
Source

The step source.

Source code in src/zenml/steps/base_step.py
248
249
250
251
252
253
254
def resolve(self) -> Source:
    """Resolves the source of this step.

    Returns:
        The source resolved from the class of this step.
    """
    step_class = self.__class__
    return source_utils.resolve(step_class)
with_options(enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, merge: bool = True) -> BaseStep

Copies the step and applies the given configurations.

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this step.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this step.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this step.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
parameters Optional[Dict[str, Any]]

Function parameters for this step

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Model to use for this step.

None
retry Optional[StepRetryConfig]

Configuration for retrying the step in case of failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders for the step name.

None
merge bool

If True, will merge the given dictionary configurations like parameters and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the description of the configure method for an example.

True

Returns:

Type Description
BaseStep

The copied step instance.

Source code in src/zenml/steps/base_step.py
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def with_options(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    output_materializers: Optional[
        "OutputMaterializersSpecification"
    ] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    merge: bool = True,
) -> "BaseStep":
    """Returns a configured copy of this step.

    The step is copied first and all options are then passed to the
    `configure` method of the copy, so the configuration is applied to the
    copy rather than to this step.

    Args:
        enable_cache: Whether caching should be enabled for this step.
        enable_artifact_metadata: Whether artifact metadata should be
            enabled for this step.
        enable_artifact_visualization: Whether artifact visualization
            should be enabled for this step.
        enable_step_logs: Whether step logs should be enabled for this
            step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        parameters: Function parameters for this step.
        output_materializers: Output materializers for this step. If given
            as a dict, the keys must be a subset of the output names of
            this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback to run if the step fails. Either a function
            taking a single `BaseException` argument, or a source path to
            such a function (e.g. `module.my_function`).
        on_success: Callback to run if the step succeeds. Either a function
            taking no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Model to use for this step.
        retry: Configuration for retrying the step in case of failure.
        substitutions: Extra placeholders for the step name.
        merge: If `True`, dictionary configurations such as `parameters`
            and `settings` are merged with the existing ones. If `False`,
            the given configurations overwrite all existing ones. See
            `configure` for an example.

    Returns:
        The copied step instance.
    """
    overrides = dict(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        experiment_tracker=experiment_tracker,
        step_operator=step_operator,
        parameters=parameters,
        output_materializers=output_materializers,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model=model,
        retry=retry,
        substitutions=substitutions,
        merge=merge,
    )

    step_copy = self.copy()
    step_copy.configure(**overrides)
    return step_copy
Functions
Modules

decorated_step

Internal BaseStep subclass used by the step decorator.

Classes

entrypoint_function_utils

Util functions for step and pipeline entrypoint functions.

Classes
EntrypointFunctionDefinition

Bases: NamedTuple

Class representing a step entrypoint function.

Attributes:

Name Type Description
inputs Dict[str, Parameter]

The entrypoint function inputs.

outputs Dict[str, OutputSignature]

The entrypoint function outputs. This dictionary maps output names to output annotations.

Functions
validate_input(key: str, value: Any) -> None

Validates an input to the step entrypoint function.

Parameters:

Name Type Description Default
key str

The key for which the input was passed

required
value Any

The input value.

required

Raises:

Type Description
KeyError

If the function has no input for the given key.

RuntimeError

If a parameter is passed for an input that is annotated as an UnmaterializedArtifact.

RuntimeError

If the input value is not valid for the type annotation provided for the function parameter.

StepInterfaceError

If the input is a parameter and not JSON serializable.

Source code in src/zenml/steps/entrypoint_function_utils.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def validate_input(self, key: str, value: Any) -> None:
    """Checks a single input passed to the step entrypoint function.

    Artifact-like values are accepted without further checks. Any other
    value is treated as a parameter: it must be JSON serializable and must
    validate against the annotation of the function parameter.

    Args:
        key: The key for which the input was passed.
        value: The input value.

    Raises:
        KeyError: If the function has no input for the given key.
        RuntimeError: If a parameter is passed for an input that is
            annotated as an `UnmaterializedArtifact`.
        RuntimeError: If the input value is not valid for the type
            annotation provided for the function parameter.
        StepInterfaceError: If the input is a parameter and not JSON
            serializable.
    """
    from zenml.artifacts.external_artifact import ExternalArtifact
    from zenml.artifacts.unmaterialized_artifact import (
        UnmaterializedArtifact,
    )
    from zenml.client_lazy_loader import ClientLazyLoader
    from zenml.models import ArtifactVersionResponse

    if key not in self.inputs:
        raise KeyError(
            f"Received step entrypoint input for invalid key {key}."
        )

    parameter = self.inputs[key]

    artifact_like_types = (
        StepArtifact,
        ExternalArtifact,
        ArtifactVersionResponse,
        ClientLazyLoader,
        LazyRunMetadataResponse,
    )
    if isinstance(value, artifact_like_types):
        # Artifacts are deliberately not type-checked here: doing so would
        # prevent pydantic's type coercion later on (e.g. providing an
        # `int` artifact for a `float` input).
        return

    # Everything below handles plain parameters.
    if parameter.annotation is UnmaterializedArtifact:
        raise RuntimeError(
            "Passing parameter for input of type `UnmaterializedArtifact` "
            "is not allowed."
        )

    serializable = yaml_utils.is_json_serializable(value)
    if not serializable:
        message = (
            f"Argument type (`{type(value)}`) for argument "
            f"'{key}' is not JSON serializable and can not be passed as "
            "a parameter. This input can either be provided by the "
            "output of another step or as an external artifact: "
            "https://docs.zenml.io/user-guides/starter-guide/manage-artifacts#managing-artifacts-not-produced-by-zenml-pipelines"
        )
        raise StepInterfaceError(message)

    try:
        self._validate_input_value(parameter=parameter, value=value)
    except ValidationError as error:
        raise RuntimeError(
            f"Input validation failed for input '{parameter.name}': "
            f"Expected type `{parameter.annotation}` but received type "
            f"`{type(value)}`."
        ) from error
StepArtifact(invocation_id: str, output_name: str, annotation: Any, pipeline: Pipeline)

Class to represent step output artifacts.

Initialize a step artifact.

Parameters:

Name Type Description Default
invocation_id str

The ID of the invocation that produces this artifact.

required
output_name str

The name of the output that produces this artifact.

required
annotation Any

The output type annotation.

required
pipeline Pipeline

The pipeline which the invocation is part of.

required
Source code in src/zenml/steps/entrypoint_function_utils.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    invocation_id: str,
    output_name: str,
    annotation: Any,
    pipeline: "Pipeline",
) -> None:
    """Stores the details that identify a step output artifact.

    Args:
        invocation_id: The ID of the invocation that produces this artifact.
        output_name: The name of the output that produces this artifact.
        annotation: The output type annotation.
        pipeline: The pipeline which the invocation is part of.
    """
    # All arguments are stored unchanged as attributes of the same name.
    self.invocation_id, self.output_name = invocation_id, output_name
    self.annotation, self.pipeline = annotation, pipeline
Functions
Functions
validate_entrypoint_function(func: Callable[..., Any], reserved_arguments: Sequence[str] = ()) -> EntrypointFunctionDefinition

Validates a step entrypoint function.

Parameters:

Name Type Description Default
func Callable[..., Any]

The step entrypoint function to validate.

required
reserved_arguments Sequence[str]

The reserved arguments for the entrypoint function.

()

Raises:

Type Description
StepInterfaceError

If the entrypoint function has variable arguments or keyword arguments.

RuntimeError

If type annotations should be enforced and a type annotation is missing.

Returns:

Type Description
EntrypointFunctionDefinition

A validated definition of the entrypoint function.

Source code in src/zenml/steps/entrypoint_function_utils.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def validate_entrypoint_function(
    func: Callable[..., Any],
    reserved_arguments: Sequence[str] = (),
) -> EntrypointFunctionDefinition:
    """Checks a step entrypoint function and builds its definition.

    The signature is first checked against the reserved argument names.
    Each parameter is then collected as an input, and the outputs are
    parsed from the return annotation of the function.

    Args:
        func: The step entrypoint function to validate.
        reserved_arguments: Argument names that the entrypoint function is
            not allowed to use.

    Raises:
        StepInterfaceError: If the entrypoint function accepts `*args` or
            `**kwargs`.
        RuntimeError: If type annotations are enforced and an input has no
            type annotation.

    Returns:
        A validated definition of the entrypoint function.
    """
    signature = inspect.signature(func, follow_wrapped=True)
    validate_reserved_arguments(
        signature=signature, reserved_arguments=reserved_arguments
    )

    inputs = {}
    for name, param in signature.parameters.items():
        is_variadic = param.kind in (
            param.VAR_POSITIONAL,
            param.VAR_KEYWORD,
        )
        if is_variadic:
            raise StepInterfaceError(
                f"Variable args or kwargs not allowed for function "
                f"{func.__name__}."
            )

        declared_annotation = param.annotation
        if declared_annotation is inspect.Parameter.empty:
            if ENFORCE_TYPE_ANNOTATIONS:
                raise RuntimeError(
                    f"Missing type annotation for input '{name}' of step "
                    f"function '{func.__name__}'."
                )

            # Unannotated inputs are recorded as `Any`.
            param = param.replace(annotation=Any)

        # The resolved annotation is not used below; the call is made with
        # the annotation as declared on the function (before the `Any`
        # fallback).
        resolve_type_annotation(declared_annotation)
        inputs[name] = param

    return EntrypointFunctionDefinition(
        inputs=inputs,
        outputs=parse_return_type_annotations(
            func=func,
            enforce_type_annotations=ENFORCE_TYPE_ANNOTATIONS,
        ),
    )
validate_reserved_arguments(signature: inspect.Signature, reserved_arguments: Sequence[str]) -> None

Validates that the signature does not contain any reserved arguments.

Parameters:

Name Type Description Default
signature Signature

The signature to validate.

required
reserved_arguments Sequence[str]

The reserved arguments for the signature.

required

Raises:

Type Description
RuntimeError

If the signature contains a reserved argument.

Source code in src/zenml/steps/entrypoint_function_utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def validate_reserved_arguments(
    signature: inspect.Signature, reserved_arguments: Sequence[str]
) -> None:
    """Ensures that a signature does not use any reserved argument name.

    Args:
        signature: The signature to validate.
        reserved_arguments: Argument names that must not appear in the
            signature.

    Raises:
        RuntimeError: For the first reserved argument (in the order given)
            that is a parameter of the signature.
    """
    parameters = signature.parameters
    # Parameter names are never `None`, so `None` is a safe "no clash" marker.
    clash = next(
        (name for name in reserved_arguments if name in parameters), None
    )
    if clash is not None:
        raise RuntimeError(f"Reserved argument name '{clash}'.")
Modules

step_context

Step context class.

Classes
StepContext(pipeline_run: PipelineRunResponse, step_run: StepRunResponse, output_materializers: Mapping[str, Sequence[Type[BaseMaterializer]]], output_artifact_uris: Mapping[str, str], output_artifact_configs: Mapping[str, Optional[ArtifactConfig]])

Provides additional context inside a step function.

This singleton class is used to access information about the current run, step run, or its outputs inside a step function.

Usage example:

from typing import Any

from zenml.steps import get_step_context, step

@step
def my_trainer_step() -> Any:
    context = get_step_context()

    # get info about the current pipeline run
    current_pipeline_run = context.pipeline_run

    # get info about the current step run
    current_step_run = context.step_run

    # get info about the future output artifacts of this step
    output_artifact_uri = context.get_output_artifact_uri()

    ...

Initialize the context of the currently running step.

Parameters:

Name Type Description Default
pipeline_run PipelineRunResponse

The model of the current pipeline run.

required
step_run StepRunResponse

The model of the current step run.

required
output_materializers Mapping[str, Sequence[Type[BaseMaterializer]]]

The output materializers of the step that this context is used in.

required
output_artifact_uris Mapping[str, str]

The output artifact URIs of the step that this context is used in.

required
output_artifact_configs Mapping[str, Optional[ArtifactConfig]]

The outputs' ArtifactConfigs of the step that this context is used in.

required

Raises:

Type Description
StepContextError

If the keys of the output materializers and output artifacts do not match.

Source code in src/zenml/steps/step_context.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    pipeline_run: "PipelineRunResponse",
    step_run: "StepRunResponse",
    output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
    output_artifact_uris: Mapping[str, str],
    output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
) -> None:
    """Set up the context for the step that is currently running.

    The pipeline run and the step run are looked up again through the
    client. If such a lookup raises a `KeyError`, the model that was passed
    in is kept instead.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        output_materializers: Materializer classes of the step, keyed by
            output name.
        output_artifact_uris: Artifact URIs of the step, keyed by output
            name.
        output_artifact_configs: `ArtifactConfig` (or `None`) of the step,
            keyed by output name.

    Raises:
        StepContextError: If the output materializers and the output
            artifact URIs do not have the same keys.
    """
    from zenml.client import Client

    try:
        latest_pipeline_run = Client().get_pipeline_run(pipeline_run.id)
    except KeyError:
        latest_pipeline_run = pipeline_run
    self.pipeline_run = latest_pipeline_run

    try:
        latest_step_run = Client().get_run_step(step_run.id)
    except KeyError:
        latest_step_run = step_run
    self.step_run = latest_step_run

    # The model version of the step run takes precedence over the one of
    # the pipeline run.
    self.model_version = (
        latest_step_run.model_version or latest_pipeline_run.model_version
    )
    self.step_name = latest_step_run.name

    keys_match = output_materializers.keys() == output_artifact_uris.keys()
    if not keys_match:
        raise StepContextError(
            f"Mismatched keys in output materializers and output artifact "
            f"URIs for step `{self.step_name}`. Output materializer "
            f"keys: {set(output_materializers)}, output artifact URI "
            f"keys: {set(output_artifact_uris)}"
        )

    # `output_artifact_configs` is not part of the key check above, but it
    # is indexed with every output name here.
    outputs = {}
    for output_name in output_materializers:
        outputs[output_name] = StepContextOutput(
            materializer_classes=output_materializers[output_name],
            artifact_uri=output_artifact_uris[output_name],
            artifact_config=output_artifact_configs[output_name],
        )
    self._outputs = outputs
    self._cleanup_registry = CallbackRegistry()
Attributes
inputs: Dict[str, StepRunInputResponse] property

Returns the input artifacts of the current step.

Returns:

Type Description
Dict[str, StepRunInputResponse]

The input artifacts of the current step.

model: Model property

Returns configured Model.

Order of resolution to search for Model is
  1. Model from the step context
  2. Model from the pipeline context

Returns:

Type Description
Model

The Model object associated with the current step.

Raises:

Type Description
StepContextError

If no Model object was specified for the step or pipeline.

pipeline: PipelineResponse property

Returns the current pipeline.

Returns:

Type Description
PipelineResponse

The current pipeline.

Raises:

Type Description
StepContextError

If the pipeline run does not have a pipeline.

Functions
add_output_metadata(metadata: Dict[str, MetadataType], output_name: Optional[str] = None) -> None

Adds metadata for a given step output.

Parameters:

Name Type Description Default
metadata Dict[str, MetadataType]

The metadata to add.

required
output_name Optional[str]

Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def add_output_metadata(
    self,
    metadata: Dict[str, "MetadataType"],
    output_name: Optional[str] = None,
) -> None:
    """Attach additional metadata to one of the step outputs.

    The entries are merged into the metadata already stored on the output;
    existing keys are overwritten by the new values.

    Args:
        metadata: The metadata to add.
        output_name: Optional name of the output to which the metadata
            should be added. If no name is given and the step only has a
            single output, that output is used. If the step has multiple
            outputs, an exception will be raised.
    """
    target = self._get_output(output_name)
    # Start from an empty dict when nothing has been stored yet.
    target.run_metadata = target.run_metadata or {}
    target.run_metadata.update(**metadata)
add_output_tags(tags: List[str], output_name: Optional[str] = None) -> None

Adds tags for a given step output.

Parameters:

Name Type Description Default
tags List[str]

The tags to add.

required
output_name Optional[str]

Optional name of the output for which to add the tags. If no name is given and the step only has a single output, the tags of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def add_output_tags(
    self,
    tags: List[str],
    output_name: Optional[str] = None,
) -> None:
    """Attach additional tags to one of the step outputs.

    The tags are appended to the ones already stored on the output; no
    de-duplication happens here.

    Args:
        tags: The tags to add.
        output_name: Optional name of the output to which the tags should
            be added. If no name is given and the step only has a single
            output, that output is used. If the step has multiple outputs,
            an exception will be raised.
    """
    target = self._get_output(output_name)
    # Start from an empty list when nothing has been stored yet.
    target.tags = target.tags or []
    target.tags += tags
get_output_artifact_uri(output_name: Optional[str] = None) -> str

Returns the artifact URI for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
str

Artifact URI for the given output.

Source code in src/zenml/steps/step_context.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def get_output_artifact_uri(
    self, output_name: Optional[str] = None
) -> str:
    """Look up the artifact URI of one of the step outputs.

    Args:
        output_name: Optional name of the output whose URI is requested.
            If no name is given and the step only has a single output,
            the URI of that output is returned. If the step has multiple
            outputs, an exception will be raised.

    Returns:
        Artifact URI for the given output.
    """
    selected_output = self._get_output(output_name)
    return selected_output.artifact_uri
get_output_materializer(output_name: Optional[str] = None, custom_materializer_class: Optional[Type[BaseMaterializer]] = None, data_type: Optional[Type[Any]] = None) -> BaseMaterializer

Returns a materializer for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised.

None
custom_materializer_class Optional[Type[BaseMaterializer]]

If given, this BaseMaterializer subclass will be initialized with the output artifact instead of the materializer that was registered for this step output.

None
data_type Optional[Type[Any]]

If the output annotation is of type Union and the step therefore has multiple materializers configured, you can provide a data type for the output which will be used to select the correct materializer. If not provided, the first materializer will be used.

None

Returns:

Type Description
BaseMaterializer

A materializer initialized with the output artifact for the given output.

Source code in src/zenml/steps/step_context.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def get_output_materializer(
    self,
    output_name: Optional[str] = None,
    custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
    data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
    """Build a materializer for one of the step outputs.

    The materializer class is picked in this order:
    1. `custom_materializer_class`, if given.
    2. The first registered class, if only one class is registered or no
       `data_type` was provided.
    3. The class selected for `data_type` among the registered ones.

    Args:
        output_name: Optional name of the output for which to get the
            materializer. If no name is given and the step only has a
            single output, the materializer of that output is returned.
            If the step has multiple outputs, an exception will be raised.
        custom_materializer_class: If given, this `BaseMaterializer`
            subclass is instantiated instead of a materializer that was
            registered for this step output.
        data_type: If the output annotation is of type `Union` and the step
            therefore has multiple materializers configured, this data
            type is used to select the matching materializer. If not
            provided, the first materializer is used.

    Returns:
        A materializer initialized with the artifact URI of the given
        output.
    """
    from zenml.utils import materializer_utils

    output = self._get_output(output_name)
    registered_classes = output.materializer_classes
    uri = output.artifact_uri

    if custom_materializer_class:
        return custom_materializer_class(uri)

    if len(registered_classes) == 1 or not data_type:
        return registered_classes[0](uri)

    selected_class = materializer_utils.select_materializer(
        data_type=data_type, materializer_classes=registered_classes
    )
    return selected_class(uri)
get_output_metadata(output_name: Optional[str] = None) -> Dict[str, MetadataType]

Returns the metadata for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
Dict[str, MetadataType]

Metadata for the given output.

Source code in src/zenml/steps/step_context.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def get_output_metadata(
    self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
    """Returns the metadata for a given step output.

    The result combines the metadata stored on the output with the
    `run_metadata` of the output's artifact config, if there is one. For
    keys present in both, the value from the artifact config is used.

    A new dictionary is built on every call, so reading the metadata never
    modifies the metadata stored on the output, and changes made by the
    caller to the returned dictionary do not leak back into the context.

    Args:
        output_name: Optional name of the output for which to get the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Metadata for the given output.
    """
    output = self._get_output(output_name)
    # Copy instead of aliasing: updating `output.run_metadata` in place
    # would turn this getter into a mutator.
    combined_metadata = dict(output.run_metadata or {})
    if output.artifact_config:
        combined_metadata.update(output.artifact_config.run_metadata or {})
    return combined_metadata
get_output_tags(output_name: Optional[str] = None) -> List[str]

Returns the tags for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the tags. If no name is given and the step only has a single output, the tags of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
List[str]

Tags for the given output.

Source code in src/zenml/steps/step_context.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
    """Collect the tags of one of the step outputs.

    Tags stored on the output are combined with the tags of the output's
    artifact config, if there is one. Duplicates are removed; the order of
    the returned list is the iteration order of a set.

    Args:
        output_name: Optional name of the output for which to get the
            tags. If no name is given and the step only has a single
            output, the tags of that output are returned. If the step has
            multiple outputs, an exception will be raised.

    Returns:
        Tags for the given output.
    """
    output = self._get_output(output_name)
    tags = set(output.tags or [])

    config = output.artifact_config
    if config:
        tags = set(config.tags or []).union(tags)

    return list(tags)
remove_output_tags(tags: List[str], output_name: Optional[str] = None) -> None

Removes tags for a given step output.

Parameters:

Name Type Description Default
tags List[str]

The tags to remove.

required
output_name Optional[str]

Optional name of the output for which to remove the tags. If no name is given and the step only has a single output, the tags of this output will be removed. If the step has multiple outputs, an exception will be raised.

None
Source code in src/zenml/steps/step_context.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def remove_output_tags(
    self,
    tags: List[str],
    output_name: Optional[str] = None,
) -> None:
    """Drop tags from one of the step outputs.

    Only the tags stored on the output itself are filtered. When the
    output has no tags, nothing is changed.

    Args:
        tags: The tags to remove.
        output_name: Optional name of the output from which to remove the
            tags. If no name is given and the step only has a single
            output, that output is used. If the step has multiple outputs,
            an exception will be raised.
    """
    target = self._get_output(output_name)
    if target.tags:
        # Rebuild the list, keeping every tag that was not asked to go.
        target.tags = [
            existing for existing in target.tags if existing not in tags
        ]
StepContextOutput(materializer_classes: Sequence[Type[BaseMaterializer]], artifact_uri: str, artifact_config: Optional[ArtifactConfig])

Represents a step output in the step context.

Initialize the step output.

Parameters:

Name Type Description Default
materializer_classes Sequence[Type[BaseMaterializer]]

The materializer classes for the output.

required
artifact_uri str

The artifact URI for the output.

required
artifact_config Optional[ArtifactConfig]

The ArtifactConfig object of the output.

required
Source code in src/zenml/steps/step_context.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def __init__(
    self,
    materializer_classes: Sequence[Type["BaseMaterializer"]],
    artifact_uri: str,
    artifact_config: Optional["ArtifactConfig"],
):
    """Create the step-context view of a single step output.

    All arguments are stored unchanged as attributes of the same name.

    Args:
        materializer_classes: The materializer classes for the output.
        artifact_uri: The artifact URI for the output.
        artifact_config: The `ArtifactConfig` object of the output, or
            `None`.
    """
    self.artifact_config = artifact_config
    self.artifact_uri = artifact_uri
    self.materializer_classes = materializer_classes
Functions
Functions
get_step_context() -> StepContext

Get the context of the currently running step.

Returns:

Type Description
StepContext

The context of the currently running step.

Raises:

Type Description
RuntimeError

If no step is currently running.

Source code in src/zenml/steps/step_context.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_step_context() -> "StepContext":
    """Fetch the context of the step that is currently running.

    Returns:
        The context of the currently running step.

    Raises:
        RuntimeError: If `StepContext._exists()` reports that there is no
            step context, i.e. no step is currently running.
    """
    if not StepContext._exists():
        raise RuntimeError(
            "The step context is only available inside a step function."
        )
    return StepContext()  # type: ignore

step_decorator

Step decorator function.

Classes
Functions
step(_func: Optional[F] = None, *, name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, experiment_tracker: Optional[str] = None, step_operator: Optional[str] = None, output_materializers: Optional[OutputMaterializersSpecification] = None, settings: Optional[Dict[str, SettingsOrDict]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None) -> Union[BaseStep, Callable[[F], BaseStep]]
step(_func: F) -> BaseStep
step(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional[
        OutputMaterializersSpecification
    ] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Callable[[F], BaseStep]

Decorator to create a ZenML step.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the step. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default.

None
enable_artifact_metadata Optional[bool]

Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default.

None
enable_artifact_visualization Optional[bool]

Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default.

None
enable_step_logs Optional[bool]

Specify whether step logs are enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Configuration of step retry in case of step failure.

None
substitutions Optional[Dict[str, str]]

Extra placeholders for the step name.

None

Returns:

Type Description
Union[BaseStep, Callable[[F], BaseStep]]

The step instance.

Source code in src/zenml/steps/step_decorator.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def step(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional["OutputMaterializersSpecification"] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
    """Decorator that turns a function into a ZenML step.

    Works both as `@step` (the function is passed as `_func` and a step
    instance is returned) and as `@step(...)` (no function is passed and
    the actual decorator is returned).

    Args:
        _func: The decorated function.
        name: The name of the step. If left empty, the name of the decorated
            function will be used as a fallback.
        enable_cache: Specify whether caching is enabled for this step. If no
            value is passed, caching is enabled by default.
        enable_artifact_metadata: Specify whether metadata is enabled for this
            step. If no value is passed, metadata is enabled by default.
        enable_artifact_visualization: Specify whether visualization is enabled
            for this step. If no value is passed, visualization is enabled by
            default.
        enable_step_logs: Specify whether step logs are enabled for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Configuration of step retry in case of step failure.
        substitutions: Extra placeholders for the step name.

    Returns:
        The step instance, or the decorator that creates it when no function
        was passed.
    """

    def inner_decorator(func: "F") -> "BaseStep":
        from zenml.steps.decorated_step import _DecoratedStep

        # A dedicated subclass is created per decorated function; it takes
        # over the name, module and docstring of that function.
        class_name = func.__name__
        class_namespace = {
            "entrypoint": staticmethod(func),
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        step_class: Type["BaseStep"] = type(
            class_name, (_DecoratedStep,), class_namespace
        )

        return step_class(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            experiment_tracker=experiment_tracker,
            step_operator=step_operator,
            output_materializers=output_materializers,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
            retry=retry,
            substitutions=substitutions,
        )

    if _func is not None:
        return inner_decorator(_func)
    return inner_decorator

step_invocation

Step invocation class definition.

Classes
StepInvocation(id: str, step: BaseStep, input_artifacts: Dict[str, StepArtifact], external_artifacts: Dict[str, Union[ExternalArtifact, ArtifactVersionResponse]], model_artifacts_or_metadata: Dict[str, ModelVersionDataLazyLoader], client_lazy_loaders: Dict[str, ClientLazyLoader], parameters: Dict[str, Any], default_parameters: Dict[str, Any], upstream_steps: Set[str], pipeline: Pipeline)

Step invocation class.

Initialize a step invocation.

Parameters:

Name Type Description Default
id str

The invocation ID.

required
step BaseStep

The step that is represented by the invocation.

required
input_artifacts Dict[str, StepArtifact]

The input artifacts for the invocation.

required
external_artifacts Dict[str, Union[ExternalArtifact, ArtifactVersionResponse]]

The external artifacts for the invocation.

required
model_artifacts_or_metadata Dict[str, ModelVersionDataLazyLoader]

The model artifacts or metadata for the invocation.

required
client_lazy_loaders Dict[str, ClientLazyLoader]

The client lazy loaders for the invocation.

required
parameters Dict[str, Any]

The parameters for the invocation.

required
default_parameters Dict[str, Any]

The default parameters for the invocation.

required
upstream_steps Set[str]

The upstream steps for the invocation.

required
pipeline Pipeline

The parent pipeline of the invocation.

required
Source code in src/zenml/steps/step_invocation.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    id: str,
    step: "BaseStep",
    input_artifacts: Dict[str, "StepArtifact"],
    external_artifacts: Dict[
        str, Union["ExternalArtifact", "ArtifactVersionResponse"]
    ],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    pipeline: "Pipeline",
) -> None:
    """Create the invocation of a step inside a pipeline.

    All arguments are stored on the instance as given; nothing is copied
    or validated here.

    Args:
        id: ID of this invocation.
        step: Step that this invocation represents.
        input_artifacts: Input artifacts of the invocation.
        external_artifacts: External artifacts of the invocation.
        model_artifacts_or_metadata: Model artifacts or metadata of the
            invocation.
        client_lazy_loaders: Client lazy loaders of the invocation.
        parameters: Parameters of the invocation.
        default_parameters: Default parameters of the invocation.
        upstream_steps: Upstream steps of the invocation.
        pipeline: Pipeline that the invocation belongs to.
    """
    # What is being invoked, and where.
    self.id = id
    self.step = step
    self.pipeline = pipeline
    self.upstream_steps = upstream_steps

    # Parameter values for the step.
    self.parameters = parameters
    self.default_parameters = default_parameters

    # Inputs of the step, grouped by where they come from.
    self.input_artifacts = input_artifacts
    self.external_artifacts = external_artifacts
    self.model_artifacts_or_metadata = model_artifacts_or_metadata
    self.client_lazy_loaders = client_lazy_loaders
Functions
finalize(parameters_to_ignore: Set[str]) -> StepConfiguration

Finalizes a step invocation.

It will validate the upstream steps and run final configurations on the step that is represented by the invocation.

Parameters:

Name Type Description Default
parameters_to_ignore Set[str]

Set of parameters that should not be applied to the step instance.

required

Returns:

Type Description
StepConfiguration

The finalized step configuration.

Source code in src/zenml/steps/step_invocation.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def finalize(self, parameters_to_ignore: Set[str]) -> "StepConfiguration":
    """Finalize the invocation and return the resulting configuration.

    Applies the invocation parameters to the step, converts the external
    artifacts into their configurations and then finalizes the
    configuration of the step.

    Args:
        parameters_to_ignore: Names of parameters that must not be applied
            to the step instance.

    Returns:
        The finalized step configuration.
    """
    from zenml.artifacts.external_artifact_config import (
        ExternalArtifactConfiguration,
    )

    # Explicit parameters are visited first, so `setdefault` only lets a
    # default through when no explicit value exists. Ignored names are
    # dropped from both sources.
    effective_parameters: Dict[str, Any] = {}
    for candidates in (self.parameters, self.default_parameters):
        for name, value in candidates.items():
            if name in parameters_to_ignore:
                continue
            effective_parameters.setdefault(name, value)
    self.step.configure(parameters=effective_parameters)

    artifact_configs: Dict[str, ExternalArtifactConfiguration] = {}
    for name, artifact in self.external_artifacts.items():
        if isinstance(artifact, ArtifactVersionResponse):
            # An existing artifact version is referenced by its ID.
            artifact_configs[name] = ExternalArtifactConfiguration(
                id=artifact.id
            )
            continue

        # An external artifact that carries a value is uploaded before its
        # config is read.
        if artifact.value is not None:
            artifact.upload_by_value()
        artifact_configs[name] = artifact.config

    return self.step._finalize_configuration(
        input_artifacts=self.input_artifacts,
        external_artifacts=artifact_configs,
        model_artifacts_or_metadata=self.model_artifacts_or_metadata,
        client_lazy_loaders=self.client_lazy_loaders,
    )

utils

Utility functions and classes to run ZenML steps.

Classes
OnlyNoneReturnsVisitor()

Bases: ReturnVisitor

Checks whether a function AST contains only None returns.

Initializes a visitor instance.

Source code in src/zenml/steps/utils.py
321
322
323
324
def __init__(self) -> None:
    """Initializes a visitor instance.

    The visitor starts out assuming that only `None` is returned.
    """
    super().__init__()
    # Optimistic default; `visit_Return` sets this to `False` when it sees a
    # return statement with a non-`None` value.
    self.has_only_none_returns = True
Functions
visit_Return(node: ast.Return) -> None

Visit a return statement.

Parameters:

Name Type Description Default
node Return

The return statement to visit.

required
Source code in src/zenml/steps/utils.py
326
327
328
329
330
331
332
333
334
335
336
337
def visit_Return(self, node: ast.Return) -> None:
    """Visit a return statement.

    A bare `return` and an explicit `return None` leave
    `has_only_none_returns` untouched. Any other returned expression sets
    it to `False`.

    Args:
        node: The return statement to visit.
    """
    returned = node.value

    # A bare `return` has no value node at all.
    if returned is None:
        return

    # Since Python 3.8 a literal `None` is parsed as `ast.Constant`. The
    # legacy `ast.NameConstant` alias is deliberately not referenced: it
    # emits a `DeprecationWarning` on Python 3.12+ and no longer exists on
    # Python 3.14.
    if isinstance(returned, ast.Constant) and returned.value is None:
        return

    self.has_only_none_returns = False
OutputSignature

Bases: BaseModel

The signature of an output artifact.

Functions
get_output_types() -> Tuple[Any, ...]

Get all output types that match the type annotation.

Returns:

Type Description
Tuple[Any, ...]

All output types that match the type annotation.

Source code in src/zenml/steps/utils.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def get_output_types(self) -> Tuple[Any, ...]:
    """Collect the concrete types accepted by the resolved annotation.

    Returns:
        An empty tuple if the annotation is `Any`, the union members (with
        `None` members mapped to `type(None)`) if the annotation is a
        union, and a one-element tuple holding the annotation otherwise.
    """
    annotation = self.resolved_annotation

    if annotation is Any:
        return ()

    origin = typing_utils.get_origin(annotation) or annotation
    if not typing_utils.is_union(origin):
        return (annotation,)

    # Map `None` members to `type(None)` so every entry is a real class.
    return tuple(
        type(None) if typing_utils.is_none_type(member) else member
        for member in get_args(annotation)
    )
ReturnVisitor(ignore_nested_functions: bool = True)

Bases: NodeVisitor

AST visitor class that can be subclassed to visit function returns.

Initializes a return visitor instance.

Parameters:

Name Type Description Default
ignore_nested_functions bool

If True, will skip visiting nested functions.

True
Source code in src/zenml/steps/utils.py
288
289
290
291
292
293
294
295
296
def __init__(self, ignore_nested_functions: bool = True) -> None:
    """Set up the state of a return visitor.

    Args:
        ignore_nested_functions: Whether visiting nested functions should
            be skipped (`True`) or not (`False`).
    """
    # Initial traversal state.
    # NOTE(review): presumably flipped once the visitor enters a function
    # definition; the code that does so is not part of this listing.
    self._inside_function = False
    self._ignore_nested_functions = ignore_nested_functions
Functions
TupleReturnVisitor()

Bases: ReturnVisitor

Checks whether a function AST contains tuple returns.

Initializes a visitor instance.

Source code in src/zenml/steps/utils.py
343
344
345
346
def __init__(self) -> None:
    """Initializes a visitor instance.

    The visitor starts out assuming that no tuple is returned.
    """
    super().__init__()
    # Set to `True` by `visit_Return` when a return statement is followed by
    # a tuple literal with more than one element.
    self.has_tuple_return = False
Functions
visit_Return(node: ast.Return) -> None

Visit a return statement.

Parameters:

Name Type Description Default
node Return

The return statement to visit.

required
Source code in src/zenml/steps/utils.py
348
349
350
351
352
353
354
355
def visit_Return(self, node: ast.Return) -> None:
    """Record whether a return statement returns a multi-element tuple.

    Args:
        node: The return statement to visit.
    """
    returned = node.value

    # Only tuple literals count; a bare `return` or any other expression
    # (including a name bound to a tuple) is ignored.
    if not isinstance(returned, ast.Tuple):
        return

    # A one-element tuple literal does not count as multiple values.
    if len(returned.elts) > 1:
        self.has_tuple_return = True
Functions
get_args(obj: Any) -> Tuple[Any, ...]

Get arguments of a type annotation.

Example

get_args(Union[int, str]) == (int, str)

Parameters:

Name Type Description Default
obj Any

The annotation.

required

Returns:

Type Description
Tuple[Any, ...]

The args of the annotation.

Source code in src/zenml/steps/utils.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_args(obj: Any) -> Tuple[Any, ...]:
    """Return the arguments of a type annotation.

    Every argument for which `typing_utils.get_origin` returns a truthy
    value is replaced by that origin; all other arguments are kept as is.

    Example:
        `get_args(Union[int, str]) == (int, str)`

    Args:
        obj: The annotation.

    Returns:
        The args of the annotation.
    """
    unwrapped_args = [
        typing_utils.get_origin(argument) or argument
        for argument in typing_utils.get_args(obj)
    ]
    return tuple(unwrapped_args)
get_artifact_config_from_annotation_metadata(annotation: Any) -> Optional[ArtifactConfig]

Get the artifact config from the annotation metadata of a step output.

Example:

get_artifact_config_from_annotation_metadata(int)  # None
get_artifact_config_from_annotation_metadata(Annotated[int, "name"])  # ArtifactConfig(name="name")
get_artifact_config_from_annotation_metadata(Annotated[int, ArtifactConfig(name="name", model_name="foo")])  # ArtifactConfig(name="name", model_name="foo")

Parameters:

Name Type Description Default
annotation Any

The type annotation.

required

Raises:

Type Description
ValueError

If the annotation is not following the expected format or if the name was specified multiple times or is an empty string.

Returns:

Type Description
Optional[ArtifactConfig]

The artifact config.

Source code in src/zenml/steps/utils.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_artifact_config_from_annotation_metadata(
    annotation: Any,
) -> Optional[ArtifactConfig]:
    """Get the artifact config from the annotation metadata of a step output.

    Example:
    ```python
    get_artifact_config_from_annotation_metadata(int)  # None
    get_artifact_config_from_annotation_metadata(Annotated[int, "name"])  # ArtifactConfig(name="name")
    get_artifact_config_from_annotation_metadata(Annotated[int, ArtifactConfig(name="name", model_name="foo")])  # ArtifactConfig(name="name", model_name="foo")
    ```

    Args:
        annotation: The type annotation.

    Raises:
        ValueError: If the annotation is not following the expected format
            or if the name was specified multiple times or is an empty string.

    Returns:
        The artifact config, or `None` if the annotation is not an
        `Annotated` type.
    """
    if (typing_utils.get_origin(annotation) or annotation) is not Annotated:
        return None

    # The first argument is the annotated type itself; everything after it
    # is metadata.
    annotation, *metadata = typing_utils.get_args(annotation)

    error_message = (
        "Artifact annotation should only contain two elements: the artifact "
        "type, and one of the following: { a static or dynamic name || "
        "an `ArtifactConfig` }, e.g.: "
        "`Annotated[int, 'output_name']` || "
        "`Annotated[int, 'output_{placeholder}']` || "
        "`Annotated[int, ArtifactConfig(name='output_{placeholder}')]` ||"
        "`Annotated[int, ArtifactConfig(name='output_name')]`."
    )

    if len(metadata) > 2:
        raise ValueError(error_message)

    # Loop over all values to also support legacy annotations of the form
    # `Annotated[int, 'output_name', ArtifactConfig(...)]`
    output_name = None
    artifact_config = None
    for metadata_instance in metadata:
        if isinstance(metadata_instance, str):
            if output_name is not None:
                raise ValueError(error_message)
            output_name = metadata_instance
        elif isinstance(metadata_instance, ArtifactConfig):
            if artifact_config is not None:
                raise ValueError(error_message)
            artifact_config = metadata_instance
        else:
            raise ValueError(error_message)

    # Consolidate output name
    if artifact_config and artifact_config.name:
        # The name must not be given both as a string and in the config.
        if output_name is not None:
            raise ValueError(error_message)
    elif output_name == "":
        # An empty plain-string name used to fall through silently (and
        # yield `None` when no config was given); reject it just like an
        # empty `ArtifactConfig.name`.
        raise ValueError("Output name cannot be an empty string.")
    elif output_name:
        if not artifact_config:
            artifact_config = ArtifactConfig(name=output_name)
        elif not artifact_config.name:
            # Work on a copy so the config object passed in by the user is
            # not mutated.
            artifact_config = artifact_config.model_copy()
            artifact_config.name = output_name

    if artifact_config and artifact_config.name == "":
        raise ValueError("Output name cannot be an empty string.")

    return artifact_config
has_only_none_returns(func: Callable[..., Any]) -> bool

Checks whether a function contains only None returns.

A None return could be either an explicit return None or an empty return statement.

Example:

def f1():
  return None

def f2():
  return

def f3(condition):
  if condition:
    return None
  else:
    return 1

has_only_none_returns(f1)  # True
has_only_none_returns(f2)  # True
has_only_none_returns(f3)  # False

Parameters:

Name Type Description Default
func Callable[..., Any]

The function to check.

required

Returns:

Type Description
bool

Whether the function contains only None returns.

Source code in src/zenml/steps/utils.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def has_only_none_returns(func: Callable[..., Any]) -> bool:
    """Tell whether every return statement of a function returns `None`.

    Both an explicit `return None` and a bare `return` count as a `None`
    return.

    Example:
    ```python
    def f1():
      return None

    def f2():
      return

    def f3(condition):
      if condition:
        return None
      else:
        return 1

    has_only_none_returns(f1)  # True
    has_only_none_returns(f2)  # True
    has_only_none_returns(f3)  # False
    ```

    Args:
        func: The function to check.

    Returns:
        Whether the function contains only `None` returns.
    """
    # Remove common leading whitespace so that the source of an indented
    # definition can be parsed on its own.
    function_source = source_code_utils.get_source_code(func)
    syntax_tree = ast.parse(textwrap.dedent(function_source))

    return_checker = OnlyNoneReturnsVisitor()
    return_checker.visit(syntax_tree)
    return return_checker.has_only_none_returns
has_tuple_return(func: Callable[..., Any]) -> bool

Checks whether a function returns multiple values.

Multiple values means that the return statement is followed by a tuple (with or without brackets).

Example:

def f1():
  return 1, 2

def f2():
  return (1, 2)

def f3():
  var = (1, 2)
  return var

has_tuple_return(f1)  # True
has_tuple_return(f2)  # True
has_tuple_return(f3)  # False

Parameters:

Name Type Description Default
func Callable[..., Any]

The function to check.

required

Returns:

Type Description
bool

Whether the function returns multiple values.

Source code in src/zenml/steps/utils.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def has_tuple_return(func: Callable[..., Any]) -> bool:
    """Tell whether a function returns multiple values.

    A function returns multiple values if one of its `return` statements
    is directly followed by a tuple, written with or without brackets.
    Returning a variable that happens to hold a tuple does not count.

    Example:
    ```python
    def f1():
      return 1, 2

    def f2():
      return (1, 2)

    def f3():
      var = (1, 2)
      return var

    has_tuple_return(f1)  # True
    has_tuple_return(f2)  # True
    has_tuple_return(f3)  # False
    ```

    Args:
        func: The function to check.

    Returns:
        Whether the function returns multiple values.
    """
    # Remove common leading whitespace so that the source of an indented
    # definition can be parsed on its own.
    function_source = source_code_utils.get_source_code(func)
    syntax_tree = ast.parse(textwrap.dedent(function_source))

    tuple_checker = TupleReturnVisitor()
    tuple_checker.visit(syntax_tree)
    return tuple_checker.has_tuple_return
log_step_metadata(metadata: Dict[str, MetadataType], step_name: Optional[str] = None, pipeline_name_id_or_prefix: Optional[Union[str, UUID]] = None, run_id: Optional[str] = None) -> None

Logs step metadata.

Parameters:

Name Type Description Default
metadata Dict[str, MetadataType]

The metadata to log.

required
step_name Optional[str]

The name of the step to log metadata for. Can be omitted when being called inside a step.

None
pipeline_name_id_or_prefix Optional[Union[str, UUID]]

The name of the pipeline to log metadata for. Can be omitted when being called inside a step.

None
run_id Optional[str]

The ID of the run to log metadata for. Can be omitted when being called inside a step.

None

Raises:

Type Description
ValueError

If no step name is provided and the function is not called from within a step or if no pipeline name or ID is provided and the function is not called from within a step.

Source code in src/zenml/steps/utils.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def log_step_metadata(
    metadata: Dict[str, "MetadataType"],
    step_name: Optional[str] = None,
    pipeline_name_id_or_prefix: Optional[Union[str, UUID]] = None,
    run_id: Optional[str] = None,
) -> None:
    """Log metadata for a step run (deprecated).

    Emits a deprecation warning on every call; `log_metadata` is the
    replacement.

    Args:
        metadata: The metadata to log.
        step_name: Name of the step to log metadata for. May be omitted
            when called inside a step.
        pipeline_name_id_or_prefix: Name of the pipeline to log metadata
            for. May be omitted when called inside a step.
        run_id: ID of the run to log metadata for. May be omitted when
            called inside a step.

    Raises:
        ValueError: If no step name is provided and the function is not called
            from within a step or if no pipeline name or ID is provided and
            the function is not called from within a step.
    """
    logger.warning(
        "The `log_step_metadata` function is deprecated and will soon be "
        "removed. Please use `log_metadata` instead."
    )

    # Without an explicit step name, try to take it from the active step
    # context; the `RuntimeError` raised by the lookup is swallowed.
    step_context = None
    if not step_name:
        with contextlib.suppress(RuntimeError):
            step_context = get_step_context()
            step_name = step_context.step_name

    if not step_name:
        # Neither a user-provided name nor a step context is available.
        raise ValueError(
            "No step name provided and you are not running "
            "within a step. Please provide a step name."
        )

    client = Client()

    # Resolution order for the step run: active step context, explicit
    # run ID, last run of the given pipeline.
    if step_context:
        step_run_id = step_context.step_run.id
    elif run_id:
        # NOTE(review): `run_id` is parsed as a decimal integer, so a
        # UUID-formatted string raises `ValueError` here - confirm that
        # this is the intended input format.
        step_run_id = UUID(int=int(run_id))
    elif not pipeline_name_id_or_prefix:
        raise ValueError(
            "No pipeline name or ID provided and you are not running "
            "within a step. Please provide a pipeline name or ID, or "
            "provide a run ID."
        )
    else:
        last_run = client.get_pipeline(
            name_id_or_prefix=pipeline_name_id_or_prefix,
        ).last_run
        step_run_id = last_run.steps[step_name].id

    target = RunMetadataResource(
        id=step_run_id, type=MetadataResourceTypes.STEP_RUN
    )
    client.create_run_metadata(metadata=metadata, resources=[target])
parse_return_type_annotations(func: Callable[..., Any], enforce_type_annotations: bool = False) -> Dict[str, OutputSignature]

Parse the return type annotation of a step function.

Parameters:

Name Type Description Default
func Callable[..., Any]

The step function.

required
enforce_type_annotations bool

If True, raises an exception if a type annotation is missing.

False

Raises:

Type Description
RuntimeError

If the output annotation has variable length or contains duplicate output names.

RuntimeError

If type annotations should be enforced and a type annotation is missing.

Returns:

Type Description
Dict[str, OutputSignature]
  • A dictionary mapping output names to their output signatures.
Source code in src/zenml/steps/utils.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def parse_return_type_annotations(
    func: Callable[..., Any],
    enforce_type_annotations: bool = False,
) -> Dict[str, OutputSignature]:
    """Turn the return annotation of a step function into output signatures.

    A function annotated with `None`, or without annotation and with only
    `None` returns, has no outputs. A tuple annotation on a function that
    returns a tuple literal yields one output per tuple element. Every
    other annotation yields a single output.

    Args:
        func: The step function.
        enforce_type_annotations: If `True`, raises an exception if a type
            annotation is missing.

    Raises:
        RuntimeError: If the output annotation has variable length or contains
            duplicate output names.
        RuntimeError: If type annotations should be enforced and a type
            annotation is missing.

    Returns:
        - A dictionary mapping output names to their output signatures.
    """
    func_signature = inspect.signature(func, follow_wrapped=True)
    annotation = func_signature.return_annotation
    custom_name: Optional[str]

    # Explicit `-> None`: the step has no outputs.
    if annotation is None:
        return {}

    # No annotation at all: either fail, treat the step as output-less, or
    # fall back to `Any`.
    if annotation is func_signature.empty:
        if enforce_type_annotations:
            raise RuntimeError(
                "Missing return type annotation for step function "
                f"'{func.__name__}'."
            )
        if has_only_none_returns(func):
            return {}
        annotation = Any

    # A tuple annotation only means "multiple artifacts" if the function
    # body actually returns a tuple literal.
    is_tuple_annotation = typing_utils.get_origin(annotation) is tuple
    if is_tuple_annotation and has_tuple_return(func):
        element_annotations = typing_utils.get_args(annotation)
        if element_annotations[-1] is Ellipsis:
            raise RuntimeError(
                "Variable length output annotations are not allowed."
            )

        signatures: Dict[str, Any] = {}
        for position, element in enumerate(element_annotations):
            resolved = resolve_type_annotation(element)
            config = get_artifact_config_from_annotation_metadata(element)
            custom_name = config.name if config else None
            # Unnamed outputs get a positional default name.
            key = custom_name or f"output_{position}"
            if key in signatures:
                raise RuntimeError(f"Duplicate output name {key}.")

            signatures[key] = OutputSignature(
                resolved_annotation=resolved,
                artifact_config=config,
                has_custom_name=custom_name is not None,
            )
        return signatures

    # Single output: either a non-tuple annotation or a tuple that is
    # handled as one artifact.
    resolved = resolve_type_annotation(annotation)
    config = get_artifact_config_from_annotation_metadata(annotation)
    custom_name = config.name if config else None
    return {
        custom_name or SINGLE_RETURN_OUT_NAME: OutputSignature(
            resolved_annotation=resolved,
            artifact_config=config,
            has_custom_name=custom_name is not None,
        )
    }
resolve_type_annotation(obj: Any) -> Any

Returns the non-generic class for generic aliases of the typing module.

Example: if the input object is typing.Dict, this method will return the concrete class dict.

Parameters:

Name Type Description Default
obj Any

The object to resolve.

required

Returns:

Type Description
Any

The non-generic class for generic aliases of the typing module.

Source code in src/zenml/steps/utils.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def resolve_type_annotation(obj: Any) -> Any:
    """Resolve an annotation to the non-generic class behind it.

    `Annotated` wrappers are peeled off first. Union annotations are then
    returned unchanged; for everything else the origin of the annotation
    (or the annotation itself if it has none) is returned.

    Example: if the input object is `typing.Dict`, this method will return the
    concrete class `dict`.

    Args:
        obj: The object to resolve.

    Returns:
        The non-generic class for generic aliases of the typing module.
    """
    # Strip (possibly nested) `Annotated` layers down to the wrapped type.
    while True:
        origin = typing_utils.get_origin(obj) or obj
        if origin is not Annotated:
            break
        wrapped, *_ = typing_utils.get_args(obj)
        obj = wrapped

    # Unions keep their members, so the full annotation is returned.
    if typing_utils.is_union(origin):
        return obj
    return origin
run_as_single_step_pipeline(__step: BaseStep, *args: Any, **kwargs: Any) -> Any

Runs the step as a single step pipeline.

  • All inputs that are not JSON serializable will be uploaded to the artifact store before the pipeline is being executed.
  • All output artifacts of the step will be loaded using the materializer that was used to store them.

Parameters:

Name Type Description Default
*args Any

Entrypoint function arguments.

()
**kwargs Any

Entrypoint function keyword arguments.

{}

Raises:

Type Description
RuntimeError

If the step execution failed.

StepInterfaceError

If the arguments to the entrypoint function are invalid.

Returns:

Type Description
Any

The output of the step entrypoint function.

Source code in src/zenml/steps/utils.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def run_as_single_step_pipeline(
    __step: "BaseStep", *args: Any, **kwargs: Any
) -> Any:
    """Runs the step as a single step pipeline.

    - All inputs that are not JSON serializable will be uploaded to the
    artifact store before the pipeline is being executed.
    - All output artifacts of the step will be loaded using the materializer
    that was used to store them.

    Args:
        __step: The step to run.
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Raises:
        RuntimeError: If the step execution failed.
        StepInterfaceError: If the arguments to the entrypoint function are
            invalid.

    Returns:
        The output of the step entrypoint function: `None` if there are no
        outputs, the single output if there is exactly one, and a tuple of
        all outputs otherwise.
    """
    from zenml import ExternalArtifact, pipeline
    from zenml.config.base_settings import BaseSettings
    from zenml.pipelines.run_utils import (
        wait_for_pipeline_run_to_finish,
    )

    logger.info(
        "Running single step pipeline to execute step `%s`", __step.name
    )

    # Bind the call arguments to the entrypoint signature so that each value
    # is known by its parameter name.
    try:
        validated_arguments = (
            inspect.signature(__step.entrypoint)
            .bind(*args, **kwargs)
            .arguments
        )
    except TypeError as e:
        raise StepInterfaceError(
            "Invalid step function entrypoint arguments. Check out the "
            "error above for more details."
        ) from e

    inputs: Dict[str, Any] = {}
    for key, value in validated_arguments.items():
        try:
            __step.entrypoint_definition.validate_input(key=key, value=value)
            inputs[key] = value
        except Exception:
            # Deliberate best-effort: a value that fails input validation
            # is passed to the step as an external artifact instead.
            inputs[key] = ExternalArtifact(value=value)

    orchestrator = Client().active_stack.orchestrator

    pipeline_settings: Any = {}
    if "synchronous" in type(orchestrator.config).model_fields:
        # Make sure the orchestrator runs sync so we stream the logs
        key = settings_utils.get_stack_component_setting_key(orchestrator)
        pipeline_settings[key] = BaseSettings(synchronous=True)

    @pipeline(name=__step.name, enable_cache=False, settings=pipeline_settings)
    def single_step_pipeline() -> None:
        __step(**inputs)

    run = single_step_pipeline.with_options(unlisted=True)()
    assert run
    run = wait_for_pipeline_run_to_finish(run.id)

    if run.status != ExecutionStatus.COMPLETED:
        # Exceptions do not apply %-style formatting to their arguments, so
        # the message has to be formatted before it is passed in.
        raise RuntimeError(f"Failed to execute step {__step.name}.")

    # Load the output artifacts of the only step in the run, keeping just
    # the artifact versions that were saved as step outputs.
    step_run = next(iter(run.steps.values()))
    outputs = [
        artifact_version.load()
        for output_name in step_run.config.outputs
        for artifact_version in step_run.outputs[output_name]
        if artifact_version.save_type == ArtifactSaveType.STEP_OUTPUT
    ]

    # Mirror a plain function call: nothing, a single value, or a tuple.
    if not outputs:
        return None
    if len(outputs) == 1:
        return outputs[0]
    return tuple(outputs)
Modules