Skip to content

Pipelines

PipelineContext

Provides pipeline configuration context.

Usage example:

from zenml import get_pipeline_context

...

@pipeline(
    extra={
        "complex_parameter": [
            ("sklearn.tree", "DecisionTreeClassifier"),
            ("sklearn.ensemble", "RandomForestClassifier"),
        ]
    }
)
def my_pipeline():
    """Run one cross-validation step per configured model, then select the best."""
    search_steps_prefix = "hp_tuning_search_"
    # The model list is read back from the `extra` dict given to `@pipeline`.
    model_search_configurations = get_pipeline_context().extra[
        "complex_parameter"
    ]

    # IDs of all search steps, handed to the selection step via `after`.
    search_step_ids = []
    for index, model_search_configuration in enumerate(
        model_search_configurations
    ):
        search_step_id = f"{search_steps_prefix}{index}"
        cross_validation(
            model_package=model_search_configuration[0],
            model_class=model_search_configuration[1],
            id=search_step_id
        )
        search_step_ids.append(search_step_id)

    select_best_model(
        search_steps_prefix=search_steps_prefix,
        after=search_step_ids,
    )
Source code in src/zenml/pipelines/pipeline_context.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class PipelineContext:
    """Exposes selected values of a pipeline's configuration as attributes.

    Instances are normally obtained through `get_pipeline_context()` while a
    pipeline function is being executed.

    Usage example:

    ```python
    from zenml import get_pipeline_context

    ...

    @pipeline(
        extra={
            "complex_parameter": [
                ("sklearn.tree", "DecisionTreeClassifier"),
                ("sklearn.ensemble", "RandomForestClassifier"),
            ]
        }
    )
    def my_pipeline():
        context = get_pipeline_context()

        after = []
        search_steps_prefix = "hp_tuning_search_"
        for i, model_search_configuration in enumerate(
            context.extra["complex_parameter"]
        ):
            step_name = f"{search_steps_prefix}{i}"
            cross_validation(
                model_package=model_search_configuration[0],
                model_class=model_search_configuration[1],
                id=step_name
            )
            after.append(step_name)
        select_best_model(
            search_steps_prefix=search_steps_prefix,
            after=after,
        )
    ```
    """

    def __init__(self, pipeline_configuration: "PipelineConfiguration"):
        """Copy the relevant configuration values onto this context.

        The values are assigned as-is (no copies are made), so mutable
        values such as `settings` and `extra` are the same objects held by
        the configuration.

        Args:
            pipeline_configuration: The configuration of the pipeline whose
                values should be exposed.
        """
        config = pipeline_configuration

        self.name = config.name
        self.enable_cache = config.enable_cache
        self.enable_artifact_metadata = config.enable_artifact_metadata
        self.enable_artifact_visualization = (
            config.enable_artifact_visualization
        )
        self.enable_step_logs = config.enable_step_logs
        self.settings = config.settings
        self.extra = config.extra
        self.model = config.model

__init__(pipeline_configuration)

Initialize the context of the current pipeline.

Parameters:

Name Type Description Default
pipeline_configuration PipelineConfiguration

The configuration of the pipeline derived from Pipeline class.

required
Source code in src/zenml/pipelines/pipeline_context.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
    """Copy the relevant configuration values onto this context.

    The values are assigned as-is (no copies are made), so mutable values
    such as `settings` and `extra` are the same objects held by the
    configuration.

    Args:
        pipeline_configuration: The configuration of the pipeline whose
            values should be exposed.
    """
    config = pipeline_configuration

    self.name = config.name
    self.enable_cache = config.enable_cache
    self.enable_artifact_metadata = config.enable_artifact_metadata
    self.enable_artifact_visualization = (
        config.enable_artifact_visualization
    )
    self.enable_step_logs = config.enable_step_logs
    self.settings = config.settings
    self.extra = config.extra
    self.model = config.model

Schedule

Bases: BaseModel

Class for defining a pipeline schedule.

Attributes:

Name Type Description
name Optional[str]

Optional name to give to the schedule. If not set, a default name will be generated based on the pipeline name and the current date and time.

cron_expression Optional[str]

Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval.

start_time Optional[datetime]

When the schedule should start. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone.

end_time Optional[datetime]

When the schedule should end. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone.

interval_second Optional[timedelta]

A timedelta giving the interval between two recurring runs of a periodic schedule.

catchup bool

Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterward. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill.

run_once_start_time Optional[datetime]

When to run the pipeline once. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone.

