Pipelines

zenml.pipelines

Attributes

__all__ = ['pipeline', 'Schedule', 'PipelineContext', 'get_pipeline_context']

Classes

PipelineContext(pipeline_configuration: PipelineConfiguration)

Provides pipeline configuration context.

Usage example:

from zenml import get_pipeline_context

...

@pipeline(
    extra={
        "complex_parameter": [
            ("sklearn.tree", "DecisionTreeClassifier"),
            ("sklearn.ensemble", "RandomForestClassifier"),
        ]
    }
)
def my_pipeline():
    context = get_pipeline_context()

    after = []
    search_steps_prefix = "hp_tuning_search_"
    for i, model_search_configuration in enumerate(
        context.extra["complex_parameter"]
    ):
        step_name = f"{search_steps_prefix}{i}"
        cross_validation(
            model_package=model_search_configuration[0],
            model_class=model_search_configuration[1],
            id=step_name
        )
        after.append(step_name)
    select_best_model(
        search_steps_prefix=search_steps_prefix,
        after=after,
    )

Initialize the context of the current pipeline.

Parameters:

pipeline_configuration (PipelineConfiguration): The configuration of the pipeline, derived from the Pipeline class. Required.
Source code in src/zenml/pipelines/pipeline_context.py
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
    """Initialize the context of the current pipeline.

    Args:
        pipeline_configuration: The configuration of the pipeline derived
            from Pipeline class.
    """
    self.name = pipeline_configuration.name
    self.enable_cache = pipeline_configuration.enable_cache
    self.enable_artifact_metadata = (
        pipeline_configuration.enable_artifact_metadata
    )
    self.enable_artifact_visualization = (
        pipeline_configuration.enable_artifact_visualization
    )
    self.enable_step_logs = pipeline_configuration.enable_step_logs
    self.enable_pipeline_logs = pipeline_configuration.enable_pipeline_logs
    self.settings = pipeline_configuration.settings
    self.extra = pipeline_configuration.extra
    self.model = pipeline_configuration.model

Schedule

Bases: BaseModel

Class for defining a pipeline schedule.

Attributes:

name (Optional[str]): Optional name for the schedule. If not set, a default name is generated from the pipeline name and the current date and time.

cron_expression (Optional[str]): Cron expression for the pipeline schedule. If set, it takes precedence over the start time and interval.

start_time (Optional[datetime]): When the schedule should start. A datetime without a timezone is treated as local time.

end_time (Optional[datetime]): When the schedule should end. A datetime without a timezone is treated as local time.

interval_second (Optional[timedelta]): The interval between two recurring runs of a periodic schedule, given as a datetime.timedelta.

catchup (bool): Whether the recurring run should catch up if behind schedule, for example after being paused and re-enabled. If True, the scheduler backfills each missed interval; otherwise it schedules only the latest interval when more than one is ready. If your pipeline handles backfill internally, turn catchup off to avoid duplicate backfills.

run_once_start_time (Optional[datetime]): When to run the pipeline once. A datetime without a timezone is treated as local time.

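A minimal sketch of the two scheduling styles (values are illustrative; Schedule is importable from zenml.pipelines per the __all__ above):

from datetime import datetime, timedelta

from zenml.pipelines import Schedule

# Interval-based schedule: run every hour starting now, without
# backfilling missed intervals.
interval_schedule = Schedule(
    start_time=datetime.now(),
    interval_second=timedelta(hours=1),
    catchup=False,
)

# Cron-based schedule: if both are set, the cron expression takes
# precedence over start time + interval.
cron_schedule = Schedule(cron_expression="0 * * * *")
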
Functions

get_pipeline_context() -> PipelineContext

Get the context of the current pipeline.

Returns:

PipelineContext: The context of the current pipeline.

Raises:

RuntimeError: If no active pipeline is found.

RuntimeError: If inside a running step.

Source code in src/zenml/pipelines/pipeline_context.py
def get_pipeline_context() -> "PipelineContext":
    """Get the context of the current pipeline.

    Returns:
        The context of the current pipeline.

    Raises:
        RuntimeError: If no active pipeline is found.
        RuntimeError: If inside a running step.
    """
    from zenml.pipelines.pipeline_definition import Pipeline

    if Pipeline.ACTIVE_PIPELINE is None:
        try:
            from zenml.steps.step_context import get_step_context

            get_step_context()
        except RuntimeError:
            raise RuntimeError("No active pipeline found.")
        else:
            raise RuntimeError(
                "Inside a step use `from zenml import get_step_context` "
                "instead."
            )

    return PipelineContext(
        pipeline_configuration=Pipeline.ACTIVE_PIPELINE.configuration
    )
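
A minimal sketch of where each context getter applies, assuming a no-op step body; both imports come from the top-level zenml package, as the error message above suggests:

from zenml import get_pipeline_context, get_step_context, pipeline, step

@step
def my_step() -> None:
    # Inside a running step, only the step context is available.
    step_context = get_step_context()

@pipeline
def my_pipeline() -> None:
    # While the pipeline is being composed, the pipeline context is available.
    context = get_pipeline_context()
    my_step()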

pipeline(_func: Optional[F] = None, *, name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_step_logs: Optional[bool] = None, environment: Optional[Dict[str, Any]] = None, secrets: Optional[List[Union[UUID, str]]] = None, enable_pipeline_logs: Optional[bool] = None, settings: Optional[Dict[str, SettingsOrDict]] = None, tags: Optional[List[Union[str, Tag]]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, on_init: Optional[InitHookSpecification] = None, on_init_kwargs: Optional[Dict[str, Any]] = None, on_cleanup: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, execution_mode: Optional[ExecutionMode] = None, cache_policy: Optional[CachePolicyOrString] = None) -> Union[Pipeline, Callable[[F], Pipeline]]

pipeline(_func: F) -> Pipeline
pipeline(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[Union[str, Tag]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    on_init: Optional[InitHookSpecification] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional[ExecutionMode] = None,
    cache_policy: Optional[CachePolicyOrString] = None,
) -> Callable[[F], Pipeline]

Decorator to create a pipeline.

Parameters:

_func (Optional[F]): The decorated function. Default: None.

name (Optional[str]): The name of the pipeline. If left empty, the name of the decorated function is used as a fallback. Default: None.

enable_cache (Optional[bool]): Whether to use caching. Default: None.

enable_artifact_metadata (Optional[bool]): Whether to enable artifact metadata. Default: None.

enable_step_logs (Optional[bool]): Whether step logs should be enabled for this pipeline. Default: None.

environment (Optional[Dict[str, Any]]): Environment variables to set when running this pipeline. Default: None.

secrets (Optional[List[Union[UUID, str]]]): Secrets to set as environment variables when running this pipeline. Default: None.

enable_pipeline_logs (Optional[bool]): Whether pipeline logs should be enabled for this pipeline. Default: None.

settings (Optional[Dict[str, SettingsOrDict]]): Settings for this pipeline. Default: None.

tags (Optional[List[Union[str, Tag]]]): Tags to apply to runs of the pipeline. Default: None.

extra (Optional[Dict[str, Any]]): Extra configurations for this pipeline. Default: None.

on_failure (Optional[HookSpecification]): Callback function to run in the event of a step failure. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function). Default: None.

on_success (Optional[HookSpecification]): Callback function to run when a step succeeds. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). Default: None.

on_init (Optional[InitHookSpecification]): Callback function to run on initialization of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). If the function returns a value, it is stored as the pipeline state. Default: None.

on_init_kwargs (Optional[Dict[str, Any]]): Arguments for the init hook. Default: None.

on_cleanup (Optional[HookSpecification]): Callback function to run on cleanup of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). Default: None.

model (Optional[Model]): Configuration of the model in the Model Control Plane. Default: None.

retry (Optional[StepRetryConfig]): Retry configuration for the pipeline steps. Default: None.

substitutions (Optional[Dict[str, str]]): Extra placeholders to use in the name templates. Default: None.

execution_mode (Optional[ExecutionMode]): The execution mode to use for the pipeline. Default: None.

cache_policy (Optional[CachePolicyOrString]): Cache policy for this pipeline. Default: None.

Returns:

Union[Pipeline, Callable[[F], Pipeline]]: A pipeline instance, or a decorator that produces one when used with arguments.

Source code in src/zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    tags: Optional[List[Union[str, "Tag"]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    on_init: Optional["InitHookSpecification"] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional["ExecutionMode"] = None,
    cache_policy: Optional["CachePolicyOrString"] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Decorator to create a pipeline.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        environment: Environment variables to set when running this pipeline.
        secrets: Secrets to set as environment variables when running this
            pipeline.
        enable_pipeline_logs: If pipeline logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of the pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        on_init: Callback function to run on initialization of the pipeline. Can
            be a function with no arguments, or a source path to such a function
            (e.g. `module.my_function`). If the function returns a value, it will
            be stored as the pipeline state.
        on_init_kwargs: Arguments for the init hook.
        on_cleanup: Callback function to run on cleanup of the pipeline. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Retry configuration for the pipeline steps.
        substitutions: Extra placeholders to use in the name templates.
        execution_mode: The execution mode to use for the pipeline.
        cache_policy: Cache policy for this pipeline.

    Returns:
        A pipeline instance.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        from zenml.pipelines.pipeline_definition import Pipeline

        p = Pipeline(
            name=name or func.__name__,
            entrypoint=func,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            environment=environment,
            secrets=secrets,
            enable_pipeline_logs=enable_pipeline_logs,
            settings=settings,
            tags=tags,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            on_init=on_init,
            on_init_kwargs=on_init_kwargs,
            on_cleanup=on_cleanup,
            model=model,
            retry=retry,
            substitutions=substitutions,
            execution_mode=execution_mode,
            cache_policy=cache_policy,
        )

        p.__doc__ = func.__doc__
        return p

    return inner_decorator if _func is None else inner_decorator(_func)

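A minimal usage sketch of both decorator forms; the step and pipeline names are hypothetical:

from zenml import pipeline, step

@step
def my_step() -> None:
    ...

# Bare form: the function name becomes the pipeline name.
@pipeline
def my_pipeline() -> None:
    my_step()

# Parameterized form: configure the pipeline up front.
@pipeline(
    name="configured_pipeline",
    enable_cache=False,
    extra={"owner": "ml-team"},
)
def my_configured_pipeline() -> None:
    my_step()
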
Modules

build_utils

Pipeline build utilities.

Functions

allows_download_from_code_repository(snapshot: PipelineSnapshotBase) -> bool

Checks whether a code repository can be used to download code.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

Returns:

bool: Whether a code repository can be used to download code.

Source code in src/zenml/pipelines/build_utils.py
def allows_download_from_code_repository(
    snapshot: PipelineSnapshotBase,
) -> bool:
    """Checks whether a code repository can be used to download code.

    Args:
        snapshot: The snapshot.

    Returns:
        Whether a code repository can be used to download code.
    """
    for step in snapshot.step_configurations.values():
        docker_settings = step.config.docker_settings

        if docker_settings.allow_download_from_code_repository:
            return True

    return False

build_required(snapshot: PipelineSnapshotBase) -> bool

Checks whether a build is required for the snapshot and active stack.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot for which to check. Required.

Returns:

bool: Whether a build is required.

Source code in src/zenml/pipelines/build_utils.py
def build_required(snapshot: "PipelineSnapshotBase") -> bool:
    """Checks whether a build is required for the snapshot and active stack.

    Args:
        snapshot: The snapshot for which to check.

    Returns:
        If a build is required.
    """
    stack = Client().active_stack
    return bool(stack.get_docker_builds(snapshot=snapshot))
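
A hypothetical call site, assuming a PipelineSnapshotBase instance named snapshot is in scope; it pairs build_required with create_pipeline_build (documented below):

from zenml.pipelines import build_utils

# Only kick off Docker builds when the active stack actually needs them.
if build_utils.build_required(snapshot=snapshot):
    build = build_utils.create_pipeline_build(snapshot=snapshot)
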
code_download_possible(snapshot: PipelineSnapshotBase, code_repository: Optional[BaseCodeRepository] = None) -> bool

Checks whether code download is possible for the snapshot.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

code_repository (Optional[BaseCodeRepository]): If provided, this code repository can be used to download the code inside the container images. Default: None.

Returns:

bool: Whether code download is possible for the snapshot.

Source code in src/zenml/pipelines/build_utils.py
def code_download_possible(
    snapshot: "PipelineSnapshotBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Checks whether code download is possible for the snapshot.

    Args:
        snapshot: The snapshot.
        code_repository: If provided, this code repository can be used to
            download the code inside the container images.

    Returns:
        Whether code download is possible for the snapshot.
    """
    for step in snapshot.step_configurations.values():
        if step.config.docker_settings.local_project_install_command:
            return False

        if step.config.docker_settings.allow_download_from_artifact_store:
            continue

        if (
            step.config.docker_settings.allow_download_from_code_repository
            and code_repository
        ):
            continue

        return False

    return True

compute_build_checksum(items: List[BuildConfiguration], stack: Stack, code_repository: Optional[BaseCodeRepository] = None) -> str

Compute an overall checksum for a pipeline build.

Parameters:

items (List[BuildConfiguration]): Items of the build. Required.

stack (Stack): The stack associated with the build. Used to gather its requirements. Required.

code_repository (Optional[BaseCodeRepository]): The code repository that will be used to download files inside the build. Used for its dependency specification. Default: None.

Returns:

str: The build checksum.

Source code in src/zenml/pipelines/build_utils.py
def compute_build_checksum(
    items: List["BuildConfiguration"],
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Compute an overall checksum for a pipeline build.

    Args:
        items: Items of the build.
        stack: The stack associated with the build. Will be used to gather
            its requirements.
        code_repository: The code repository that will be used to download
            files inside the build. Will be used for its dependency
            specification.

    Returns:
        The build checksum.
    """
    hash_ = hashlib.md5()  # nosec

    for item in items:
        key = PipelineBuildBase.get_image_key(
            component_key=item.key, step=item.step_name
        )

        settings_checksum = item.compute_settings_checksum(
            stack=stack,
            code_repository=code_repository,
        )

        hash_.update(key.encode())
        hash_.update(settings_checksum.encode())

    return hash_.hexdigest()
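
A hypothetical comparison against a previously registered build, assuming required_builds, stack, and existing_build are in scope; a changed checksum means the earlier build can no longer be reused:

from zenml.pipelines import build_utils

checksum = build_utils.compute_build_checksum(
    items=required_builds, stack=stack, code_repository=None
)
if existing_build and existing_build.checksum != checksum:
    print("Docker settings changed; a fresh build is required.")
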
compute_stack_checksum(stack: StackResponse) -> str

Compute a stack checksum.

Parameters:

stack (StackResponse): The stack for which to compute the checksum. Required.

Returns:

str: The checksum.

Source code in src/zenml/pipelines/build_utils.py
def compute_stack_checksum(stack: StackResponse) -> str:
    """Compute a stack checksum.

    Args:
        stack: The stack for which to compute the checksum.

    Returns:
        The checksum.
    """
    hash_ = hashlib.md5()  # nosec

    # This checksum is used to see if the stack has been updated since a build
    # was created for it. We deliberately leave specific requirements out of
    # the checksum: they might change with new ZenML releases, but that doesn't
    # actually invalidate the existing Docker images.
    required_integrations = sorted(
        {
            component.integration
            for components in stack.components.values()
            for component in components
            if component.integration and component.integration != "built-in"
        }
    )
    for integration in required_integrations:
        hash_.update(integration.encode())

    return hash_.hexdigest()

create_pipeline_build(snapshot: PipelineSnapshotBase, pipeline_id: Optional[UUID] = None, code_repository: Optional[BaseCodeRepository] = None) -> Optional[PipelineBuildResponse]

Builds images and registers the output in the server.

Parameters:

snapshot (PipelineSnapshotBase): The pipeline snapshot. Required.

pipeline_id (Optional[UUID]): The ID of the pipeline. Default: None.

code_repository (Optional[BaseCodeRepository]): If provided, this code repository will be used to download code inside the build images. Default: None.

Returns:

Optional[PipelineBuildResponse]: The build output.

Raises:

RuntimeError: If multiple builds with the same key but different settings were specified.

Source code in src/zenml/pipelines/build_utils.py
def create_pipeline_build(
    snapshot: "PipelineSnapshotBase",
    pipeline_id: Optional[UUID] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds images and registers the output in the server.

    Args:
        snapshot: The pipeline snapshot.
        pipeline_id: The ID of the pipeline.
        code_repository: If provided, this code repository will be used to
            download code inside the build images.

    Returns:
        The build output.

    Raises:
        RuntimeError: If multiple builds with the same key but different
            settings were specified.
    """
    client = Client()
    stack_model = Client().active_stack_model
    stack = client.active_stack
    required_builds = stack.get_docker_builds(snapshot=snapshot)

    if not required_builds:
        logger.debug("No docker builds required.")
        return None

    logger.info(
        "Building Docker image(s) for pipeline `%s`.",
        snapshot.pipeline_configuration.name,
    )
    start_time = time.time()

    docker_image_builder = PipelineDockerImageBuilder()
    images: Dict[str, BuildItem] = {}
    checksums: Dict[str, str] = {}

    for build_config in required_builds:
        combined_key = PipelineBuildBase.get_image_key(
            component_key=build_config.key, step=build_config.step_name
        )
        checksum = build_config.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )

        if combined_key in images:
            previous_checksum = images[combined_key].settings_checksum

            if previous_checksum != checksum:
                raise RuntimeError(
                    f"Trying to build image for key `{combined_key}` but "
                    "an image for this key was already built with a "
                    "different configuration. This happens if multiple "
                    "stack components specified Docker builds for the same "
                    "key in the `StackComponent.get_docker_builds(...)` "
                    "method. If you're using custom components, make sure "
                    "to provide unique keys when returning your build "
                    "configurations to avoid this error."
                )
            else:
                continue

        if checksum in checksums:
            item_key = checksums[checksum]
            image_name_or_digest = images[item_key].image
            contains_code = images[item_key].contains_code
            requires_code_download = images[item_key].requires_code_download
            dockerfile = images[item_key].dockerfile
            requirements = images[item_key].requirements
        else:
            tag = snapshot.pipeline_configuration.name
            if build_config.step_name:
                tag += f"-{build_config.step_name}"
            tag += f"-{build_config.key}"
            tag = docker_utils.sanitize_tag(tag)

            include_files = build_config.should_include_files(
                code_repository=code_repository,
            )
            requires_code_download = build_config.should_download_files(
                code_repository=code_repository,
            )
            pass_code_repo = (
                build_config.should_download_files_from_code_repository(
                    code_repository=code_repository
                )
            )

            (
                image_name_or_digest,
                dockerfile,
                requirements,
            ) = docker_image_builder.build_docker_image(
                docker_settings=build_config.settings,
                tag=tag,
                stack=stack,
                include_files=include_files,
                entrypoint=build_config.entrypoint,
                extra_files=build_config.extra_files,
                code_repository=code_repository if pass_code_repo else None,
            )
            contains_code = include_files

        images[combined_key] = BuildItem(
            image=image_name_or_digest,
            dockerfile=dockerfile,
            requirements=requirements,
            settings_checksum=checksum,
            contains_code=contains_code,
            requires_code_download=requires_code_download,
        )
        checksums[checksum] = combined_key

    logger.info("Finished building Docker image(s).")

    duration = round(time.time() - start_time)
    is_local = stack.container_registry is None
    contains_code = any(item.contains_code for item in images.values())
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )
    stack_checksum = compute_stack_checksum(stack=stack_model)
    build_request = PipelineBuildRequest(
        project=client.active_project.id,
        stack=stack_model.id,
        pipeline=pipeline_id,
        is_local=is_local,
        contains_code=contains_code,
        images=images,
        zenml_version=zenml.__version__,
        python_version=platform.python_version(),
        checksum=build_checksum,
        stack_checksum=stack_checksum,
        duration=duration,
    )
    return client.zen_store.create_build(build_request)

find_existing_build(snapshot: PipelineSnapshotBase, code_repository: Optional[BaseCodeRepository] = None) -> Optional[PipelineBuildResponse]

Find an existing build for a snapshot.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot for which to find an existing build. Required.

code_repository (Optional[BaseCodeRepository]): The code repository that will be used to download files in the images. Default: None.

Returns:

Optional[PipelineBuildResponse]: The existing build to reuse, if found.

Source code in src/zenml/pipelines/build_utils.py
def find_existing_build(
    snapshot: "PipelineSnapshotBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Find an existing build for a snapshot.

    Args:
        snapshot: The snapshot for which to find an existing build.
        code_repository: The code repository that will be used to download
            files in the images.

    Returns:
        The existing build to reuse if found.
    """
    client = Client()
    stack = client.active_stack

    if not stack.container_registry:
        # There can be no non-local builds that we can reuse if there is no
        # container registry in the stack.
        return None

    python_version_prefix = ".".join(platform.python_version_tuple()[:2])
    required_builds = stack.get_docker_builds(snapshot=snapshot)

    if not required_builds:
        return None

    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )

    matches = client.list_builds(
        sort_by="desc:created",
        size=1,
        stack_id=stack.id,
        # Until we implement stack versioning, users can still update their
        # stack to update/remove the container registry. In that case, we might
        # try to pull an image from a container registry that we don't have
        # access to. This is why we add an additional check for the container
        # registry ID here. (This is still not perfect as users can update the
        # container registry URI or config, but the best we can do)
        container_registry_id=stack.container_registry.id,
        # Exclude local builds, as it's not clear whether the images
        # still exist on the current machine or have been overwritten.
        # TODO: Should we support this by storing the unique Docker ID for
        #   the image and checking if an image with that ID exists locally?
        is_local=False,
        # Exclude builds containing code, which might differ from the
        # local code the user is expecting to run
        contains_code=False,
        zenml_version=zenml.__version__,
        # Match all patch versions of the same Python major + minor
        python_version=f"startswith:{python_version_prefix}",
        checksum=build_checksum,
    )

    if not matches.items:
        return None

    return matches[0]

log_code_repository_usage(snapshot: PipelineSnapshotBase, local_repo_context: LocalRepositoryContext) -> None

Log what the code repository can (not) be used for given a snapshot.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

local_repo_context (LocalRepositoryContext): The local repository context. Required.
Source code in src/zenml/pipelines/build_utils.py
def log_code_repository_usage(
    snapshot: PipelineSnapshotBase,
    local_repo_context: "LocalRepositoryContext",
) -> None:
    """Log what the code repository can (not) be used for given a snapshot.

    Args:
        snapshot: The snapshot.
        local_repo_context: The local repository context.
    """
    if build_required(snapshot) and allows_download_from_code_repository(
        snapshot
    ):
        if local_repo_context.is_dirty:
            logger.warning(
                "Unable to use code repository `%s` to download code or track "
                "the commit hash as there are uncommitted or untracked files.",
                local_repo_context.code_repository.name,
            )
        elif local_repo_context.has_local_changes:
            logger.warning(
                "Unable to use code repository `%s` to download code as there "
                "are unpushed commits.",
                local_repo_context.code_repository.name,
            )
        else:
            logger.info(
                "Using code repository `%s` to download code for this run.",
                local_repo_context.code_repository.name,
            )
    elif local_repo_context.is_dirty:
        logger.warning(
            "Unable to use code repository `%s` to track the commit hash as "
            "there are uncommitted or untracked files.",
            local_repo_context.code_repository.name,
        )

requires_download_from_code_repository(snapshot: PipelineSnapshotBase) -> bool

Checks whether the snapshot needs to download code from a repository.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

Returns:

bool: Whether the snapshot needs to download code from a code repository.

Source code in src/zenml/pipelines/build_utils.py
def requires_download_from_code_repository(
    snapshot: "PipelineSnapshotBase",
) -> bool:
    """Checks whether the snapshot needs to download code from a repository.

    Args:
        snapshot: The snapshot.

    Returns:
        If the snapshot needs to download code from a code repository.
    """
    for step in snapshot.step_configurations.values():
        docker_settings = step.config.docker_settings

        if docker_settings.allow_download_from_artifact_store:
            return False

        if docker_settings.allow_including_files_in_images:
            return False

        if docker_settings.allow_download_from_code_repository:
            # The other two options are false, which means download from a
            # code repo is required.
            return True

    return False
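
The checks above give the artifact store and image-inclusion options precedence over the code repository. A hypothetical DockerSettings combination that makes code download from a repository required (the flag names are taken from the source above; the import path is an assumption):

from zenml.config import DockerSettings

docker_settings = DockerSettings(
    allow_download_from_artifact_store=False,
    allow_including_files_in_images=False,
    allow_download_from_code_repository=True,
)
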
requires_included_code(snapshot: PipelineSnapshotBase, code_repository: Optional[BaseCodeRepository] = None) -> bool

Checks whether the snapshot requires included code.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

code_repository (Optional[BaseCodeRepository]): If provided, this code repository can be used to download the code inside the container images. Default: None.

Returns:

bool: Whether the snapshot requires code included in the container images.

Source code in src/zenml/pipelines/build_utils.py
def requires_included_code(
    snapshot: "PipelineSnapshotBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Checks whether the snapshot requires included code.

    Args:
        snapshot: The snapshot.
        code_repository: If provided, this code repository can be used to
            download the code inside the container images.

    Returns:
        If the snapshot requires code included in the container images.
    """
    for step in snapshot.step_configurations.values():
        docker_settings = step.config.docker_settings

        if docker_settings.local_project_install_command:
            # When installing a local package, we need to include the code
            # files in the container image.
            return True

        if docker_settings.allow_download_from_artifact_store:
            return False

        if docker_settings.allow_download_from_code_repository:
            if code_repository:
                continue

        if docker_settings.allow_including_files_in_images:
            return True

    return False

reuse_or_create_pipeline_build(snapshot: PipelineSnapshotBase, allow_build_reuse: bool, pipeline_id: Optional[UUID] = None, build: Union[UUID, PipelineBuildBase, None] = None, code_repository: Optional[BaseCodeRepository] = None) -> Optional[PipelineBuildResponse]

Loads or creates a pipeline build.

Parameters:

snapshot (PipelineSnapshotBase): The pipeline snapshot for which to load or create the build. Required.

allow_build_reuse (bool): If True, an existing build may be reused. Required.

pipeline_id (Optional[UUID]): Optional ID of the pipeline to reference in the build. Default: None.

build (Union[UUID, PipelineBuildBase, None]): Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. Default: None.

code_repository (Optional[BaseCodeRepository]): If provided, this code repository can be used to download code inside the container images. Default: None.

Returns:

Optional[PipelineBuildResponse]: The build response.

Source code in src/zenml/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
    snapshot: "PipelineSnapshotBase",
    allow_build_reuse: bool,
    pipeline_id: Optional[UUID] = None,
    build: Union["UUID", "PipelineBuildBase", None] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Loads or creates a pipeline build.

    Args:
        snapshot: The pipeline snapshot for which to load or create the
            build.
        allow_build_reuse: If True, an existing build may be reused
            instead of creating a new one.
        pipeline_id: Optional ID of the pipeline to reference in the build.
        build: Optional existing build. If given, the build will be fetched
            (or registered) in the database. If not given, a new build will
            be created.
        code_repository: If provided, this code repository can be used to
            download code inside the container images.

    Returns:
        The build response.
    """
    if not build:
        if (
            allow_build_reuse
            and not snapshot.should_prevent_build_reuse
            and not requires_included_code(
                snapshot=snapshot, code_repository=code_repository
            )
            and build_required(snapshot=snapshot)
        ):
            existing_build = find_existing_build(
                snapshot=snapshot, code_repository=code_repository
            )

            if existing_build:
                logger.info(
                    "Reusing existing build `%s` for stack `%s`.",
                    existing_build.id,
                    Client().active_stack.name,
                )
                return existing_build
            else:
                logger.info(
                    "Unable to find a build to reuse. A previous build can be "
                    "reused when the following conditions are met:\n"
                    "  * The existing build was created for the same stack, "
                    "ZenML version and Python version\n"
                    "  * The stack contains a container registry\n"
                    "  * The Docker settings of the pipeline and all its steps "
                    "are the same as for the existing build."
                )

        return create_pipeline_build(
            snapshot=snapshot,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )

    if isinstance(build, UUID):
        build_model = Client().zen_store.get_build(build_id=build)
    else:
        build_request = PipelineBuildRequest(
            project=Client().active_project.id,
            stack=Client().active_stack_model.id,
            pipeline=pipeline_id,
            **build.model_dump(),
        )
        build_model = Client().zen_store.create_build(build=build_request)

    verify_custom_build(
        build=build_model,
        snapshot=snapshot,
        code_repository=code_repository,
    )

    return build_model

should_upload_code(snapshot: PipelineSnapshotBase, build: Optional[PipelineBuildResponse], can_download_from_code_repository: bool) -> bool

Checks whether the current code should be uploaded for the snapshot.

Parameters:

snapshot (PipelineSnapshotBase): The snapshot. Required.

build (Optional[PipelineBuildResponse]): The build for the snapshot. Required.

can_download_from_code_repository (bool): Whether the code can be downloaded from a code repository. Required.

Returns:

bool: Whether the current code should be uploaded for the snapshot.

Source code in src/zenml/pipelines/build_utils.py
def should_upload_code(
    snapshot: PipelineSnapshotBase,
    build: Optional[PipelineBuildResponse],
    can_download_from_code_repository: bool,
) -> bool:
    """Checks whether the current code should be uploaded for the snapshot.

    Args:
        snapshot: The snapshot.
        build: The build for the snapshot.
        can_download_from_code_repository: Whether the code can be downloaded
            from a code repository.

    Returns:
        Whether the current code should be uploaded for the snapshot.
    """
    if not build:
        # No build means we don't need to download code into a Docker container
        # for step execution. Remote orchestrators that don't use Docker
        # containers but instead run from e.g. wheels should already include
        # the code.
        return False

    if build.contains_code:
        return False

    for step in snapshot.step_configurations.values():
        docker_settings = step.config.docker_settings

        if (
            can_download_from_code_repository
            and docker_settings.allow_download_from_code_repository
        ):
            # No upload needed for this step
            continue

        if docker_settings.allow_download_from_artifact_store:
            return True

    return False
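
A hypothetical decision flow tying this helper to the ones above, assuming snapshot, build, and code_repository are in scope; code is uploaded to the artifact store only when no other code source can serve the build:

from zenml.pipelines import build_utils

upload_code = build_utils.should_upload_code(
    snapshot=snapshot,
    build=build,
    can_download_from_code_repository=code_repository is not None,
)
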
verify_custom_build(build: PipelineBuildResponse, snapshot: PipelineSnapshotBase, code_repository: Optional[BaseCodeRepository] = None) -> None

Verify a custom build for a pipeline snapshot.

Parameters:

build (PipelineBuildResponse): The build to verify. Required.

snapshot (PipelineSnapshotBase): The snapshot for which to verify the build. Required.

code_repository (Optional[BaseCodeRepository]): Code repository that will be used to download files for the snapshot. Default: None.

Raises:

RuntimeError: If the build can't be used for the snapshot.

Source code in src/zenml/pipelines/build_utils.py
def verify_custom_build(
    build: "PipelineBuildResponse",
    snapshot: "PipelineSnapshotBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
    """Verify a custom build for a pipeline snapshot.

    Args:
        build: The build to verify.
        snapshot: The snapshot for which to verify the build.
        code_repository: Code repository that will be used to download files
            for the snapshot.

    Raises:
        RuntimeError: If the build can't be used for the snapshot.
    """
    stack = Client().active_stack
    required_builds = stack.get_docker_builds(snapshot=snapshot)

    if build.stack and build.stack.id != stack.id:
        logger.warning(
            "The stack `%s` used for the build `%s` is not the same as the "
            "stack `%s` that the pipeline will run on. This could lead "
            "to issues if the stacks have different build requirements.",
            build.stack.name,
            build.id,
            stack.name,
        )

    if build.contains_code:
        logger.warning(
            "The build you specified for this run contains code and will run "
            "with the step code that was included in the Docker images which "
            "might differ from the local code in your client environment."
        )

    if build.requires_code_download:
        if requires_included_code(
            snapshot=snapshot, code_repository=code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the Docker "
                "image, but the build you "
                "specified requires code download. Either update your "
                "`DockerSettings` or specify a different build and try "
                "again."
            )

        if (
            requires_download_from_code_repository(snapshot=snapshot)
            and not code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be downloaded from a "
                "code repository but "
                "there is no code repository active at your current source "
                f"root `{source_utils.get_source_root()}`."
            )

        if not code_download_possible(
            snapshot=snapshot, code_repository=code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code can not be downloaded from the "
                "artifact store, but the build you specified requires code "
                "download. Either update your `DockerSettings` or specify a "
                "different build and try again."
            )

    if build.checksum:
        build_checksum = compute_build_checksum(
            required_builds, stack=stack, code_repository=code_repository
        )
        if build_checksum != build.checksum:
            logger.warning(
                "The Docker settings used for the build `%s` are "
                "not the same as currently specified for your pipeline. "
                "This means that the build you specified to run this "
                "pipeline might be outdated and most likely contains "
                "outdated requirements.",
                build.id,
            )
    else:
        # No checksum given for the entire build, so we manually check that
        # all the images exist and the settings match
        for build_config in required_builds:
            try:
                image = build.get_image(
                    component_key=build_config.key,
                    step=build_config.step_name,
                )
            except KeyError:
                raise RuntimeError(
                    "The build you specified is missing an image for key: "
                    f"{build_config.key}."
                )

            if build_config.compute_settings_checksum(
                stack=stack, code_repository=code_repository
            ) != build.get_settings_checksum(
                component_key=build_config.key, step=build_config.step_name
            ):
                logger.warning(
                    "The Docker settings used to build the image `%s` are "
                    "not the same as currently specified for your pipeline. "
                    "This means that the build you specified to run this "
                    "pipeline might be outdated and most likely contains "
                    "outdated code or requirements.",
                    image,
                )

    if build.is_local:
        logger.warning(
            "You manually specified a local build to run your pipeline. "
            "This might lead to errors if the images don't exist on "
            "your local machine or the image tags have been "
            "overwritten since the original build happened."
        )

verify_local_repository_context(snapshot: PipelineSnapshotBase, local_repo_context: Optional[LocalRepositoryContext]) -> Optional[BaseCodeRepository]

Verifies the local repository.

If the local repository exists and has no local changes, code download inside the images is possible.

Parameters:

snapshot (PipelineSnapshotBase): The pipeline snapshot. Required.

local_repo_context (Optional[LocalRepositoryContext]): The local repository active at the source root. Required.

Raises:

RuntimeError: If the snapshot requires code download but code download is not possible.

Returns:

Optional[BaseCodeRepository]: The code repository from which to download files for the runs of the snapshot, or None if code download is not possible.

Source code in src/zenml/pipelines/build_utils.py
def verify_local_repository_context(
    snapshot: "PipelineSnapshotBase",
    local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
    """Verifies the local repository.

    If the local repository exists and has no local changes, code download
    inside the images is possible.

    Args:
        snapshot: The pipeline snapshot.
        local_repo_context: The local repository active at the source root.

    Raises:
        RuntimeError: If the snapshot requires code download but code download
            is not possible.

    Returns:
        The code repository from which to download files for the runs of the
        snapshot, or None if code download is not possible.
    """
    if build_required(snapshot=snapshot):
        if requires_download_from_code_repository(snapshot=snapshot):
            if not local_repo_context:
                raise RuntimeError(
                    "The `DockerSettings` of the pipeline or one of its "
                    "steps specify that code should be downloaded from a "
                    "code repository, but "
                    "there is no code repository active at your current source "
                    f"root `{source_utils.get_source_root()}`."
                )
            elif local_repo_context.is_dirty:
                raise RuntimeError(
                    "The `DockerSettings` of the pipeline or one of its "
                    "steps specify that code should be downloaded from a "
                    "code repository, but "
                    "the code repository active at your current source root "
                    f"`{source_utils.get_source_root()}` has uncommitted "
                    "changes."
                )
            elif local_repo_context.has_local_changes:
                raise RuntimeError(
                    "The `DockerSettings` of the pipeline or one of its "
                    "steps specify that code should be downloaded from a "
                    "code repository, but "
                    "the code repository active at your current source root "
                    f"`{source_utils.get_source_root()}` has unpushed "
                    "changes."
                )

    code_repository = None
    if local_repo_context and not local_repo_context.has_local_changes:
        code_repository = local_repo_context.code_repository

    return code_repository
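
A hypothetical end-to-end wiring of the helpers in this module, assuming snapshot and local_repo_context are in scope:

from zenml.pipelines import build_utils

# Validate the local repository and resolve a usable code repository.
code_repository = build_utils.verify_local_repository_context(
    snapshot=snapshot, local_repo_context=local_repo_context
)

# Reuse a matching build if allowed, otherwise create a new one.
build = build_utils.reuse_or_create_pipeline_build(
    snapshot=snapshot,
    allow_build_reuse=True,
    code_repository=code_repository,
)
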
pipeline_context

Pipeline context class.

pipeline_decorator

ZenML pipeline decorator definition.

Classes
Functions
pipeline(_func: Optional[F] = None, *, name: Optional[str] = None, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_step_logs: Optional[bool] = None, environment: Optional[Dict[str, Any]] = None, secrets: Optional[List[Union[UUID, str]]] = None, enable_pipeline_logs: Optional[bool] = None, settings: Optional[Dict[str, SettingsOrDict]] = None, tags: Optional[List[Union[str, Tag]]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, on_init: Optional[InitHookSpecification] = None, on_init_kwargs: Optional[Dict[str, Any]] = None, on_cleanup: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, execution_mode: Optional[ExecutionMode] = None, cache_policy: Optional[CachePolicyOrString] = None) -> Union[Pipeline, Callable[[F], Pipeline]]
pipeline(_func: F) -> Pipeline
pipeline(
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[Union[str, Tag]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    on_init: Optional[InitHookSpecification] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional[ExecutionMode] = None,
    cache_policy: Optional[CachePolicyOrString] = None,
) -> Callable[[F], Pipeline]

Decorator to create a pipeline.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
environment Optional[Dict[str, Any]]

Environment variables to set when running this pipeline.

None
secrets Optional[List[Union[UUID, str]]]

Secrets to set as environment variables when running this pipeline.

None
enable_pipeline_logs Optional[bool]

If pipeline logs should be enabled for this pipeline.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
tags Optional[List[Union[str, Tag]]]

Tags to apply to runs of the pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
on_init Optional[InitHookSpecification]

Callback function to run on initialization of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). If the function returns a value, it will be stored as the pipeline state.

None
on_init_kwargs Optional[Dict[str, Any]]

Arguments for the init hook.

None
on_cleanup Optional[HookSpecification]

Callback function to run on cleanup of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Retry configuration for the pipeline steps.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name templates.

None
execution_mode Optional[ExecutionMode]

The execution mode to use for the pipeline.

None
cache_policy Optional[CachePolicyOrString]

Cache policy for this pipeline.

None

Returns:

Type Description
Union[Pipeline, Callable[[F], Pipeline]]

A pipeline instance.

Source code in src/zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    tags: Optional[List[Union[str, "Tag"]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    on_init: Optional["InitHookSpecification"] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional["ExecutionMode"] = None,
    cache_policy: Optional["CachePolicyOrString"] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Decorator to create a pipeline.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        environment: Environment variables to set when running this pipeline.
        secrets: Secrets to set as environment variables when running this
            pipeline.
        enable_pipeline_logs: If pipeline logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of the pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        on_init: Callback function to run on initialization of the pipeline. Can
            be a function with no arguments, or a source path to such a function
            (e.g. `module.my_function`). If the function returns a value, it
            will be stored as the pipeline state.
        on_init_kwargs: Arguments for the init hook.
        on_cleanup: Callback function to run on cleanup of the pipeline. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Retry configuration for the pipeline steps.
        substitutions: Extra placeholders to use in the name templates.
        execution_mode: The execution mode to use for the pipeline.
        cache_policy: Cache policy for this pipeline.

    Returns:
        A pipeline instance.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        from zenml.pipelines.pipeline_definition import Pipeline

        p = Pipeline(
            name=name or func.__name__,
            entrypoint=func,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            environment=environment,
            secrets=secrets,
            enable_pipeline_logs=enable_pipeline_logs,
            settings=settings,
            tags=tags,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            on_init=on_init,
            on_init_kwargs=on_init_kwargs,
            on_cleanup=on_cleanup,
            model=model,
            retry=retry,
            substitutions=substitutions,
            execution_mode=execution_mode,
            cache_policy=cache_policy,
        )

        p.__doc__ = func.__doc__
        return p

    return inner_decorator if _func is None else inner_decorator(_func)
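A minimal sketch of both decorator forms (step and pipeline names are illustrative):

from zenml import pipeline, step

@step
def trainer() -> int:
    return 42

@pipeline  # bare form: the function name becomes the pipeline name
def training_pipeline():
    trainer()

@pipeline(name="nightly_training", enable_cache=False)  # parameterized form
def nightly_pipeline():
    trainer()

# Calling the decorated function runs the pipeline on the active stack.
training_pipeline()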

pipeline_definition

Definition of a ZenML pipeline.

Classes
Pipeline(name: str, entrypoint: F, enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, environment: Optional[Dict[str, Any]] = None, secrets: Optional[List[Union[UUID, str]]] = None, enable_pipeline_logs: Optional[bool] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, tags: Optional[List[Union[str, Tag]]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, on_init: Optional[InitHookSpecification] = None, on_init_kwargs: Optional[Dict[str, Any]] = None, on_cleanup: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, substitutions: Optional[Dict[str, str]] = None, execution_mode: Optional[ExecutionMode] = None, cache_policy: Optional[CachePolicyOrString] = None)

ZenML pipeline class.

Initializes a pipeline.

Parameters:

Name Type Description Default
name str

The name of the pipeline.

required
entrypoint F

The entrypoint function of the pipeline.

required
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
environment Optional[Dict[str, Any]]

Environment variables to set when running this pipeline.

None
secrets Optional[List[Union[UUID, str]]]

Secrets to set as environment variables when running this pipeline.

None
enable_pipeline_logs Optional[bool]

If pipeline logs should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this pipeline.

None
tags Optional[List[Union[str, Tag]]]

Tags to apply to runs of this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
on_init Optional[InitHookSpecification]

Callback function to run on initialization of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). If the function returns a value, it will be stored as the pipeline state.

None
on_init_kwargs Optional[Dict[str, Any]]

Arguments for the init hook.

None
on_cleanup Optional[HookSpecification]

Callback function to run on cleanup of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Retry configuration for the pipeline steps.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name templates.

None
execution_mode Optional[ExecutionMode]

The execution mode of the pipeline.

None
cache_policy Optional[CachePolicyOrString]

Cache policy for this pipeline.

None
Source code in src/zenml/pipelines/pipeline_definition.py
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[Union[str, "Tag"]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    on_init: Optional["InitHookSpecification"] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional["ExecutionMode"] = None,
    cache_policy: Optional["CachePolicyOrString"] = None,
) -> None:
    """Initializes a pipeline.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        environment: Environment variables to set when running this
            pipeline.
        secrets: Secrets to set as environment variables when running this
            pipeline.
        enable_pipeline_logs: If pipeline logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        on_init: Callback function to run on initialization of the pipeline.
            Can be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`). If the function returns a
            value, it will be stored as the pipeline state.
        on_init_kwargs: Arguments for the init hook.
        on_cleanup: Callback function to run on cleanup of the pipeline. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Retry configuration for the pipeline steps.
        substitutions: Extra placeholders to use in the name templates.
        execution_mode: The execution mode of the pipeline.
        cache_policy: Cache policy for this pipeline.
    """
    self._invocations: Dict[str, StepInvocation] = {}
    self._run_args: Dict[str, Any] = {}

    self._configuration = PipelineConfiguration(
        name=name,
    )
    self._from_config_file: Dict[str, Any] = {}
    with self.__suppress_configure_warnings__():
        self.configure(
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            environment=environment,
            secrets=secrets,
            enable_pipeline_logs=enable_pipeline_logs,
            settings=settings,
            tags=tags,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            on_init=on_init,
            on_init_kwargs=on_init_kwargs,
            on_cleanup=on_cleanup,
            model=model,
            retry=retry,
            substitutions=substitutions,
            execution_mode=execution_mode,
            cache_policy=cache_policy,
        )
    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}
    self._output_artifacts: List[StepArtifact] = []

    self.__suppress_warnings_flag__ = False
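The `@pipeline` decorator constructs this class for you, but a direct, minimal construction sketch (the entrypoint is illustrative) looks like this:

from zenml.pipelines.pipeline_definition import Pipeline

def entrypoint() -> None:
    ...

p = Pipeline(name="manual_pipeline", entrypoint=entrypoint, enable_cache=True)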
Attributes
configuration: PipelineConfiguration property

The configuration of the pipeline.

Returns:

Type Description
PipelineConfiguration

The configuration of the pipeline.

enable_cache: Optional[bool] property

If caching is enabled for the pipeline.

Returns:

Type Description
Optional[bool]

If caching is enabled for the pipeline.

invocations: Dict[str, StepInvocation] property

Returns the step invocations of this pipeline.

This dictionary will only be populated once the pipeline has been called.

Returns:

Type Description
Dict[str, StepInvocation]

The step invocations.

is_prepared: bool property

If the pipeline is prepared.

Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.

Returns:

Type Description
bool

If the pipeline is prepared.

missing_parameters: List[str] property

List of missing parameters for the pipeline entrypoint.

Returns:

Type Description
List[str]

List of missing parameters for the pipeline entrypoint.

model: PipelineResponse property

Gets the registered pipeline model for this instance.

Returns:

Type Description
PipelineResponse

The registered pipeline model.

Raises:

Type Description
RuntimeError

If the pipeline has not been registered yet.

name: str property

The name of the pipeline.

Returns:

Type Description
str

The name of the pipeline.

required_parameters: List[str] property

List of required parameters for the pipeline entrypoint.

Returns:

Type Description
List[str]

List of required parameters for the pipeline entrypoint.

source_code: str property

The source code of this pipeline.

Returns:

Type Description
str

The source code of this pipeline.

source_object: Any property

The source object of this pipeline.

Returns:

Type Description
Any

The source object of this pipeline.

Functions
add_step_invocation(step: BaseStep, input_artifacts: Dict[str, StepArtifact], external_artifacts: Dict[str, Union[ExternalArtifact, ArtifactVersionResponse]], model_artifacts_or_metadata: Dict[str, ModelVersionDataLazyLoader], client_lazy_loaders: Dict[str, ClientLazyLoader], parameters: Dict[str, Any], default_parameters: Dict[str, Any], upstream_steps: Set[str], custom_id: Optional[str] = None, allow_id_suffix: bool = True) -> str

Adds a step invocation to the pipeline.

Parameters:

Name Type Description Default
step BaseStep

The step for which to add an invocation.

required
input_artifacts Dict[str, StepArtifact]

The input artifacts for the invocation.

required
external_artifacts Dict[str, Union[ExternalArtifact, ArtifactVersionResponse]]

The external artifacts for the invocation.

required
model_artifacts_or_metadata Dict[str, ModelVersionDataLazyLoader]

The model artifacts or metadata for the invocation.

required
client_lazy_loaders Dict[str, ClientLazyLoader]

The client lazy loaders for the invocation.

required
parameters Dict[str, Any]

The parameters for the invocation.

required
default_parameters Dict[str, Any]

The default parameters for the invocation.

required
upstream_steps Set[str]

The upstream steps for the invocation.

required
custom_id Optional[str]

Custom ID to use for the invocation.

None
allow_id_suffix bool

Whether a suffix can be appended to the invocation ID.

True

Raises:

Type Description
RuntimeError

If the method is called on an inactive pipeline.

RuntimeError

If the invocation was called with an artifact from a different pipeline.

Returns:

Type Description
str

The step invocation ID.

Source code in src/zenml/pipelines/pipeline_definition.py
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[
        str, Union["ExternalArtifact", "ArtifactVersionResponse"]
    ],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Adds a step invocation to the pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        model_artifacts_or_metadata: The model artifacts or metadata for
            the invocation.
        client_lazy_loaders: The client lazy loaders for the invocation.
        parameters: The parameters for the invocation.
        default_parameters: The default parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    for artifact in input_artifacts.values():
        if artifact.pipeline is not self:
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {artifact.pipeline.name}."
            )

    invocation_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    invocation = StepInvocation(
        id=invocation_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        model_artifacts_or_metadata=model_artifacts_or_metadata,
        client_lazy_loaders=client_lazy_loaders,
        parameters=parameters,
        default_parameters=default_parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    self._invocations[invocation_id] = invocation
    return invocation_id
build(settings: Optional[Mapping[str, SettingsOrDict]] = None, step_configurations: Optional[Mapping[str, StepConfigurationUpdateOrDict]] = None, config_path: Optional[str] = None) -> Optional[PipelineBuildResponse]

Builds Docker images for the pipeline.

Parameters:

Name Type Description Default
settings Optional[Mapping[str, SettingsOrDict]]

Settings for the pipeline.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None

Returns:

Type Description
Optional[PipelineBuildResponse]

The build output.

Source code in src/zenml/pipelines/pipeline_definition.py
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds Docker images for the pipeline.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()

        compile_args = self._run_args.copy()
        compile_args.pop("prevent_build_reuse", None)
        if config_path:
            compile_args["config_path"] = config_path
        if step_configurations:
            compile_args["step_configurations"] = step_configurations
        if settings:
            compile_args["settings"] = settings

        snapshot, _, _ = self._compile(**compile_args)
        pipeline_id = self._register().id

        local_repo = code_repository_utils.find_active_code_repository()
        code_repository = build_utils.verify_local_repository_context(
            snapshot=snapshot, local_repo_context=local_repo
        )

        return build_utils.create_pipeline_build(
            snapshot=snapshot,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )
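A sketch of building images ahead of time and reusing them for a later run (assumes a pipeline instance named `training_pipeline` on a stack that produces builds):

build = training_pipeline.build()
if build:
    # Reuse the registered build instead of rebuilding at run time.
    training_pipeline.with_options(build=build.id)()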
configure(enable_cache: Optional[bool] = None, enable_artifact_metadata: Optional[bool] = None, enable_artifact_visualization: Optional[bool] = None, enable_step_logs: Optional[bool] = None, environment: Optional[Dict[str, Any]] = None, secrets: Optional[Sequence[Union[UUID, str]]] = None, enable_pipeline_logs: Optional[bool] = None, settings: Optional[Mapping[str, SettingsOrDict]] = None, tags: Optional[List[Union[str, Tag]]] = None, extra: Optional[Dict[str, Any]] = None, on_failure: Optional[HookSpecification] = None, on_success: Optional[HookSpecification] = None, on_init: Optional[InitHookSpecification] = None, on_init_kwargs: Optional[Dict[str, Any]] = None, on_cleanup: Optional[HookSpecification] = None, model: Optional[Model] = None, retry: Optional[StepRetryConfig] = None, parameters: Optional[Dict[str, Any]] = None, substitutions: Optional[Dict[str, str]] = None, execution_mode: Optional[ExecutionMode] = None, cache_policy: Optional[CachePolicyOrString] = None, merge: bool = True) -> Self

Configures the pipeline.

Configuration merging example:

* merge==True:

    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=True)
    pipeline.configuration.extra  # {"key1": 1, "key2": 2}

* merge==False:

    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=False)
    pipeline.configuration.extra  # {"key2": 2}

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
environment Optional[Dict[str, Any]]

Environment variables to set when running this pipeline.

None
secrets Optional[Sequence[Union[UUID, str]]]

Secrets to set as environment variables when running this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this pipeline.

None
enable_pipeline_logs Optional[bool]

If pipeline logs should be enabled for this pipeline.

None
tags Optional[List[Union[str, Tag]]]

Tags to apply to runs of this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
on_init Optional[InitHookSpecification]

Callback function to run on initialization of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). If the function returns a value, it will be stored as the pipeline state.

None
on_init_kwargs Optional[Dict[str, Any]]

Arguments for the init hook.

None
on_cleanup Optional[HookSpecification]

Callback function to run on cleanup of the pipeline. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model version in the Model Control Plane.

None
retry Optional[StepRetryConfig]

Retry configuration for the pipeline steps.

None
parameters Optional[Dict[str, Any]]

Input parameters for the pipeline.

None
substitutions Optional[Dict[str, str]]

Extra placeholders to use in the name templates.

None
execution_mode Optional[ExecutionMode]

The execution mode of the pipeline.

None
cache_policy Optional[CachePolicyOrString]

Cache policy for this pipeline.

None
merge bool

If True, will merge the given dictionary configurations like extra and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True

Returns:

Type Description
Self

The pipeline instance that this method was called on.

Raises:

Type Description
ValueError

If on_init_kwargs is provided but on_init is not and no init hook source is found in the current pipeline configuration.

Source code in src/zenml/pipelines/pipeline_definition.py
def configure(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[Sequence[Union[UUID, str]]] = None,
    enable_pipeline_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[Union[str, "Tag"]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    on_init: Optional["InitHookSpecification"] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    parameters: Optional[Dict[str, Any]] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional["ExecutionMode"] = None,
    cache_policy: Optional["CachePolicyOrString"] = None,
    merge: bool = True,
) -> Self:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        environment: Environment variables to set when running this
            pipeline.
        secrets: Secrets to set as environment variables when running this
            pipeline.
        settings: Settings for this pipeline.
        enable_pipeline_logs: If pipeline logs should be enabled for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        on_init: Callback function to run on initialization of the pipeline.
            Can be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`). If the function returns a
            value, it will be stored as the pipeline state.
        on_init_kwargs: Arguments for the init hook.
        on_cleanup: Callback function to run on cleanup of the pipeline. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Configuration of the model version in the Model Control Plane.
        retry: Retry configuration for the pipeline steps.
        parameters: Input parameters for the pipeline.
        substitutions: Extra placeholders to use in the name templates.
        execution_mode: The execution mode of the pipeline.
        cache_policy: Cache policy for this pipeline.
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.

    Returns:
        The pipeline instance that this method was called on.

    Raises:
        ValueError: If on_init_kwargs is provided but on_init is not and
            no init hook source is found in the current pipeline
            configuration.
    """
    failure_hook_source = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        failure_hook_source, _ = resolve_and_validate_hook(
            on_failure, allow_exception_arg=True
        )

    success_hook_source = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        success_hook_source, _ = resolve_and_validate_hook(on_success)

    init_hook_kwargs = None
    init_hook_source = None
    if on_init or on_init_kwargs:
        if not on_init and self.configuration.init_hook_source:
            # load the init hook source from the existing configuration if
            # not provided; this is needed for partial updates
            on_init = source_utils.load(
                self.configuration.init_hook_source
            )
        if not on_init:
            raise ValueError(
                "on_init is not provided and no init hook source is found "
                "in the existing configuration"
            )

        # string of on_init hook function and JSON-able arguments to be used
        # for this pipeline
        init_hook_source, init_hook_kwargs = resolve_and_validate_hook(
            on_init, on_init_kwargs
        )

    cleanup_hook_source = None
    if on_cleanup:
        # string of on_cleanup hook function to be used for this pipeline
        cleanup_hook_source, _ = resolve_and_validate_hook(on_cleanup)

    if merge and tags and self._configuration.tags:
        # Merge tags explicitly here as the recursive update later only
        # merges dicts
        tags = self._configuration.tags + tags

    if merge and secrets and self._configuration.secrets:
        secrets = self._configuration.secrets + list(secrets)

    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "environment": environment,
            "secrets": secrets,
            "enable_pipeline_logs": enable_pipeline_logs,
            "settings": settings,
            "tags": tags,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "init_hook_source": init_hook_source,
            "init_hook_kwargs": init_hook_kwargs,
            "cleanup_hook_source": cleanup_hook_source,
            "model": model,
            "retry": retry,
            "parameters": parameters,
            "substitutions": substitutions,
            "execution_mode": execution_mode,
            "cache_policy": cache_policy,
        }
    )
    if not self.__suppress_warnings_flag__:
        to_be_reapplied = []
        for param_, value_ in values.items():
            if (
                param_ in PipelineRunConfiguration.model_fields
                and param_ in self._from_config_file
                and value_ != self._from_config_file[param_]
            ):
                to_be_reapplied.append(
                    (param_, self._from_config_file[param_], value_)
                )
        if to_be_reapplied:
            msg = ""
            reapply_during_run_warning = (
                "The value of parameter '{name}' has changed from "
                "'{file_value}' to '{new_value}' set in your configuration "
                "file.\n"
            )
            for name, file_value, new_value in to_be_reapplied:
                msg += reapply_during_run_warning.format(
                    name=name, file_value=file_value, new_value=new_value
                )
            msg += (
                "Configuration file value will be used during pipeline "
                "run, so you change will not be efficient. Consider "
                "updating your configuration file instead."
            )
            logger.warning(msg)

    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
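A sketch of chained `configure` calls, including the explicit tag concatenation handled in the code above (tag values are illustrative):

training_pipeline.configure(tags=["team-a"], enable_cache=False)
training_pipeline.configure(tags=["nightly"], merge=True)
# Unlike dict configurations, tags are concatenated on merge:
assert training_pipeline.configuration.tags == ["team-a", "nightly"]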
copy() -> Pipeline

Copies the pipeline.

Returns:

Type Description
Pipeline

The pipeline copy.

Source code in src/zenml/pipelines/pipeline_definition.py
def copy(self) -> "Pipeline":
    """Copies the pipeline.

    Returns:
        The pipeline copy.
    """
    return copy.deepcopy(self)
create_run_template(name: str, **kwargs: Any) -> RunTemplateResponse

DEPRECATED: Create a run template for the pipeline.

Parameters:

Name Type Description Default
name str

The name of the run template.

required
**kwargs Any

Keyword arguments for the client method to create a run template.

{}

Returns:

Type Description
RunTemplateResponse

The created run template.

Source code in src/zenml/pipelines/pipeline_definition.py
def create_run_template(
    self, name: str, **kwargs: Any
) -> RunTemplateResponse:
    """DEPRECATED: Create a run template for the pipeline.

    Args:
        name: The name of the run template.
        **kwargs: Keyword arguments for the client method to create a run
            template.

    Returns:
        The created run template.
    """
    logger.warning(
        "The `pipeline.create_run_template(...)` method is deprecated and "
        "will be removed in a future version. Please use "
        "`pipeline.create_snapshot(..)` instead."
    )
    self._prepare_if_possible()
    snapshot = self._create_snapshot(
        **self._run_args, skip_schedule_registration=True
    )

    return Client().create_run_template(
        name=name, snapshot_id=snapshot.id, **kwargs
    )
create_snapshot(name: str, description: Optional[str] = None, replace: Optional[bool] = None, tags: Optional[List[str]] = None) -> PipelineSnapshotResponse

Create a snapshot of the pipeline.

Parameters:

Name Type Description Default
name str

The name of the snapshot.

required
description Optional[str]

The description of the snapshot.

None
replace Optional[bool]

Whether to replace the existing snapshot with the same name.

None
tags Optional[List[str]]

The tags to add to the snapshot.

None

Returns:

Type Description
PipelineSnapshotResponse

The created snapshot.

Source code in src/zenml/pipelines/pipeline_definition.py
def create_snapshot(
    self,
    name: str,
    description: Optional[str] = None,
    replace: Optional[bool] = None,
    tags: Optional[List[str]] = None,
) -> PipelineSnapshotResponse:
    """Create a snapshot of the pipeline.

    Args:
        name: The name of the snapshot.
        description: The description of the snapshot.
        replace: Whether to replace the existing snapshot with the same
            name.
        tags: The tags to add to the snapshot.

    Returns:
        The created snapshot.
    """
    self._prepare_if_possible()
    return self._create_snapshot(
        skip_schedule_registration=True,
        name=name,
        description=description,
        replace=replace,
        tags=tags,
        **self._run_args,
    )
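A minimal usage sketch (name, description, and tags are illustrative):

snapshot = training_pipeline.create_snapshot(
    name="nightly",
    description="Snapshot used for scheduled nightly runs",
    tags=["nightly"],
)
print(snapshot.id)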
deploy(deployment_name: str, timeout: Optional[int] = None, *args: Any, **kwargs: Any) -> DeploymentResponse

Deploy the pipeline for online inference.

Parameters:

Name Type Description Default
deployment_name str

The name to use for the deployment.

required
timeout Optional[int]

The maximum time in seconds to wait for the pipeline to be deployed.

None
*args Any

Pipeline entrypoint input arguments.

()
**kwargs Any

Pipeline entrypoint input keyword arguments.

{}

Returns:

Type Description
DeploymentResponse

The deployment response.

Source code in src/zenml/pipelines/pipeline_definition.py
def deploy(
    self,
    deployment_name: str,
    timeout: Optional[int] = None,
    *args: Any,
    **kwargs: Any,
) -> DeploymentResponse:
    """Deploy the pipeline for online inference.

    Args:
        deployment_name: The name to use for the deployment.
        timeout: The maximum time in seconds to wait for the pipeline to be
            deployed.
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.

    Returns:
        The deployment response.
    """
    self.prepare(*args, **kwargs)
    snapshot = self._create_snapshot(**self._run_args)

    stack = Client().active_stack

    stack.prepare_pipeline_submission(snapshot=snapshot)
    return stack.deploy_pipeline(
        snapshot=snapshot,
        deployment_name=deployment_name,
        timeout=timeout,
    )
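A minimal usage sketch (assumes the active stack supports deployments and an `inference_pipeline` instance exists):

deployment = inference_pipeline.deploy(
    deployment_name="fraud-detector",
    timeout=600,
)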
log_pipeline_snapshot_metadata(snapshot: PipelineSnapshotResponse) -> None staticmethod

Displays logs based on the snapshot model upon running a pipeline.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotResponse

The model for the pipeline snapshot.

required
Source code in src/zenml/pipelines/pipeline_definition.py
@staticmethod
def log_pipeline_snapshot_metadata(
    snapshot: PipelineSnapshotResponse,
) -> None:
    """Displays logs based on the snapshot model upon running a pipeline.

    Args:
        snapshot: The model for the pipeline snapshot.
    """
    try:
        # Log about the caching status
        if snapshot.pipeline_configuration.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{snapshot.pipeline_configuration.name}`."
            )

        # Log about the used builds
        if snapshot.build:
            logger.info("Using a build:")
            logger.info(
                " Image(s): "
                f"{', '.join([i.image for i in snapshot.build.images.values()])}"
            )

            # Log about version mismatches between local and build
            from zenml import __version__

            if snapshot.build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{snapshot.build.zenml_version}"
                )

            import platform

            if snapshot.build.python_version != platform.python_version():
                logger.info(
                    f"Python version (different than the local version): "
                    f"{snapshot.build.python_version}"
                )

        # Log about the user, stack and components
        if snapshot.user is not None:
            logger.info(f"Using user: `{snapshot.user.name}`")

        if snapshot.stack is not None:
            logger.info(f"Using stack: `{snapshot.stack.name}`")

            for (
                component_type,
                component_models,
            ) in snapshot.stack.components.items():
                logger.info(
                    f"  {component_type.value}: `{component_models[0].name}`"
                )
    except Exception as e:
        logger.debug(f"Logging pipeline snapshot metadata failed: {e}")
prepare(*args: Any, **kwargs: Any) -> None

Prepares the pipeline.

Parameters:

Name Type Description Default
*args Any

Pipeline entrypoint input arguments.

()
**kwargs Any

Pipeline entrypoint input keyword arguments.

{}

Raises:

Type Description
RuntimeError

If the pipeline has parameters configured differently in configuration file and code.

Source code in src/zenml/pipelines/pipeline_definition.py
    def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepares the pipeline.

        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments.

        Raises:
            RuntimeError: If the pipeline has parameters configured differently in
                configuration file and code.
        """
        self._parameters = {}
        self._invocations = {}
        self._output_artifacts = []

        conflicting_parameters = {}
        parameters_ = (self.configuration.parameters or {}).copy()
        if from_file_ := self._from_config_file.get("parameters", None):
            parameters_ = dict_utils.recursive_update(parameters_, from_file_)
        if parameters_:
            for k, v_runtime in kwargs.items():
                if k in parameters_:
                    v_config = parameters_[k]
                    if v_config != v_runtime:
                        conflicting_parameters[k] = (v_config, v_runtime)
            if conflicting_parameters:
                is_plural = "s" if len(conflicting_parameters) > 1 else ""
                msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
                for key, values in conflicting_parameters.items():
                    msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
                msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
    parameters:
        param_name: value1


# pipeline.py
@pipeline
def pipeline_(param_name: str):
    step_name()

if __name__=="__main__":
    pipeline_.with_options(config_path="config.yaml")(param_name="value2")
```
To avoid this, consider setting pipeline parameters in only one place (config or code).
"""
                raise RuntimeError(msg)
            for k, v_config in parameters_.items():
                if k not in kwargs:
                    kwargs[k] = v_config

        with self:
            # Enter the context manager, so we become the active pipeline. This
            # means that all steps that get called while the entrypoint function
            # is executed will be added as invocation to this pipeline instance.
            self._call_entrypoint(*args, **kwargs)
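A sketch of preparing a pipeline without running it (assumes `training_pipeline` has no required entrypoint parameters), then inspecting it via the properties documented above:

training_pipeline.prepare()
assert training_pipeline.is_prepared
print(list(training_pipeline.invocations))  # step invocation IDs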
register() -> PipelineResponse

Register the pipeline in the server.

Returns:

Type Description
PipelineResponse

The registered pipeline model.

Source code in src/zenml/pipelines/pipeline_definition.py
def register(self) -> "PipelineResponse":
    """Register the pipeline in the server.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    if self.configuration.model_dump(
        exclude_defaults=True, exclude={"name"}
    ):
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    return self._register()
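A minimal usage sketch:

pipeline_model = training_pipeline.register()
print(pipeline_model.name, pipeline_model.id)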
resolve() -> Source

Resolves the pipeline.

Returns:

Type Description
Source

The pipeline source.

Source code in src/zenml/pipelines/pipeline_definition.py
def resolve(self) -> "Source":
    """Resolves the pipeline.

    Returns:
        The pipeline source.
    """
    return source_utils.resolve(self.entrypoint, skip_validation=True)
with_options(run_name: Optional[str] = None, schedule: Optional[Schedule] = None, build: Union[str, UUID, PipelineBuildBase, None] = None, step_configurations: Optional[Mapping[str, StepConfigurationUpdateOrDict]] = None, steps: Optional[Mapping[str, StepConfigurationUpdateOrDict]] = None, config_path: Optional[str] = None, unlisted: bool = False, prevent_build_reuse: bool = False, **kwargs: Any) -> Pipeline

Copies the pipeline and applies the given configurations.

Parameters:

Name Type Description Default
run_name Optional[str]

Name of the pipeline run.

None
schedule Optional[Schedule]

Optional schedule to use for the run.

None
build Union[str, UUID, PipelineBuildBase, None]

Optional build to use for the run.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
steps Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline. This is equivalent to step_configurations, and will be ignored if step_configurations is set as well.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None
unlisted bool

DEPRECATED. This option is no longer supported.

False
prevent_build_reuse bool

DEPRECATED: Use DockerSettings.prevent_build_reuse instead.

False
**kwargs Any

Pipeline configuration options. These will be passed to the pipeline.configure(...) method.

{}

Returns:

Type Description
Pipeline

The copied pipeline instance.

Source code in src/zenml/pipelines/pipeline_definition.py
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Copies the pipeline and applies the given configurations.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        steps: Configurations for steps of the pipeline. This is equivalent
            to `step_configurations`, and will be ignored if
            `step_configurations` is set as well.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: DEPRECATED. This option is no longer supported.
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    if steps and step_configurations:
        logger.warning(
            "Step configurations were passed using both the "
            "`step_configurations` and `steps` keywords, ignoring the "
            "values passed using the `steps` keyword."
        )

    if unlisted:
        logger.warning(
            "The `unlisted` option is deprecated and will be removed in a "
            "future version. Every run will always be associated with a "
            "pipeline."
        )

    pipeline_copy = self.copy()

    pipeline_copy._reconfigure_from_file_with_overrides(
        config_path=config_path, **kwargs
    )

    run_args = dict_utils.remove_none_values(
        {
            "run_name": run_name,
            "schedule": schedule,
            "build": build,
            "step_configurations": step_configurations or steps,
            "config_path": config_path,
            "prevent_build_reuse": prevent_build_reuse,
        }
    )
    pipeline_copy._run_args.update(run_args)
    return pipeline_copy
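A sketch combining a run name template, a config file, and a schedule (`config.yaml` is hypothetical; `Schedule` is re-exported by `zenml.pipelines`):

from zenml.pipelines import Schedule

configured = training_pipeline.with_options(
    run_name="nightly_run_{date}_{time}",
    config_path="config.yaml",
    schedule=Schedule(cron_expression="0 2 * * *"),
)
configured()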
write_run_configuration_template(path: str, stack: Optional[Stack] = None) -> None

Writes a run configuration yaml template.

Parameters:

Name Type Description Default
path str

The path where the template will be written.

required
stack Optional[Stack]

The stack for which the template should be generated. If not given, the active stack will be used.

None
Source code in src/zenml/pipelines/pipeline_definition.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    stack = stack or Client().active_stack

    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = fields

    steps = {}
    for step_name, invocation in self.invocations.items():
        step = invocation.step
        outputs = {
            name: PartialArtifactConfiguration()
            for name in step.entrypoint_definition.outputs
        }
        step_template = StepConfigurationUpdate(
            parameters={},
            settings=step_settings,
            outputs=outputs,
        )
        steps[step_name] = step_template

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    yaml_string = yaml.dump(template)
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)

    with open(path, "w") as f:
        f.write(yaml_string)
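A hedged usage sketch (assumes a defined pipeline and an initialized client with an active stack; the output path is illustrative):

from zenml import pipeline

@pipeline
def my_pipeline() -> None:
    ...

# Writes a fully commented-out YAML template covering the settings
# available on the active stack; uncomment only what you override.
my_pipeline.write_run_configuration_template(path="run_template.yaml")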
Functions
Modules

run_utils

Utility functions for running pipelines.

Classes
Functions
create_placeholder_run(snapshot: PipelineSnapshotResponse, orchestrator_run_id: Optional[str] = None, logs: Optional[LogsRequest] = None, trigger_info: Optional[PipelineRunTriggerInfo] = None) -> PipelineRunResponse

Create a placeholder run for the snapshot.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotResponse

The snapshot for which to create the placeholder run.

required
orchestrator_run_id Optional[str]

The orchestrator run ID for the run.

None
logs Optional[LogsRequest]

The logs for the run.

None
trigger_info Optional[PipelineRunTriggerInfo]

The trigger information for the run.

None

Returns:

Type Description
PipelineRunResponse

The placeholder run.

Source code in src/zenml/pipelines/run_utils.py
def create_placeholder_run(
    snapshot: "PipelineSnapshotResponse",
    orchestrator_run_id: Optional[str] = None,
    logs: Optional["LogsRequest"] = None,
    trigger_info: Optional[PipelineRunTriggerInfo] = None,
) -> "PipelineRunResponse":
    """Create a placeholder run for the snapshot.

    Args:
        snapshot: The snapshot for which to create the placeholder run.
        orchestrator_run_id: The orchestrator run ID for the run.
        logs: The logs for the run.
        trigger_info: The trigger information for the run.

    Returns:
        The placeholder run.
    """
    start_time = utc_now()
    run_request = PipelineRunRequest(
        name=string_utils.format_name_template(
            name_template=snapshot.run_name_template,
            substitutions=snapshot.pipeline_configuration.finalize_substitutions(
                start_time=start_time,
            ),
        ),
        # We set the start time on the placeholder run already to
        # make it consistent with the {time} placeholder in the
        # run name. This means the placeholder run will usually
        # have longer durations than scheduled runs, as for them
        # the start_time is only set once the first step starts
        # running.
        start_time=start_time,
        orchestrator_run_id=orchestrator_run_id,
        project=snapshot.project_id,
        snapshot=snapshot.id,
        pipeline=snapshot.pipeline.id if snapshot.pipeline else None,
        status=ExecutionStatus.INITIALIZING,
        tags=snapshot.pipeline_configuration.tags,
        logs=logs,
        trigger_info=trigger_info,
    )
    run, _ = Client().zen_store.get_or_create_run(run_request)
    return run
get_all_sources_from_value(value: Any) -> List[Source]

Get all source objects from a value.

Parameters:

Name Type Description Default
value Any

The value from which to get all the source objects.

required

Returns:

Type Description
List[Source]

List of source objects for the given value.

Source code in src/zenml/pipelines/run_utils.py
def get_all_sources_from_value(value: Any) -> List[Source]:
    """Get all source objects from a value.

    Args:
        value: The value from which to get all the source objects.

    Returns:
        List of source objects for the given value.
    """
    sources = []
    if isinstance(value, Source):
        sources.append(value)
    elif isinstance(value, BaseModel):
        for v in value.__dict__.values():
            sources.extend(get_all_sources_from_value(v))
    elif isinstance(value, Dict):
        for v in value.values():
            sources.extend(get_all_sources_from_value(v))
    elif isinstance(value, (List, Set, tuple)):
        for v in value:
            sources.extend(get_all_sources_from_value(v))

    return sources
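A short illustration of the traversal (assuming `Source` and `SourceType` from `zenml.config.source`; the nested structure is arbitrary):

from zenml.config.source import Source, SourceType
from zenml.pipelines.run_utils import get_all_sources_from_value

src = Source(module="my_module", attribute="MyClass", type=SourceType.USER)

# The function recurses through pydantic models, dicts, lists, sets
# and tuples, collecting every Source instance it finds.
nested = {"config": [{"materializer": src}], "unrelated": 42}
assert get_all_sources_from_value(nested) == [src]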
get_default_run_name(pipeline_name: str) -> str

Gets the default name for a pipeline run.

Parameters:

Name Type Description Default
pipeline_name str

Name of the pipeline which will be run.

required

Returns:

Type Description
str

Run name.

Source code in src/zenml/pipelines/run_utils.py
def get_default_run_name(pipeline_name: str) -> str:
    """Gets the default name for a pipeline run.

    Args:
        pipeline_name: Name of the pipeline which will be run.

    Returns:
        Run name.
    """
    return f"{pipeline_name}-{{date}}-{{time}}"
submit_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, placeholder_run: Optional[PipelineRunResponse] = None) -> None

Submit a snapshot for execution.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotResponse

The snapshot to submit.

required
stack Stack

The stack on which to submit the snapshot.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the snapshot.

None
Raises:

Type Description
BaseException

Any exception raised while submitting the pipeline or, if execution happens synchronously, while running it.

Source code in src/zenml/pipelines/run_utils.py
def submit_pipeline(
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Submit a snapshot for execution.

    Args:
        snapshot: The snapshot to submit.
        stack: The stack on which to submit the snapshot.
        placeholder_run: An optional placeholder run for the snapshot.

    # noqa: DAR401
    Raises:
        BaseException: Any exception that happened while submitting or running
            (in case it happens synchronously) the pipeline.
    """
    # Prevent execution of nested pipelines which might lead to
    # unexpected behavior
    previous_value = constants.SHOULD_PREVENT_PIPELINE_EXECUTION
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
    try:
        stack.prepare_pipeline_submission(snapshot=snapshot)
        stack.submit_pipeline(
            snapshot=snapshot,
            placeholder_run=placeholder_run,
        )
    except RunMonitoringError as e:
        # Don't mark the run as failed if the error happened during monitoring
        # of the run.
        raise e.original_exception from None
    except BaseException as e:
        if (
            placeholder_run
            and not Client()
            .get_pipeline_run(placeholder_run.id, hydrate=False)
            .status.is_finished
        ):
            # We failed during/before the submission of the run, so we mark the
            # run as failed if it is still in an initializing/running state.
            publish_failed_pipeline_run(placeholder_run.id)

        raise e
    finally:
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = previous_value
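Note how the module-level flag is handled: the previous value of `constants.SHOULD_PREVENT_PIPELINE_EXECUTION` is captured and restored in `finally` rather than reset to `False`, so nested calls unwind correctly. The same pattern in isolation (a minimal sketch with illustrative names):

class _Flags:
    prevent_execution = False

flags = _Flags()

def with_execution_blocked(action):
    # Capture the current value so nested invocations restore the
    # outer caller's state instead of clobbering it.
    previous = flags.prevent_execution
    flags.prevent_execution = True
    try:
        return action()
    finally:
        flags.prevent_execution = previous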
upload_notebook_cell_code_if_necessary(snapshot: PipelineSnapshotBase, stack: Stack) -> None

Upload notebook cell code if necessary.

This function checks if any of the steps of the pipeline that will be executed in a different process are defined in a notebook. If so, it extracts that notebook cell code into Python files and uploads an archive of all the necessary files to the artifact store.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The snapshot.

required
stack Stack

The stack on which the snapshot will be executed.

required

Raises:

Type Description
RuntimeError

If the code for one of the steps that will run out of process cannot be extracted into a Python file.

Source code in src/zenml/pipelines/run_utils.py
def upload_notebook_cell_code_if_necessary(
    snapshot: "PipelineSnapshotBase", stack: "Stack"
) -> None:
    """Upload notebook cell code if necessary.

    This function checks if any of the steps of the pipeline that will be
    executed in a different process are defined in a notebook. If that is the
    case, it will extract that notebook cell code into python files and upload
    an archive of all the necessary files to the artifact store.

    Args:
        snapshot: The snapshot.
    stack: The stack on which the snapshot will be executed.

    Raises:
        RuntimeError: If the code for one of the steps that will run out of
            process cannot be extracted into a python file.
    """
    should_upload = False
    resolved_notebook_sources = source_utils.get_resolved_notebook_sources()

    for step in snapshot.step_configurations.values():
        source = step.spec.source

        if source.type == SourceType.NOTEBOOK:
            if (
                stack.orchestrator.flavor != "local"
                or step.config.step_operator
            ):
                should_upload = True
                cell_code = resolved_notebook_sources.get(
                    source.import_path, None
                )

                # Code does not run in-process, which means we need to
                # extract the step code into a python file
                if not cell_code:
                    raise RuntimeError(
                        f"Unable to run step {step.config.name}. This step is "
                        "defined in a notebook and you're trying to run it "
                        "in a remote environment, but ZenML was not able to "
                        "detect the step code in the notebook. To fix "
                        "this error, define your step in a python file instead "
                        "of a notebook."
                    )

    if should_upload:
        logger.info("Uploading notebook code...")

        for _, cell_code in resolved_notebook_sources.items():
            notebook_utils.warn_about_notebook_cell_magic_commands(
                cell_code=cell_code
            )
            module_name = notebook_utils.compute_cell_replacement_module_name(
                cell_code=cell_code
            )
            file_name = f"{module_name}.py"

            code_utils.upload_notebook_code(
                artifact_store=stack.artifact_store,
                cell_code=cell_code,
                file_name=file_name,
            )

        all_snapshot_sources = get_all_sources_from_value(snapshot)

        for source in all_snapshot_sources:
            if source.type == SourceType.NOTEBOOK:
                setattr(source, "artifact_store_id", stack.artifact_store.id)

        logger.info("Upload finished.")
validate_run_config_is_runnable_from_server(run_configuration: PipelineRunConfiguration) -> None

Validates that the run configuration can be used to run from the server.

Parameters:

Name Type Description Default
run_configuration PipelineRunConfiguration

The run configuration to validate.

required

Raises:

Type Description
ValueError

If there are values in the run configuration that are not allowed when running a pipeline from the server.

Source code in src/zenml/pipelines/run_utils.py
def validate_run_config_is_runnable_from_server(
    run_configuration: "PipelineRunConfiguration",
) -> None:
    """Validates that the run configuration can be used to run from the server.

    Args:
        run_configuration: The run configuration to validate.

    Raises:
        ValueError: If there are values in the run configuration that are not
            allowed when running a pipeline from the server.
    """
    if run_configuration.parameters:
        raise ValueError(
            "Can't set pipeline parameters when running pipeline via Rest API. "
            "This likely requires refactoring your pipeline code to use step parameters "
            "instead of pipeline parameters. For example, instead of: "
            "```yaml "
            "parameters: "
            "  param1: 1 "
            "  param2: 2 "
            "``` "
            "You'll need to modify your pipeline code to pass parameters directly to steps: "
            "```yaml "
            "steps: "
            "  step1: "
            "    parameters: "
            "      param1: 1 "
            "      param2: 2 "
            "``` "
        )

    if run_configuration.build:
        raise ValueError("Can't set build when running pipeline via Rest API.")

    if run_configuration.schedule:
        raise ValueError(
            "Can't set schedule when running pipeline via Rest API."
        )

    if run_configuration.settings and run_configuration.settings.get("docker"):
        raise ValueError(
            "Can't set DockerSettings when running pipeline via Rest API."
        )

    if run_configuration.steps:
        for step_update in run_configuration.steps.values():
            if step_update.settings and step_update.settings.get("docker"):
                raise ValueError(
                    "Can't set DockerSettings when running pipeline via "
                    "Rest API."
                )
validate_stack_is_runnable_from_server(zen_store: BaseZenStore, stack: StackResponse) -> None

Validate if a stack model is runnable from the server.

Parameters:

Name Type Description Default
zen_store BaseZenStore

ZenStore to use for listing flavors.

required
stack StackResponse

The stack to validate.

required

Raises:

Type Description
ValueError

If the stack has components of a custom flavor or local components.

Source code in src/zenml/pipelines/run_utils.py
def validate_stack_is_runnable_from_server(
    zen_store: BaseZenStore, stack: StackResponse
) -> None:
    """Validate if a stack model is runnable from the server.

    Args:
        zen_store: ZenStore to use for listing flavors.
        stack: The stack to validate.

    Raises:
        ValueError: If the stack has components of a custom flavor or local
            components.
    """
    for component_list in stack.components.values():
        assert len(component_list) == 1
        component = component_list[0]
        flavors = zen_store.list_flavors(
            FlavorFilter(name=component.flavor_name, type=component.type)
        )
        assert len(flavors) == 1
        flavor_model = flavors[0]

        if flavor_model.is_custom:
            raise ValueError(
                "Unable to run pipeline from the server on a stack that "
                "includes stack components with a custom flavor."
            )

        flavor = Flavor.from_model(flavor_model)
        component_config = flavor.config_class(**component.configuration)

        if component_config.is_local:
            raise ValueError(
                "Unable to run pipeline from the server on a stack that "
                "includes local stack components."
            )
wait_for_pipeline_run_to_finish(run_id: UUID) -> PipelineRunResponse

Waits until a pipeline run is finished.

Parameters:

Name Type Description Default
run_id UUID

ID of the run for which to wait.

required

Returns:

Type Description
PipelineRunResponse

Model of the finished run.

Source code in src/zenml/pipelines/run_utils.py
def wait_for_pipeline_run_to_finish(run_id: UUID) -> "PipelineRunResponse":
    """Waits until a pipeline run is finished.

    Args:
        run_id: ID of the run for which to wait.

    Returns:
        Model of the finished run.
    """
    sleep_interval = 1
    max_sleep_interval = 64

    while True:
        run = Client().get_pipeline_run(run_id)

        if run.status.is_finished:
            return run

        logger.info(
            "Waiting for pipeline run with ID %s to finish (current status: %s)",
            run_id,
            run.status,
        )
        time.sleep(sleep_interval)
        if sleep_interval < max_sleep_interval:
            sleep_interval *= 2
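A hedged usage sketch (assumes a run identifier from an earlier submission): the call blocks until the run reaches a terminal state, polling with exponential backoff from 1 second up to a 64-second cap between checks.

from zenml.client import Client
from zenml.pipelines.run_utils import wait_for_pipeline_run_to_finish

run = Client().get_pipeline_run("<run-name-or-id>")
finished = wait_for_pipeline_run_to_finish(run.id)
print(finished.status)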
Modules