Pipelines
PipelineContext
Provides pipeline configuration context.
Usage example:
from zenml import get_pipeline_context
...
@pipeline(
    extra={
        "complex_parameter": [
            ("sklearn.tree", "DecisionTreeClassifier"),
            ("sklearn.ensemble", "RandomForestClassifier"),
        ]
    }
)
def my_pipeline():
    context = get_pipeline_context()
    after = []
    search_steps_prefix = "hp_tuning_search_"
    for i, model_search_configuration in enumerate(
        context.extra["complex_parameter"]
    ):
        step_name = f"{search_steps_prefix}{i}"
        cross_validation(
            model_package=model_search_configuration[0],
            model_class=model_search_configuration[1],
            id=step_name
        )
        after.append(step_name)
    select_best_model(
        search_steps_prefix=search_steps_prefix,
        after=after,
    )
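The usage example passes each model as a (package, class name) pair of strings, which keeps the extra dictionary serializable. The following is a minimal sketch of how such a pair could be resolved into a class. The helper load_class is hypothetical, and what the cross_validation step actually does with its arguments is not shown in the source. Standard-library names stand in for the scikit-learn ones so the sketch runs without extra dependencies.

```python
import importlib
from typing import Any, List, Tuple


def load_class(model_package: str, model_class: str) -> Any:
    """Import `model_package` and return its attribute named `model_class`.

    Hypothetical helper for illustration; not part of ZenML.
    """
    module = importlib.import_module(model_package)
    return getattr(module, model_class)


# Stand-in for context.extra["complex_parameter"]; standard-library names
# are an assumption made so the example has no third-party dependencies.
complex_parameter: List[Tuple[str, str]] = [
    ("collections", "OrderedDict"),
    ("fractions", "Fraction"),
]

# Resolve every (package, class name) pair into the class object it names.
resolved = [load_class(pkg, cls) for pkg, cls in complex_parameter]

# Step names follow the same prefix-plus-index scheme as the usage example.
search_steps_prefix = "hp_tuning_search_"
step_names = [f"{search_steps_prefix}{i}" for i, _ in enumerate(complex_parameter)]
```

Deriving the step names from the index lets a later step find all search steps by their shared prefix, as select_best_model does in the example.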
Source code in src/zenml/pipelines/pipeline_context.py
__init__(pipeline_configuration)
Initialize the context of the current pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_configuration | PipelineConfiguration | The configuration of the pipeline derived from Pipeline class. | required |
Source code in src/zenml/pipelines/pipeline_context.py
Schedule
Bases: BaseModel
Class for defining a pipeline schedule.
Attributes:
Name | Type | Description |
---|---|---|
name | Optional[str] | Optional name to give to the schedule. If not set, a default name will be generated based on the pipeline name and the current date and time. |
cron_expression | Optional[str] | Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval. |
start_time | Optional[datetime] | When the schedule should start. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone. |
end_time | Optional[datetime] | When the schedule should end. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone. |
interval_second | Optional[timedelta] | datetime timedelta indicating the seconds between two recurring runs for a periodic schedule. |
catchup | bool | Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterward. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. |
run_once_start_time | Optional[datetime] | When to run the pipeline once. If this is a datetime object without any timezone, it is treated as a datetime in the local timezone. |
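Two of the rules in the attribute table can be illustrated with plain Python: naive datetimes being read as local time, and the difference between catchup=True and catchup=False. This is a sketch of the described semantics only. The helpers as_local_aware and due_runs are hypothetical names, and the actual scheduling is carried out by the orchestrator, whose implementation may differ.

```python
from datetime import datetime, timedelta
from typing import List


def as_local_aware(value: datetime) -> datetime:
    """Attach the local timezone to a naive datetime; leave aware ones alone."""
    if value.tzinfo is None:
        # astimezone() on a naive datetime presumes system local time.
        return value.astimezone()
    return value


def due_runs(
    last_run: datetime, now: datetime, interval: timedelta, catchup: bool
) -> List[datetime]:
    """Return the run times to schedule at `now` under the catchup rule."""
    missed: List[datetime] = []
    next_run = last_run + interval
    while next_run <= now:
        missed.append(next_run)
        next_run += interval
    if catchup:
        return missed  # backfill every missed interval
    return missed[-1:]  # only the latest ready interval, if any


# A schedule paused for three and a half hours with a one-hour interval.
last_run = datetime(2024, 1, 1, 0, 0)
now = datetime(2024, 1, 1, 3, 30)
hourly = timedelta(hours=1)

backfilled = due_runs(last_run, now, hourly, catchup=True)
latest_only = due_runs(last_run, now, hourly, catchup=False)
```

With catchup enabled the sketch yields the three missed runs (01:00, 02:00, 03:00); with it disabled, only the 03:00 run.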
Source code in src/zenml/config/schedule.py
get_pipeline_context()
Get the context of the current pipeline.
Returns:
Type | Description |
---|---|
PipelineContext | The context of the current pipeline. |
Raises:
Type | Description |
---|---|
RuntimeError | If no active pipeline is found. |
RuntimeError | If inside a running step. |
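Because both failure modes raise RuntimeError, a caller that may run outside pipeline composition can guard the lookup and fall back to a default. The sketch below models the two documented conditions with stand-ins so that it runs without ZenML. RuntimeState, get_context_sketch, and extra_or_default are hypothetical names, and the order of the checks is an assumption, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class RuntimeState:
    """Hypothetical stand-in for the state the real function inspects."""

    pipeline_extra: Optional[Dict[str, Any]] = None  # None: no active pipeline
    inside_running_step: bool = False


def get_context_sketch(state: RuntimeState) -> Dict[str, Any]:
    """Mimic the two documented RuntimeError conditions."""
    if state.inside_running_step:
        raise RuntimeError("Pipeline context is unavailable inside a running step.")
    if state.pipeline_extra is None:
        raise RuntimeError("No active pipeline found.")
    return state.pipeline_extra


def extra_or_default(state: RuntimeState, key: str, default: Any) -> Any:
    """Read a value from the context, falling back when no context exists."""
    try:
        return get_context_sketch(state).get(key, default)
    except RuntimeError:
        return default


composing = RuntimeState(pipeline_extra={"n_trials": 5})
value_while_composing = extra_or_default(composing, "n_trials", 1)
value_without_pipeline = extra_or_default(RuntimeState(), "n_trials", 1)
```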
Source code in src/zenml/pipelines/pipeline_context.py
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, tags=None, extra=None, on_failure=None, on_success=None, model=None, substitutions=None)
pipeline(_func: F) -> Pipeline
pipeline(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Callable[[F], Pipeline]
Decorator to create a pipeline.
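The two overloads above mean the decorator can be applied bare (it receives the function directly) or called with keyword arguments (it returns a callable that then receives the function). A minimal sketch of that dual-use pattern follows. PipelineSketch and pipeline_sketch are hypothetical stand-ins that accept only two of the documented keywords; they are not ZenML's implementation.

```python
from typing import Any, Callable, Dict, Optional, Union


class PipelineSketch:
    """Hypothetical stand-in for the object the decorator produces."""

    def __init__(
        self,
        entrypoint: Callable[..., Any],
        name: str,
        extra: Optional[Dict[str, Any]],
    ) -> None:
        self.entrypoint = entrypoint
        self.name = name
        self.extra = extra or {}


def pipeline_sketch(
    _func: Optional[Callable[..., Any]] = None,
    *,
    name: Optional[str] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> Union[PipelineSketch, Callable[[Callable[..., Any]], PipelineSketch]]:
    def inner(func: Callable[..., Any]) -> PipelineSketch:
        # Fall back to the function name when no name is given.
        return PipelineSketch(func, name or func.__name__, extra)

    if _func is None:
        return inner  # used as @pipeline_sketch(...): return the decorator
    return inner(_func)  # used as bare @pipeline_sketch: decorate directly


@pipeline_sketch
def bare() -> None:
    pass


@pipeline_sketch(name="custom", extra={"k": 1})
def configured() -> None:
    pass
```

Making every option keyword-only (the `*` in the signature) is what lets the decorator tell the two call styles apart: a positional argument can only be the decorated function.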
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func | Optional[F] | The decorated function. | None |
name | Optional[str] | The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. | None |
enable_cache | Optional[bool] | Whether to use caching or not. | None |
enable_artifact_metadata | Optional[bool] | Whether to enable artifact metadata or not. | None |
enable_step_logs | Optional[bool] | If step logs should be enabled for this pipeline. | None |
settings | Optional[Dict[str, SettingsOrDict]] | Settings for this pipeline. | None |
tags | Optional[List[str]] | Tags to apply to runs of the pipeline. | None |
extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None |
on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type | None |
on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. | None |
model | Optional[Model] | Configuration of the model in the Model Control Plane. | None |
substitutions | Optional[Dict[str, str]] | Extra placeholders to use in the name templates. | None |
Returns:
Source code in src/zenml/pipelines/pipeline_decorator.py