Source code in src/zenml/config/schedule.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    A schedule is only valid if at least one of the following is configured:
    a cron expression, a start time together with an interval, or a run-once
    start time.

    Attributes:
        name: Optional name to give to the schedule. If not set, a default name
            will be generated based on the pipeline name and the current date
            and time.
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: When the schedule should start. If this is a datetime object
            without any timezone, it is treated as a datetime in the local
            timezone.
        end_time: When the schedule should end. If this is a datetime object
            without any timezone, it is treated as a datetime in the local
            timezone.
        interval_second: Timedelta giving the interval between two recurring
            runs of a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterward. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
        run_once_start_time: When to run the pipeline once. If this is a
            datetime object without any timezone, it is treated as a datetime
            in the local timezone.
    """

    name: Optional[str] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    interval_second: Optional[timedelta] = None
    catchup: bool = False
    run_once_start_time: Optional[datetime] = None

    @field_validator(
        "start_time", "end_time", "run_once_start_time", mode="after"
    )
    @classmethod
    def _ensure_timezone(
        cls, value: Optional[datetime], info: ValidationInfo
    ) -> Optional[datetime]:
        """Ensures that all datetimes are timezone aware.

        Naive datetimes are converted to timezone-aware ones and a warning
        naming the affected field is logged. Aware datetimes and `None` are
        returned unchanged.

        Args:
            value: The datetime.
            info: The validation info.

        Returns:
            A timezone aware datetime or None.
        """
        if value and value.tzinfo is None:
            # The field name is only used for the warning message below.
            assert info.field_name
            logger.warning(
                "Your schedule `%s` is missing a timezone. It will be treated "
                "as a datetime in your local timezone.",
                info.field_name,
            )
            # Without arguments, `astimezone()` presumes a naive datetime is
            # in the system's local timezone and attaches that timezone.
            value = value.astimezone()

        return value

    @model_validator(mode="after")
    def _ensure_cron_or_periodic_schedule_configured(self) -> "Schedule":
        """Ensures that at least one way of triggering the schedule is set.

        Accepted configurations are a cron expression, a start time plus
        interval, or a run-once start time. If a cron expression is combined
        with one of the other two, a warning is logged because the outcome
        depends on the orchestrator.

        Returns:
            The validated schedule instance.

        Raises:
            ValueError: If neither a cron expression, nor a start time +
                interval, nor a run-once start time were provided.
        """
        # A periodic schedule requires both the start time and the interval.
        periodic_schedule = self.start_time and self.interval_second

        if self.cron_expression and periodic_schedule:
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behavior depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )

            return self
        elif self.cron_expression and self.run_once_start_time:
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "a value for `run_once_start_time`. The resulting behavior "
                "depends on the concrete orchestrator implementation but will "
                "usually ignore the `run_once_start_time`."
            )
            return self
        elif (
            self.cron_expression
            or periodic_schedule
            or self.run_once_start_time
        ):
            return self
        else:
            raise ValueError(
                "Either a cron expression, a start time and interval seconds "
                "or a run once start time "
                "need to be set for a valid schedule."
            )

get_pipeline_context()

Get the context of the current pipeline.

Returns:

Type Description
PipelineContext

The context of the current pipeline.

Raises:

Type Description
RuntimeError

If no active pipeline is found.

RuntimeError

If inside a running step.

Source code in src/zenml/pipelines/pipeline_context.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def get_pipeline_context() -> "PipelineContext":
    """Return a context object for the currently active pipeline.

    When no pipeline is active, a step context lookup is attempted purely to
    decide which error message is the most helpful for the caller.

    Returns:
        The context of the current pipeline.

    Raises:
        RuntimeError: If no active pipeline is found.
        RuntimeError: If inside a running step.
    """
    from zenml.pipelines.pipeline_definition import Pipeline

    active_pipeline = Pipeline.ACTIVE_PIPELINE
    if active_pipeline is not None:
        return PipelineContext(
            pipeline_configuration=active_pipeline.configuration
        )

    try:
        from zenml.steps.step_context import get_step_context

        get_step_context()
    except RuntimeError:
        # No step context either: we are outside of any pipeline or step.
        raise RuntimeError("No active pipeline found.")

    # A step context exists, so the caller is inside a running step.
    raise RuntimeError(
        "Inside a step use `from zenml import get_step_context` "
        "instead."
    )

pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, tags=None, extra=None, on_failure=None, on_success=None, model=None, substitutions=None)

pipeline(_func: F) -> Pipeline
pipeline(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Callable[[F], Pipeline]

Decorator to create a pipeline.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
tags Optional[List[str]]

Tags to apply to runs of the pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name templates.

None

Returns:

Type Description
Union[Pipeline, Callable[[F], Pipeline]]

A pipeline instance.

Source code in src/zenml/pipelines/pipeline_decorator.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Decorator that turns a function into a pipeline.

    Can be applied directly (`@pipeline`) or called with keyword options
    first (`@pipeline(name=...)`).

    Args:
        _func: The decorated function. Left as `None` when the decorator is
            called with keyword options.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of the pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        substitutions: Extra placeholders to use in the name templates.

    Returns:
        A pipeline instance if `_func` was given, otherwise a decorator that
        creates a pipeline instance from the function it is applied to.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        # Imported lazily, at decoration time rather than at module import.
        from zenml.pipelines.pipeline_definition import Pipeline

        pipeline_instance = Pipeline(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            settings=settings,
            tags=tags,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
            entrypoint=func,
            substitutions=substitutions,
        )

        # Carry the function's docstring over to the pipeline object.
        pipeline_instance.__doc__ = func.__doc__
        return pipeline_instance

    if _func is None:
        # Called with options only: hand back the actual decorator.
        return inner_decorator
    return inner_decorator(_func)