Orchestrators

zenml.orchestrators

Initialization for ZenML orchestrators.

An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of an orchestrator as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.

Attributes

__all__ = ['BaseOrchestrator', 'BaseOrchestratorConfig', 'BaseOrchestratorFlavor', 'ContainerizedOrchestrator', 'WheeledOrchestrator', 'LocalOrchestrator', 'LocalOrchestratorFlavor', 'LocalDockerOrchestrator', 'LocalDockerOrchestratorFlavor'] module-attribute

Classes

BaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: StackComponent, ABC

Base class for all orchestrators.

In order to implement an orchestrator you will need to subclass from this class.

How it works:

The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)).

This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
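
To implement your own orchestrator, you therefore subclass BaseOrchestrator and provide these two pieces. Below is a minimal sketch of the "simple case" (the class name and the environment variable are illustrative, not part of the ZenML API):

import os
from typing import Any, Dict, Optional
from uuid import uuid4

from zenml.orchestrators import BaseOrchestrator


class MySimpleOrchestrator(BaseOrchestrator):
    """Illustrative orchestrator that executes all steps in-process."""

    def get_orchestrator_run_id(self) -> str:
        # Must be unique per pipeline run and identical for all of its steps.
        return os.environ["MY_ORCHESTRATOR_RUN_ID"]  # hypothetical variable

    def prepare_or_run_pipeline(
        self,
        deployment: Any,
        stack: Any,
        environment: Dict[str, str],
        placeholder_run: Optional[Any] = None,
    ) -> None:
        # "Simple case": set up the environment, then execute every step
        # directly in this process via self.run_step(...).
        os.environ["MY_ORCHESTRATOR_RUN_ID"] = str(uuid4())
        os.environ.update(environment)
        for step in deployment.step_configurations.values():
            self.run_step(step=step)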

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: BaseOrchestratorConfig property

Returns the BaseOrchestratorConfig config.

Returns:
- BaseOrchestratorConfig: The configuration.

Functions
fetch_status(run: PipelineRunResponse) -> ExecutionStatus

Refreshes the status of a specific pipeline run.

Parameters:
- run (PipelineRunResponse): A pipeline run response to fetch its status. Required.

Raises:
- NotImplementedError: If any orchestrator inheriting from the base class does not implement this logic.

Source code in src/zenml/orchestrators/base_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: A pipeline run response to fetch its status.

    Raises:
        NotImplementedError: If any orchestrator inheriting from the base
            class does not implement this logic.
    """
    raise NotImplementedError(
        "The fetch status functionality is not implemented for the "
        f"'{self.__class__.__name__}' orchestrator."
    )
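
Orchestrators that track runs in an external backend can override this to map the backend's state onto ZenML's ExecutionStatus. A hedged sketch with a hypothetical backend client:

from zenml.enums import ExecutionStatus


class MyOrchestrator(BaseOrchestrator):  # continuing the sketch above
    def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
        # `backend_client` and `get_run_state` are hypothetical stand-ins
        # for whatever API your execution backend exposes.
        state = self.backend_client.get_run_state(run.id)
        return (
            ExecutionStatus.COMPLETED
            if state == "done"
            else ExecutionStatus.RUNNING
        )
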
get_orchestrator_run_id() -> str abstractmethod

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:
- str: The orchestrator run id.

Source code in src/zenml/orchestrators/base_orchestrator.py
@abstractmethod
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run.

    Returns:
        The orchestrator run id.
    """
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[Iterator[Dict[str, MetadataType]]] abstractmethod

The method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The steps are run directly within the same environment in which the orchestrator code is executed. In this case you will need to handle implementation-specific runtime configuration (such as the schedule), iterate through the steps, and finally call self.run_step(...) to execute each step.

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.

Regardless of the implementation details, the orchestrator will need to run each step in the target environment. For this the self.run_step(...) method should be used.

The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).
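
For example, an orchestrator that launches one container or job per step might assemble the command like this (a sketch based on the StepEntrypointConfiguration API also used by the local Docker orchestrator below; `deployment` is the deployment passed to this method):

from zenml.entrypoints.step_entrypoint_configuration import (
    StepEntrypointConfiguration,
)

# The entrypoint command is the same for every step ...
command = StepEntrypointConfiguration.get_entrypoint_command()

# ... while the arguments select which step of which deployment to run.
for step_name in deployment.step_configurations:
    arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
        step_name=step_name, deployment_id=deployment.id
    )
    # Hand `command + arguments` to the target environment, e.g. as a
    # container entrypoint (how exactly depends on the orchestrator).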

Parameters:
- deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. Required.
- stack (Stack): The stack the pipeline will run on. Required.
- environment (Dict[str, str]): Environment variables to set in the orchestration environment. These don't need to be set if running locally. Required.
- placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. Default: None.

Yields:
- Optional[Iterator[Dict[str, MetadataType]]]: Metadata for the pipeline run.

Source code in src/zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[Iterator[Dict[str, MetadataType]]]:
    """The method needs to be implemented by the respective orchestrator.

    Depending on the type of orchestrator you'll have to perform slightly
    different operations.

    Simple Case:
    ------------
    The Steps are run directly from within the same environment in which
    the orchestrator code is executed. In this case you will need to
    deal with implementation-specific runtime configurations (like the
    schedule) and then iterate through the steps and finally call
    `self.run_step(...)` to execute each step.

    Advanced Case:
    --------------
    Most orchestrators will not run the steps directly. Instead, they
    build some intermediate representation of the pipeline that is then
    used to create and run the pipeline and its steps on the target
    environment. For such orchestrators this method will have to build
    this representation and deploy it.

    Regardless of the implementation details, the orchestrator will need
    to run each step in the target environment. For this the
    `self.run_step(...)` method should be used.

    The easiest way to make this work is by using an entrypoint
    configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
    or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. These don't need to be set if running locally.
        placeholder_run: An optional placeholder run for the deployment.

    Yields:
        Metadata for the pipeline run.
    """
requires_resources_in_orchestration_environment(step: Step) -> bool staticmethod

Checks if the orchestrator should run this step on special resources.

Parameters:
- step (Step): The step that will be checked. Required.

Returns:
- bool: True if the step requires special resources in the orchestration environment, False otherwise.

Source code in src/zenml/orchestrators/base_orchestrator.py
@staticmethod
def requires_resources_in_orchestration_environment(
    step: "Step",
) -> bool:
    """Checks if the orchestrator should run this step on special resources.

    Args:
        step: The step that will be checked.

    Returns:
        True if the step requires special resources in the orchestration
        environment, False otherwise.
    """
    # If the step requires custom resources and doesn't run with a step
    # operator, it would need these requirements in the orchestrator
    # environment
    if step.config.step_operator:
        return False

    return not step.config.resource_settings.empty
run(deployment: PipelineDeploymentResponse, stack: Stack, placeholder_run: Optional[PipelineRunResponse] = None) -> None

Runs a pipeline on a stack.

Parameters:
- deployment (PipelineDeploymentResponse): The pipeline deployment. Required.
- stack (Stack): The stack on which to run the pipeline. Required.
- placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. This will be deleted if the pipeline deployment fails. Default: None.
Source code in src/zenml/orchestrators/base_orchestrator.py
def run(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Runs a pipeline on a stack.

    Args:
        deployment: The pipeline deployment.
        stack: The stack on which to run the pipeline.
        placeholder_run: An optional placeholder run for the deployment.
            This will be deleted in case the pipeline deployment failed.
    """
    self._prepare_run(deployment=deployment)

    pipeline_run_id: Optional[UUID] = None
    schedule_id: Optional[UUID] = None
    if deployment.schedule:
        schedule_id = deployment.schedule.id
    if placeholder_run:
        pipeline_run_id = placeholder_run.id

    environment = get_config_environment_vars(
        schedule_id=schedule_id,
        pipeline_run_id=pipeline_run_id,
    )

    prevent_client_side_caching = handle_bool_env_var(
        ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING, default=False
    )

    if (
        placeholder_run
        and self.config.supports_client_side_caching
        and not deployment.schedule
        and not prevent_client_side_caching
    ):
        from zenml.orchestrators import step_run_utils

        cached_invocations = step_run_utils.create_cached_step_runs(
            deployment=deployment,
            pipeline_run=placeholder_run,
            stack=stack,
        )

        for invocation_id in cached_invocations:
            # Remove the cached step invocations from the deployment so
            # the orchestrator does not try to run them
            deployment.step_configurations.pop(invocation_id)

        for step in deployment.step_configurations.values():
            for invocation_id in cached_invocations:
                if invocation_id in step.spec.upstream_steps:
                    step.spec.upstream_steps.remove(invocation_id)

        if len(deployment.step_configurations) == 0:
            # All steps were cached, we update the pipeline run status and
            # don't actually use the orchestrator to run the pipeline
            self._cleanup_run()
            logger.info("All steps of the pipeline run were cached.")
            return
    else:
        logger.debug("Skipping client-side caching.")

    try:
        if metadata_iterator := self.prepare_or_run_pipeline(
            deployment=deployment,
            stack=stack,
            environment=environment,
            placeholder_run=placeholder_run,
        ):
            for metadata_dict in metadata_iterator:
                try:
                    if placeholder_run:
                        publish_pipeline_run_metadata(
                            pipeline_run_id=placeholder_run.id,
                            pipeline_run_metadata={self.id: metadata_dict},
                        )
                except Exception as e:
                    logger.debug(
                        "Something went went wrong trying to publish the"
                        f"run metadata: {e}"
                    )
    finally:
        self._cleanup_run()
run_step(step: Step) -> None

Runs the given step.

Parameters:
- step (Step): The step to run. Required.
Source code in src/zenml/orchestrators/base_orchestrator.py
def run_step(self, step: "Step") -> None:
    """Runs the given step.

    Args:
        step: The step to run.
    """
    assert self._active_deployment
    launcher = StepLauncher(
        deployment=self._active_deployment,
        step=step,
        orchestrator_run_id=self.get_orchestrator_run_id(),
    )
    launcher.launch()

BaseOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Base orchestrator config.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
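
In practice this means any string attribute without custom validators can be supplied either as plain text or as a secret reference of the form {{secret_name.key}}. A sketch (the config class and the secret name are hypothetical):

from zenml.orchestrators import BaseOrchestratorConfig


class MyOrchestratorConfig(BaseOrchestratorConfig):
    """Hypothetical config with a sensitive attribute."""

    api_token: str = ""


# Plain text works but may trigger the warning above for secret fields;
# a secret reference defers resolution to the configured secrets store.
config = MyOrchestratorConfig(api_token="{{my_secret.token}}")
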
Attributes
is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:
- bool: Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:
- bool: Whether the orchestrator runs synchronously or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client-side caching.

Returns:
- bool: Whether the orchestrator supports client-side caching.

BaseOrchestratorFlavor

Bases: Flavor

Base orchestrator flavor class.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:
- Type[BaseOrchestratorConfig]: The config class.

implementation_class: Type[BaseOrchestrator] abstractmethod property

Implementation class for this flavor.

Returns:
- Type[BaseOrchestrator]: The implementation class.

type: StackComponentType property

Returns the flavor type.

Returns:
- StackComponentType: The flavor type.
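
Putting the pieces together, a new flavor wires a config class and an implementation class into a BaseOrchestratorFlavor subclass. A sketch reusing the hypothetical classes from above:

from typing import Type

from zenml.orchestrators import (
    BaseOrchestrator,
    BaseOrchestratorConfig,
    BaseOrchestratorFlavor,
)


class MyOrchestratorFlavor(BaseOrchestratorFlavor):
    @property
    def name(self) -> str:
        return "my_orchestrator"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        return MyOrchestratorConfig  # hypothetical config from above

    @property
    def implementation_class(self) -> Type[BaseOrchestrator]:
        return MySimpleOrchestrator  # hypothetical orchestrator from above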

ContainerizedOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator, ABC

Base class for containerized orchestrators.

Source code in src/zenml/stack/stack_component.py (inherited StackComponent.__init__; identical to the listing shown under BaseOrchestrator above).
Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:
- deployment (PipelineDeploymentBase): The pipeline deployment for which to get the builds. Required.

Returns:
- List[BuildConfiguration]: The required Docker builds.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    pipeline_settings = deployment.pipeline_configuration.docker_settings

    included_pipeline_build = False
    builds = []

    for name, step in deployment.step_configurations.items():
        step_settings = step.config.docker_settings

        if step_settings != pipeline_settings:
            build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=step_settings,
                step_name=name,
            )
            builds.append(build)
        elif not included_pipeline_build:
            pipeline_build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=pipeline_settings,
            )
            builds.append(pipeline_build)
            included_pipeline_build = True

    return builds
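
In effect, steps whose Docker settings differ from the pipeline-level settings each get a dedicated build, while all remaining steps share a single pipeline-level build. A sketch of how that plays out (the settings values are illustrative):

from zenml import pipeline, step
from zenml.config import DockerSettings


@step(settings={"docker": DockerSettings(requirements=["torch"])})
def train() -> None:
    ...


@step
def evaluate() -> None:
    ...


@pipeline(settings={"docker": DockerSettings(requirements=["pandas"])})
def my_pipeline() -> None:
    train()
    evaluate()

# -> one build for `train` (its own settings) and one pipeline-level
#    build shared by `evaluate` and any other step without overrides.
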
get_image(deployment: PipelineDeploymentResponse, step_name: Optional[str] = None) -> str staticmethod

Gets the Docker image for the pipeline/a step.

Parameters:
- deployment (PipelineDeploymentResponse): The deployment from which to get the image. Required.
- step_name (Optional[str]): Pipeline step name for which to get the image. If not given, the generic pipeline image will be returned. Default: None.

Raises:
- RuntimeError: If the deployment does not have an associated build.

Returns:
- str: The image name or digest.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
@staticmethod
def get_image(
    deployment: "PipelineDeploymentResponse",
    step_name: Optional[str] = None,
) -> str:
    """Gets the Docker image for the pipeline/a step.

    Args:
        deployment: The deployment from which to get the image.
        step_name: Pipeline step name for which to get the image. If not
            given the generic pipeline image will be returned.

    Raises:
        RuntimeError: If the deployment does not have an associated build.

    Returns:
        The image name or digest.
    """
    if not deployment.build:
        raise RuntimeError(
            f"Missing build for deployment {deployment.id}. This is "
            "probably because the build was manually deleted."
        )

    return deployment.build.get_image(
        component_key=ORCHESTRATOR_DOCKER_IMAGE_KEY, step=step_name
    )

LocalDockerOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines locally using Docker.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py (inherited StackComponent.__init__; identical to the listing shown under BaseOrchestrator above).
Attributes
settings_class: Optional[Type[BaseSettings]] property

Settings class for the Local Docker orchestrator.

Returns:
- Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property

Ensures there is an image builder in the stack.

Returns:
- Optional[StackValidator]: A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:
- RuntimeError: If the environment variable specifying the run id is not set.

Returns:
- str: The orchestrator run id.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
        )
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Sequentially runs all pipeline steps in local Docker containers.

Parameters:
- deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. Required.
- stack (Stack): The stack the pipeline will run on. Required.
- environment (Dict[str, str]): Environment variables to set in the orchestration environment. Required.
- placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. Default: None.

Raises:
- RuntimeError: If a step fails.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Sequentially runs all pipeline steps in local Docker containers.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a step fails.
    """
    if deployment.schedule:
        logger.warning(
            "Local Docker Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    docker_client = docker_utils._try_get_docker_client_from_env()

    entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

    # Add the local stores path as a volume mount
    stack.check_local_paths()
    local_stores_path = GlobalConfiguration().local_stores_path
    volumes = {
        local_stores_path: {
            "bind": local_stores_path,
            "mode": "rw",
        }
    }
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
    environment[ENV_ZENML_LOCAL_STORES_PATH] = local_stores_path
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "Docker orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )

        settings = cast(
            LocalDockerOrchestratorSettings,
            self.get_settings(step),
        )
        image = self.get_image(deployment=deployment, step_name=step_name)

        user = None
        if sys.platform != "win32":
            user = os.getuid()
        logger.info("Running step `%s` in Docker:", step_name)

        run_args = copy.deepcopy(settings.run_args)
        docker_environment = run_args.pop("environment", {})
        docker_environment.update(environment)

        docker_volumes = run_args.pop("volumes", {})
        docker_volumes.update(volumes)

        extra_hosts = run_args.pop("extra_hosts", {})
        extra_hosts["host.docker.internal"] = "host-gateway"

        try:
            logs = docker_client.containers.run(
                image=image,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=docker_volumes,
                environment=docker_environment,
                stream=True,
                extra_hosts=extra_hosts,
                **run_args,
            )

            for line in logs:
                logger.info(line.strip().decode())
        except ContainerError as e:
            error_message = e.stderr.decode()
            raise RuntimeError(error_message)

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )
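
Because run_args is forwarded to docker_client.containers.run(...) above, per-container options such as CPU or memory limits can be set through the orchestrator settings. A sketch (the settings key and values are illustrative; the run_args kwargs follow the docker-py API):

from zenml import pipeline
from zenml.orchestrators.local_docker.local_docker_orchestrator import (
    LocalDockerOrchestratorSettings,
)

settings = {
    "orchestrator.local_docker": LocalDockerOrchestratorSettings(
        run_args={"cpu_count": 3, "mem_limit": "1g"}
    )
}


@pipeline(settings=settings)
def my_pipeline() -> None:
    ...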

LocalDockerOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the local Docker orchestrator.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:
- Type[BaseOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:
- Optional[str]: A flavor docs URL.

implementation_class: Type[LocalDockerOrchestrator] property

Implementation class for this flavor.

Returns:
- Type[LocalDockerOrchestrator]: The implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:
- str: The flavor logo.

name: str property

Name of the orchestrator flavor.

Returns:
- str: Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:
- Optional[str]: A flavor SDK docs URL.

LocalOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator

Orchestrator responsible for running pipelines locally.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py (inherited StackComponent.__init__; identical to the listing shown under BaseOrchestrator above).
Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:
- RuntimeError: If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline.

Returns:
- str: The orchestrator run id.

Source code in src/zenml/orchestrators/local/local_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists. This happens when this method
            gets called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    if not self._orchestrator_run_id:
        raise RuntimeError("No run id set.")

    return self._orchestrator_run_id
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Iterates through all steps and executes them sequentially.

Parameters:
- deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. Required.
- stack (Stack): The stack on which the pipeline is deployed. Required.
- environment (Dict[str, str]): Environment variables to set in the orchestration environment. Required.
- placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. Default: None.
Source code in src/zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Iterates through all steps and executes them sequentially.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack on which the pipeline is deployed.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.
    """
    if deployment.schedule:
        logger.warning(
            "Local Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    self._orchestrator_run_id = str(uuid4())
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        self.run_step(
            step=step,
        )

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )
    self._orchestrator_run_id = None

LocalOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Class for the LocalOrchestratorFlavor.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:
- Type[BaseOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:
- Optional[str]: A flavor docs URL.

implementation_class: Type[LocalOrchestrator] property

Implementation class for this flavor.

Returns:
- Type[LocalOrchestrator]: The implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:
- str: The flavor logo.

name: str property

The flavor name.

Returns:
- str: The flavor name.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:
- Optional[str]: A flavor SDK docs URL.

WheeledOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator, ABC

Base class for wheeled orchestrators.

Source code in src/zenml/stack/stack_component.py (inherited StackComponent.__init__; identical to the listing shown under BaseOrchestrator above).
Functions
copy_repository_to_temp_dir_and_add_setup_py() -> str

Copy the repository to a temporary directory and add a setup.py file.

Returns:
- str: Path to the temporary directory containing the copied repository.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
    def copy_repository_to_temp_dir_and_add_setup_py(self) -> str:
        """Copy the repository to a temporary directory and add a setup.py file.

        Returns:
            Path to the temporary directory containing the copied repository.
        """
        repo_path = get_source_root()

        self.package_name = f"{DEFAULT_PACKAGE_NAME}_{self.sanitize_name(os.path.basename(repo_path))}"

        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")

        # Create a folder within the temporary directory
        temp_repo_path = os.path.join(temp_dir, self.package_name)
        fileio.mkdir(temp_repo_path)

        # Copy the repository to the temporary directory
        copy_dir(repo_path, temp_repo_path)

        # Create init file in the copied directory
        init_file_path = os.path.join(temp_repo_path, "__init__.py")
        with fileio.open(init_file_path, "w") as f:
            f.write("")

        # Create a setup.py file
        setup_py_content = f"""
from setuptools import setup, find_packages

setup(
    name="{self.package_name}",
    version="{self.package_version}",
    packages=find_packages(),
)
"""
        setup_py_path = os.path.join(temp_dir, "setup.py")
        with fileio.open(setup_py_path, "w") as f:
            f.write(setup_py_content)

        return temp_dir
create_wheel(temp_dir: str) -> str

Create a wheel for the package in the given temporary directory.

Parameters:
- temp_dir (str): Path to the temporary directory containing the package. Required.

Raises:
- RuntimeError: If the wheel file could not be created.

Returns:
- str: Path to the created wheel file.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def create_wheel(self, temp_dir: str) -> str:
    """Create a wheel for the package in the given temporary directory.

    Args:
        temp_dir (str): Path to the temporary directory containing the package.

    Raises:
        RuntimeError: If the wheel file could not be created.

    Returns:
        str: Path to the created wheel file.
    """
    # Change to the temporary directory
    original_dir = os.getcwd()
    os.chdir(temp_dir)

    try:
        # Run the `pip wheel` command to create the wheel
        result = subprocess.run(
            ["pip", "wheel", "."], check=True, capture_output=True
        )
        logger.debug(f"Wheel creation stdout: {result.stdout.decode()}")
        logger.debug(f"Wheel creation stderr: {result.stderr.decode()}")

        # Find the created wheel file
        wheel_file = next(
            (
                file
                for file in os.listdir(temp_dir)
                if file.endswith(".whl")
            ),
            None,
        )

        if wheel_file is None:
            raise RuntimeError("Failed to create wheel file.")

        wheel_path = os.path.join(temp_dir, wheel_file)

        # Verify the wheel file is a valid zip file
        import zipfile

        if not zipfile.is_zipfile(wheel_path):
            raise RuntimeError(
                f"The file {wheel_path} is not a valid zip file."
            )

        return wheel_path
    finally:
        # Change back to the original directory
        os.chdir(original_dir)
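
Subclasses typically chain these helpers when packaging the user's code for submission. An illustrative sketch from inside a WheeledOrchestrator subclass:

# Inside a method of a WheeledOrchestrator subclass (illustrative):
temp_dir = self.copy_repository_to_temp_dir_and_add_setup_py()
wheel_path = self.create_wheel(temp_dir)
# `wheel_path` can now be uploaded to the target environment and
# installed there before the pipeline is executed.
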
sanitize_name(name: str) -> str

Sanitize the value to be used in a cluster name.

Parameters:
- name (str): Arbitrary input cluster name. Required.

Returns:
- str: Sanitized cluster name.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def sanitize_name(self, name: str) -> str:
    """Sanitize the value to be used in a cluster name.

    Args:
        name: Arbitrary input cluster name.

    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
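
For example (illustrative; `orchestrator` is an instance of a WheeledOrchestrator subclass):

# sanitize_name lowercases, replaces disallowed characters with hyphens,
# and trims leading/trailing hyphens:
assert orchestrator.sanitize_name("My Repo_v2!") == "my-repo-v2"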

Modules

base_orchestrator

Base orchestrator class.

Classes
BaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: StackComponent, ABC

Base class for all orchestrators.

In order to implement an orchestrator you will need to subclass from this class.

How it works:

The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)).

This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: BaseOrchestratorConfig property

Returns the BaseOrchestratorConfig config.

Returns:

Type Description
BaseOrchestratorConfig

The configuration.

Functions
fetch_status(run: PipelineRunResponse) -> ExecutionStatus

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

A pipeline run response to fetch its status.

required

Raises:

Type Description
NotImplementedError

If any orchestrator inheriting from the base class does not implement this logic.

Source code in src/zenml/orchestrators/base_orchestrator.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: A pipeline run response to fetch its status.

    Raises:
        NotImplementedError: If any orchestrator inheriting from the base
            class does not implement this logic.
    """
    raise NotImplementedError(
        "The fetch status functionality is not implemented for the "
        f"'{self.__class__.__name__}' orchestrator."
    )
get_orchestrator_run_id() -> str abstractmethod

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/orchestrators/base_orchestrator.py
126
127
128
129
130
131
132
133
134
135
@abstractmethod
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run.

    Returns:
        The orchestrator run id.
    """
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[Iterator[Dict[str, MetadataType]]] abstractmethod

The method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The Steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through the steps and finally call self.run_step(...) to execute each step.

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.

Regardless of the implementation details, the orchestrator will need to run each step in the target environment. For this the self.run_step(...) method should be used.

The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).

Parameters:
    deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. [required]
    stack (Stack): The stack the pipeline will run on. [required]
    environment (Dict[str, str]): Environment variables to set in the orchestration environment. These don't need to be set if running locally. [required]
    placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. [default: None]

Yields:
    Optional[Iterator[Dict[str, MetadataType]]]: Metadata for the pipeline run.

Source code in src/zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[Iterator[Dict[str, MetadataType]]]:
    """The method needs to be implemented by the respective orchestrator.

    Depending on the type of orchestrator you'll have to perform slightly
    different operations.

    Simple Case:
    ------------
    The Steps are run directly from within the same environment in which
    the orchestrator code is executed. In this case you will need to
    deal with implementation-specific runtime configurations (like the
    schedule) and then iterate through the steps and finally call
    `self.run_step(...)` to execute each step.

    Advanced Case:
    --------------
    Most orchestrators will not run the steps directly. Instead, they
    build some intermediate representation of the pipeline that is then
    used to create and run the pipeline and its steps on the target
    environment. For such orchestrators this method will have to build
    this representation and deploy it.

    Regardless of the implementation details, the orchestrator will need
    to run each step in the target environment. For this the
    `self.run_step(...)` method should be used.

    The easiest way to make this work is by using an entrypoint
    configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
    or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. These don't need to be set if running locally.
        placeholder_run: An optional placeholder run for the deployment.

    Yields:
        Metadata for the pipeline run.
    """
requires_resources_in_orchestration_environment(step: Step) -> bool staticmethod

Checks if the orchestrator should run this step on special resources.

Parameters:
    step (Step): The step that will be checked. [required]

Returns:
    bool: True if the step requires special resources in the orchestration environment, False otherwise.

Source code in src/zenml/orchestrators/base_orchestrator.py
@staticmethod
def requires_resources_in_orchestration_environment(
    step: "Step",
) -> bool:
    """Checks if the orchestrator should run this step on special resources.

    Args:
        step: The step that will be checked.

    Returns:
        True if the step requires special resources in the orchestration
        environment, False otherwise.
    """
    # If the step requires custom resources and doesn't run with a step
    # operator, it would need these requirements in the orchestrator
    # environment
    if step.config.step_operator:
        return False

    return not step.config.resource_settings.empty
run(deployment: PipelineDeploymentResponse, stack: Stack, placeholder_run: Optional[PipelineRunResponse] = None) -> None

Runs a pipeline on a stack.

Parameters:
    deployment (PipelineDeploymentResponse): The pipeline deployment. [required]
    stack (Stack): The stack on which to run the pipeline. [required]
    placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. This will be deleted if the pipeline deployment fails. [default: None]
Source code in src/zenml/orchestrators/base_orchestrator.py
def run(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Runs a pipeline on a stack.

    Args:
        deployment: The pipeline deployment.
        stack: The stack on which to run the pipeline.
        placeholder_run: An optional placeholder run for the deployment.
            This will be deleted in case the pipeline deployment failed.
    """
    self._prepare_run(deployment=deployment)

    pipeline_run_id: Optional[UUID] = None
    schedule_id: Optional[UUID] = None
    if deployment.schedule:
        schedule_id = deployment.schedule.id
    if placeholder_run:
        pipeline_run_id = placeholder_run.id

    environment = get_config_environment_vars(
        schedule_id=schedule_id,
        pipeline_run_id=pipeline_run_id,
    )

    prevent_client_side_caching = handle_bool_env_var(
        ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING, default=False
    )

    if (
        placeholder_run
        and self.config.supports_client_side_caching
        and not deployment.schedule
        and not prevent_client_side_caching
    ):
        from zenml.orchestrators import step_run_utils

        cached_invocations = step_run_utils.create_cached_step_runs(
            deployment=deployment,
            pipeline_run=placeholder_run,
            stack=stack,
        )

        for invocation_id in cached_invocations:
            # Remove the cached step invocations from the deployment so
            # the orchestrator does not try to run them
            deployment.step_configurations.pop(invocation_id)

        for step in deployment.step_configurations.values():
            for invocation_id in cached_invocations:
                if invocation_id in step.spec.upstream_steps:
                    step.spec.upstream_steps.remove(invocation_id)

        if len(deployment.step_configurations) == 0:
            # All steps were cached, we update the pipeline run status and
            # don't actually use the orchestrator to run the pipeline
            self._cleanup_run()
            logger.info("All steps of the pipeline run were cached.")
            return
    else:
        logger.debug("Skipping client-side caching.")

    try:
        if metadata_iterator := self.prepare_or_run_pipeline(
            deployment=deployment,
            stack=stack,
            environment=environment,
            placeholder_run=placeholder_run,
        ):
            for metadata_dict in metadata_iterator:
                try:
                    if placeholder_run:
                        publish_pipeline_run_metadata(
                            pipeline_run_id=placeholder_run.id,
                            pipeline_run_metadata={self.id: metadata_dict},
                        )
                except Exception as e:
                    logger.debug(
                        "Something went went wrong trying to publish the"
                        f"run metadata: {e}"
                    )
    finally:
        self._cleanup_run()
run_step(step: Step) -> None

Runs the given step.

Parameters:
    step (Step): The step to run. [required]
Source code in src/zenml/orchestrators/base_orchestrator.py
def run_step(self, step: "Step") -> None:
    """Runs the given step.

    Args:
        step: The step to run.
    """
    assert self._active_deployment
    launcher = StepLauncher(
        deployment=self._active_deployment,
        step=step,
        orchestrator_run_id=self.get_orchestrator_run_id(),
    )
    launcher.launch()
BaseOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Base orchestrator config.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:
    bool: Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:
    bool: Whether the orchestrator runs synchronously or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client-side caching.

Returns:
    bool: Whether the orchestrator supports client-side caching.

BaseOrchestratorFlavor

Bases: Flavor

Base orchestrator flavor class.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:
    Type[BaseOrchestratorConfig]: The config class.

implementation_class: Type[BaseOrchestrator] abstractmethod property

Implementation class for this flavor.

Returns:
    Type[BaseOrchestrator]: The implementation class.

type: StackComponentType property

Returns the flavor type.

Returns:
    StackComponentType: The flavor type.

Functions

cache_utils

Utilities for caching.

Classes
Functions
generate_cache_key(step: Step, input_artifact_ids: Dict[str, UUID], artifact_store: BaseArtifactStore, project_id: UUID) -> str

Generates a cache key for a step run.

If the cache key is the same for two step runs, we conclude that the step runs are identical and can be cached.

The cache key is an MD5 hash of:

- the project ID,
- the artifact store ID and path,
- the source code that defines the step,
- the parameters of the step,
- the names and IDs of the input artifacts of the step,
- the names and source codes of the output artifacts of the step,
- the source codes of the output materializers of the step,
- additional custom caching parameters of the step.

Parameters:
    step (Step): The step to generate the cache key for. [required]
    input_artifact_ids (Dict[str, UUID]): The input artifact IDs for the step. [required]
    artifact_store (BaseArtifactStore): The artifact store of the active stack. [required]
    project_id (UUID): The ID of the active project. [required]

Returns:
    str: A cache key.

Source code in src/zenml/orchestrators/cache_utils.py
def generate_cache_key(
    step: "Step",
    input_artifact_ids: Dict[str, "UUID"],
    artifact_store: "BaseArtifactStore",
    project_id: "UUID",
) -> str:
    """Generates a cache key for a step run.

    If the cache key is the same for two step runs, we conclude that the step
    runs are identical and can be cached.

    The cache key is an MD5 hash of:
    - the project ID,
    - the artifact store ID and path,
    - the source code that defines the step,
    - the parameters of the step,
    - the names and IDs of the input artifacts of the step,
    - the names and source codes of the output artifacts of the step,
    - the source codes of the output materializers of the step,
    - additional custom caching parameters of the step.

    Args:
        step: The step to generate the cache key for.
        input_artifact_ids: The input artifact IDs for the step.
        artifact_store: The artifact store of the active stack.
        project_id: The ID of the active project.

    Returns:
        A cache key.
    """
    hash_ = hashlib.md5()  # nosec

    # Project ID
    hash_.update(project_id.bytes)

    # Artifact store ID and path
    hash_.update(artifact_store.id.bytes)
    hash_.update(artifact_store.path.encode())

    if artifact_store.custom_cache_key:
        hash_.update(artifact_store.custom_cache_key)

    # Step source. This currently only uses the string representation of the
    # source (e.g. my_module.step_class) instead of the full source to keep
    # the caching behavior of previous versions and to not invalidate caching
    # when committing some unrelated files
    hash_.update(step.spec.source.import_path.encode())

    # Step parameters
    for key, value in sorted(step.config.parameters.items()):
        hash_.update(key.encode())
        hash_.update(str(value).encode())

    # Input artifacts
    for name, artifact_version_id in input_artifact_ids.items():
        hash_.update(name.encode())
        hash_.update(artifact_version_id.bytes)

    # Output artifacts and materializers
    for name, output in step.config.outputs.items():
        hash_.update(name.encode())
        for source in output.materializer_source:
            hash_.update(source.import_path.encode())

    # Custom caching parameters
    for key, value in sorted(step.config.caching_parameters.items()):
        hash_.update(key.encode())
        hash_.update(str(value).encode())

    return hash_.hexdigest()
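As a standalone illustration of the scheme (plain hashlib, no ZenML objects): sorting the keys before hashing makes the key independent of dictionary insertion order, while any change to a value produces a different key and therefore a cache miss.

import hashlib


def toy_cache_key(parameters: dict) -> str:
    """Sketch of the hashing scheme above, reduced to step parameters only."""
    hash_ = hashlib.md5()  # nosec - fingerprinting, not security
    for key, value in sorted(parameters.items()):
        hash_.update(key.encode())
        hash_.update(str(value).encode())
    return hash_.hexdigest()


# Insertion order does not matter...
assert toy_cache_key({"lr": 0.1, "epochs": 5}) == toy_cache_key({"epochs": 5, "lr": 0.1})
# ...but any changed value invalidates the cache key.
assert toy_cache_key({"lr": 0.1, "epochs": 5}) != toy_cache_key({"lr": 0.2, "epochs": 5})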
get_cached_step_run(cache_key: str) -> Optional[StepRunResponse]

If a given step can be cached, get the corresponding existing step run.

A step run can be cached if there is an existing step run in the same project which has the same cache key and was successfully executed.

Parameters:
    cache_key (str): The cache key of the step. [required]

Returns:
    Optional[StepRunResponse]: The existing step run if the step can be cached, otherwise None.

Source code in src/zenml/orchestrators/cache_utils.py
def get_cached_step_run(cache_key: str) -> Optional["StepRunResponse"]:
    """If a given step can be cached, get the corresponding existing step run.

    A step run can be cached if there is an existing step run in the same
    project which has the same cache key and was successfully executed.

    Args:
        cache_key: The cache key of the step.

    Returns:
        The existing step run if the step can be cached, otherwise None.
    """
    client = Client()

    cache_candidates = client.list_run_steps(
        project=client.active_project.id,
        cache_key=cache_key,
        status=ExecutionStatus.COMPLETED,
        sort_by=f"{SorterOps.DESCENDING}:created",
        size=1,
    ).items

    if cache_candidates:
        return cache_candidates[0]
    return None

containerized_orchestrator

Containerized orchestrator class.

Classes
ContainerizedOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator, ABC

Base class for containerized orchestrators.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:
    deployment (PipelineDeploymentBase): The pipeline deployment for which to get the builds. [required]

Returns:
    List[BuildConfiguration]: The required Docker builds.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    pipeline_settings = deployment.pipeline_configuration.docker_settings

    included_pipeline_build = False
    builds = []

    for name, step in deployment.step_configurations.items():
        step_settings = step.config.docker_settings

        if step_settings != pipeline_settings:
            build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=step_settings,
                step_name=name,
            )
            builds.append(build)
        elif not included_pipeline_build:
            pipeline_build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=pipeline_settings,
            )
            builds.append(pipeline_build)
            included_pipeline_build = True

    return builds
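From the user's side, a step ends up with its own image build exactly when its Docker settings differ from the pipeline's. A short sketch of how that situation arises (the package names are arbitrary examples):

from zenml import pipeline, step
from zenml.config import DockerSettings


@step  # no step-level Docker settings -> shares the generic pipeline image
def load_data() -> int:
    return 42


@step(settings={"docker": DockerSettings(requirements=["scikit-learn"])})
def train(data: int) -> None:  # differing settings -> gets its own build
    ...


@pipeline(settings={"docker": DockerSettings(requirements=["pandas"])})
def my_pipeline() -> None:
    train(load_data())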
get_image(deployment: PipelineDeploymentResponse, step_name: Optional[str] = None) -> str staticmethod

Gets the Docker image for the pipeline/a step.

Parameters:
    deployment (PipelineDeploymentResponse): The deployment from which to get the image. [required]
    step_name (Optional[str]): Pipeline step name for which to get the image. If not given, the generic pipeline image will be returned. [default: None]

Raises:
    RuntimeError: If the deployment does not have an associated build.

Returns:
    str: The image name or digest.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
@staticmethod
def get_image(
    deployment: "PipelineDeploymentResponse",
    step_name: Optional[str] = None,
) -> str:
    """Gets the Docker image for the pipeline/a step.

    Args:
        deployment: The deployment from which to get the image.
        step_name: Pipeline step name for which to get the image. If not
            given the generic pipeline image will be returned.

    Raises:
        RuntimeError: If the deployment does not have an associated build.

    Returns:
        The image name or digest.
    """
    if not deployment.build:
        raise RuntimeError(
            f"Missing build for deployment {deployment.id}. This is "
            "probably because the build was manually deleted."
        )

    return deployment.build.get_image(
        component_key=ORCHESTRATOR_DOCKER_IMAGE_KEY, step=step_name
    )

dag_runner

DAG (Directed Acyclic Graph) Runners.

Classes
NodeStatus

Bases: Enum

Status of the execution of a node.

ThreadedDagRunner(dag: Dict[str, List[str]], run_fn: Callable[[str], Any], finalize_fn: Optional[Callable[[Dict[str, NodeStatus]], None]] = None, parallel_node_startup_waiting_period: float = 0.0)

Multi-threaded DAG Runner.

This class expects a DAG of strings in adjacency list representation, as well as a custom run_fn as input, then calls run_fn(node) for each string node in the DAG.

Steps that can be executed in parallel will be started in separate threads.

Define attributes and initialize all nodes in waiting state.

Parameters:
    dag (Dict[str, List[str]]): Adjacency list representation of a DAG. E.g., the edges [(1->2), (1->3), (2->4), (3->4)] should be represented as dag={2: [1], 3: [1], 4: [2, 3]}. [required]
    run_fn (Callable[[str], Any]): A function run_fn(node) that runs a single node. [required]
    finalize_fn (Optional[Callable[[Dict[str, NodeStatus]], None]]): A function finalize_fn(node_states) that is called when all nodes have completed. [default: None]
    parallel_node_startup_waiting_period (float): Delay in seconds to wait in between starting parallel nodes. [default: 0.0]
Source code in src/zenml/orchestrators/dag_runner.py
def __init__(
    self,
    dag: Dict[str, List[str]],
    run_fn: Callable[[str], Any],
    finalize_fn: Optional[Callable[[Dict[str, NodeStatus]], None]] = None,
    parallel_node_startup_waiting_period: float = 0.0,
) -> None:
    """Define attributes and initialize all nodes in waiting state.

    Args:
        dag: Adjacency list representation of a DAG.
            E.g.: [(1->2), (1->3), (2->4), (3->4)] should be represented as
            `dag={2: [1], 3: [1], 4: [2, 3]}`
        run_fn: A function `run_fn(node)` that runs a single node
        finalize_fn: A function `finalize_fn(node_states)` that is called
            when all nodes have completed.
        parallel_node_startup_waiting_period: Delay in seconds to wait in
            between starting parallel nodes.
    """
    self.parallel_node_startup_waiting_period = (
        parallel_node_startup_waiting_period
    )
    self.dag = dag
    self.reversed_dag = reverse_dag(dag)
    self.run_fn = run_fn
    self.finalize_fn = finalize_fn
    self.nodes = dag.keys()
    self.node_states = {node: NodeStatus.WAITING for node in self.nodes}
    self._lock = threading.Lock()
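A minimal usage sketch on the four-node DAG from the docstring (node 1 is included explicitly with an empty upstream list so it is part of dag.keys() and gets scheduled; nodes 2 and 3 can run in parallel once node 1 finishes):

from zenml.orchestrators.dag_runner import ThreadedDagRunner

# Edges 1->2, 1->3, 2->4, 3->4, keyed by downstream node.
dag = {"1": [], "2": ["1"], "3": ["1"], "4": ["2", "3"]}

def run_node(node: str) -> None:
    print(f"running node {node}")

ThreadedDagRunner(dag=dag, run_fn=run_node).run()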
Functions
run() -> None

Call self.run_fn on all nodes in self.dag.

The order of execution is determined using topological sort. Each node is run in a separate thread to enable parallelism.

Source code in src/zenml/orchestrators/dag_runner.py
def run(self) -> None:
    """Call `self.run_fn` on all nodes in `self.dag`.

    The order of execution is determined using topological sort.
    Each node is run in a separate thread to enable parallelism.
    """
    # Run all nodes that can be started immediately.
    # These will, in turn, start other nodes once all of their respective
    # upstream nodes have completed.
    threads: List[threading.Thread] = []
    for node in self.nodes:
        if self._can_run(node):
            if threads and self.parallel_node_startup_waiting_period > 0:
                time.sleep(self.parallel_node_startup_waiting_period)

            thread = self._run_node_in_thread(node)
            threads.append(thread)

    # Wait till all nodes have completed.
    for thread in threads:
        thread.join()

    # Call the finalize function.
    if self.finalize_fn:
        self.finalize_fn(self.node_states)

    # Print a status report.
    failed_nodes = []
    skipped_nodes = []
    for node in self.nodes:
        if self.node_states[node] == NodeStatus.FAILED:
            failed_nodes.append(node)
        elif self.node_states[node] == NodeStatus.WAITING:
            skipped_nodes.append(node)

    if failed_nodes:
        logger.error(
            "The following nodes failed: " + ", ".join(failed_nodes)
        )
    if skipped_nodes:
        logger.warning(
            "The following nodes were not run because they depend on other "
            "nodes that didn't complete: " + ", ".join(skipped_nodes)
        )
    if not failed_nodes and not skipped_nodes:
        logger.info("All nodes completed successfully.")
Functions
reverse_dag(dag: Dict[str, List[str]]) -> Dict[str, List[str]]

Reverse a DAG.

Parameters:
    dag (Dict[str, List[str]]): Adjacency list representation of a DAG. [required]

Returns:
    Dict[str, List[str]]: Adjacency list representation of the reversed DAG.

Source code in src/zenml/orchestrators/dag_runner.py
def reverse_dag(dag: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Reverse a DAG.

    Args:
        dag: Adjacency list representation of a DAG.

    Returns:
        Adjacency list representation of the reversed DAG.
    """
    reversed_dag = defaultdict(list)

    # Reverse all edges in the graph.
    for node, upstream_nodes in dag.items():
        for upstream_node in upstream_nodes:
            reversed_dag[upstream_node].append(node)

    # Add nodes without incoming edges back in.
    for node in dag:
        if node not in reversed_dag:
            reversed_dag[node] = []

    return reversed_dag
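For example, reversing the DAG used above flips every edge and keeps sink nodes around with empty adjacency lists:

from zenml.orchestrators.dag_runner import reverse_dag

dag = {"1": [], "2": ["1"], "3": ["1"], "4": ["2", "3"]}
print(dict(reverse_dag(dag)))
# {'1': ['2', '3'], '2': ['4'], '3': ['4'], '4': []}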

input_utils

Utilities for inputs.

Classes
Functions
resolve_step_inputs(step: Step, pipeline_run: PipelineRunResponse) -> Tuple[Dict[str, StepRunInputResponse], List[UUID]]

Resolves inputs for the current step.

Parameters:
    step (Step): The step for which to resolve the inputs. [required]
    pipeline_run (PipelineRunResponse): The current pipeline run. [required]

Raises:
    InputResolutionError: If input resolution failed due to a missing step or output.
    ValueError: If an object from a model version passed into a step cannot be resolved at runtime because the object is missing.

Returns:
    Tuple[Dict[str, StepRunInputResponse], List[UUID]]: The resolved inputs of the step and the IDs of its parent steps.

Source code in src/zenml/orchestrators/input_utils.py
def resolve_step_inputs(
    step: "Step",
    pipeline_run: "PipelineRunResponse",
) -> Tuple[Dict[str, "StepRunInputResponse"], List[UUID]]:
    """Resolves inputs for the current step.

    Args:
        step: The step for which to resolve the inputs.
        pipeline_run: The current pipeline run.

    Raises:
        InputResolutionError: If input resolving failed due to a missing
            step or output.
        ValueError: If an object from a model version passed into a step
            cannot be resolved at runtime because the object is missing.

    Returns:
        The IDs of the input artifact versions and the IDs of parent steps of
            the current step.
    """
    from zenml.models import ArtifactVersionResponse
    from zenml.models.v2.core.step_run import StepRunInputResponse

    current_run_steps = {
        run_step.name: run_step
        for run_step in pagination_utils.depaginate(
            Client().list_run_steps,
            pipeline_run_id=pipeline_run.id,
            project=pipeline_run.project.id,
        )
    }

    input_artifacts: Dict[str, StepRunInputResponse] = {}
    for name, input_ in step.spec.inputs.items():
        try:
            step_run = current_run_steps[input_.step_name]
        except KeyError:
            raise InputResolutionError(
                f"No step `{input_.step_name}` found in current run."
            )

        # Try to get the substitutions from the pipeline run first, as we
        # already have a hydrated version of that. In the unlikely case
        # that the pipeline run is outdated, we fetch it from the step
        # run instead, which will cost us one hydration call.
        substitutions = (
            pipeline_run.step_substitutions.get(step_run.name)
            or step_run.config.substitutions
        )
        output_name = string_utils.format_name_template(
            input_.output_name, substitutions=substitutions
        )

        try:
            outputs = step_run.outputs[output_name]
        except KeyError:
            raise InputResolutionError(
                f"No step output `{output_name}` found for step "
                f"`{input_.step_name}`."
            )

        step_outputs = [
            output
            for output in outputs
            if output.save_type == ArtifactSaveType.STEP_OUTPUT
        ]
        if len(step_outputs) > 1:
            # This should never happen, there can only be a single regular step
            # output for a name
            raise InputResolutionError(
                f"Too many step outputs for output `{output_name}` of "
                f"step `{input_.step_name}`."
            )
        elif len(step_outputs) == 0:
            raise InputResolutionError(
                f"No step output `{output_name}` found for step "
                f"`{input_.step_name}`."
            )

        input_artifacts[name] = StepRunInputResponse(
            input_type=StepRunInputArtifactType.STEP_OUTPUT,
            **step_outputs[0].model_dump(),
        )

    for (
        name,
        external_artifact,
    ) in step.config.external_input_artifacts.items():
        artifact_version_id = external_artifact.get_artifact_version_id()
        input_artifacts[name] = StepRunInputResponse(
            input_type=StepRunInputArtifactType.EXTERNAL,
            **Client().get_artifact_version(artifact_version_id).model_dump(),
        )

    for name, config_ in step.config.model_artifacts_or_metadata.items():
        err_msg = ""
        try:
            context_model_version = config_._get_model_response(
                pipeline_run=pipeline_run
            )
        except RuntimeError as e:
            err_msg = str(e)
        else:
            if (
                config_.artifact_name is None
                and config_.metadata_name
                and context_model_version.run_metadata is not None
            ):
                # metadata values should go directly in parameters, as primitive types
                step.config.parameters[name] = (
                    context_model_version.run_metadata[config_.metadata_name]
                )
            elif config_.artifact_name is None:
                err_msg = (
                    "Cannot load artifact from model version, "
                    "no artifact name specified."
                )
            else:
                if artifact_ := context_model_version.get_artifact(
                    config_.artifact_name, config_.artifact_version
                ):
                    if config_.metadata_name is None:
                        input_artifacts[name] = StepRunInputResponse(
                            input_type=StepRunInputArtifactType.LAZY_LOADED,
                            **artifact_.model_dump(),
                        )
                    elif config_.metadata_name:
                        # metadata values should go directly in parameters, as primitive types
                        try:
                            step.config.parameters[name] = (
                                artifact_.run_metadata[config_.metadata_name]
                            )
                        except KeyError:
                            err_msg = (
                                f"Artifact run metadata `{config_.metadata_name}` "
                                "could not be found in artifact "
                                f"`{config_.artifact_name}::{config_.artifact_version}`."
                            )
                else:
                    err_msg = (
                        f"Artifact `{config_.artifact_name}::{config_.artifact_version}` "
                        f"not found in model `{context_model_version.model.name}` "
                        f"version `{context_model_version.name}`."
                    )
        if err_msg:
            raise ValueError(
                f"Failed to lazy load model version data in step `{step.config.name}`: "
                + err_msg
            )
    for name, cll_ in step.config.client_lazy_loaders.items():
        value_ = cll_.evaluate()
        if isinstance(value_, ArtifactVersionResponse):
            input_artifacts[name] = StepRunInputResponse(
                input_type=StepRunInputArtifactType.LAZY_LOADED,
                **value_.model_dump(),
            )
        else:
            step.config.parameters[name] = value_

    parent_step_ids = [
        current_run_steps[upstream_step].id
        for upstream_step in step.spec.upstream_steps
    ]

    return input_artifacts, parent_step_ids
Modules

local

Initialization for the local orchestrator.

Modules
local_orchestrator

Implementation of the ZenML local orchestrator.

Classes
LocalOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator

Orchestrator responsible for running pipelines locally.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:
    RuntimeError: If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline.

Returns:
    str: The orchestrator run id.

Source code in src/zenml/orchestrators/local/local_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists. This happens when this method
            gets called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    if not self._orchestrator_run_id:
        raise RuntimeError("No run id set.")

    return self._orchestrator_run_id
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Iterates through all steps and executes them sequentially.

Parameters:
    deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. [required]
    stack (Stack): The stack on which the pipeline is deployed. [required]
    environment (Dict[str, str]): Environment variables to set in the orchestration environment. [required]
    placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. [default: None]
Source code in src/zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Iterates through all steps and executes them sequentially.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack on which the pipeline is deployed.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.
    """
    if deployment.schedule:
        logger.warning(
            "Local Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    self._orchestrator_run_id = str(uuid4())
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        self.run_step(
            step=step,
        )

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )
    self._orchestrator_run_id = None
LocalOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig

Local orchestrator config.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:
    bool: True if this config is for a local component, False otherwise.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:
    bool: Whether the orchestrator runs synchronously or not.

LocalOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Class for the LocalOrchestratorFlavor.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:
    Type[BaseOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:
    Optional[str]: A flavor docs url.

implementation_class: Type[LocalOrchestrator] property

Implementation class for this flavor.

Returns:
    Type[LocalOrchestrator]: The implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:
    str: The flavor logo.

name: str property

The flavor name.

Returns:
    str: The flavor name.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:
    Optional[str]: A flavor SDK docs url.

Functions

Modules

local_docker

Initialization for the local Docker orchestrator.

Modules
local_docker_orchestrator

Implementation of the ZenML local Docker orchestrator.

Classes
LocalDockerOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines locally using Docker.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
settings_class: Optional[Type[BaseSettings]] property

Settings class for the Local Docker orchestrator.

Returns:
    Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property

Ensures there is an image builder in the stack.

Returns:
    Optional[StackValidator]: A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:
    RuntimeError: If the environment variable specifying the run id is not set.

Returns:
    str: The orchestrator run id.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
        )
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Sequentially runs all pipeline steps in local Docker containers.

Parameters:
    deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. [required]
    stack (Stack): The stack the pipeline will run on. [required]
    environment (Dict[str, str]): Environment variables to set in the orchestration environment. [required]
    placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. [default: None]

Raises:
    RuntimeError: If a step fails.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Sequentially runs all pipeline steps in local Docker containers.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a step fails.
    """
    if deployment.schedule:
        logger.warning(
            "Local Docker Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    docker_client = docker_utils._try_get_docker_client_from_env()

    entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

    # Add the local stores path as a volume mount
    stack.check_local_paths()
    local_stores_path = GlobalConfiguration().local_stores_path
    volumes = {
        local_stores_path: {
            "bind": local_stores_path,
            "mode": "rw",
        }
    }
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
    environment[ENV_ZENML_LOCAL_STORES_PATH] = local_stores_path
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "Docker orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )

        settings = cast(
            LocalDockerOrchestratorSettings,
            self.get_settings(step),
        )
        image = self.get_image(deployment=deployment, step_name=step_name)

        user = None
        if sys.platform != "win32":
            user = os.getuid()
        logger.info("Running step `%s` in Docker:", step_name)

        run_args = copy.deepcopy(settings.run_args)
        docker_environment = run_args.pop("environment", {})
        docker_environment.update(environment)

        docker_volumes = run_args.pop("volumes", {})
        docker_volumes.update(volumes)

        extra_hosts = run_args.pop("extra_hosts", {})
        extra_hosts["host.docker.internal"] = "host-gateway"

        try:
            logs = docker_client.containers.run(
                image=image,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=docker_volumes,
                environment=docker_environment,
                stream=True,
                extra_hosts=extra_hosts,
                **run_args,
            )

            for line in logs:
                logger.info(line.strip().decode())
        except ContainerError as e:
            error_message = e.stderr.decode()
            raise RuntimeError(error_message)

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )
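For illustration, the container-per-step pattern above boils down to the following docker-py sketch. This is a standalone approximation, not ZenML's actual entrypoint: the `alpine` image and the echo commands are placeholders.

import docker

client = docker.from_env()
logs = client.containers.run(
    image="alpine",
    entrypoint=["/bin/sh", "-c"],
    command=["echo step-1 && echo step-2"],
    stream=True,  # yields log lines as the container runs, as the orchestrator does
)
for line in logs:
    print(line.strip().decode())
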
LocalDockerOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, LocalDockerOrchestratorSettings

Local Docker orchestrator config.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
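As a small illustration of the secret-reference convention described above (the secret name `mysecret` is hypothetical):

from zenml.utils import secret_utils

# Values of the form `{{secret_name.key}}` are treated as secret references.
assert secret_utils.is_secret_reference("{{mysecret.api_key}}")
assert not secret_utils.is_secret_reference("plain-text-value")
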
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

- bool: True if this config is for a local component, False otherwise.

is_synchronous: bool property

Whether the orchestrator runs synchronously.

Returns:

- bool: Whether the orchestrator runs synchronously.

LocalDockerOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the local Docker orchestrator.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:

- Type[BaseOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A URL pointing to docs explaining this flavor.

Returns:

- Optional[str]: A flavor docs URL.

implementation_class: Type[LocalDockerOrchestrator] property

Implementation class for this flavor.

Returns:

- Type[LocalDockerOrchestrator]: The implementation class for this flavor.

logo_url: str property

A URL representing the flavor in the dashboard.

Returns:

- str: The flavor logo URL.

name: str property

Name of the orchestrator flavor.

Returns:

- str: The name of the orchestrator flavor.

sdk_docs_url: Optional[str] property

A URL pointing to SDK docs explaining this flavor.

Returns:

- Optional[str]: A flavor SDK docs URL.

LocalDockerOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Local Docker orchestrator settings.

Attributes:

- run_args (Dict[str, Any]): Arguments to pass to the `docker run` call. (See https://docker-py.readthedocs.io/en/stable/containers.html for a list of what can be passed.)
Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
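For illustration, `run_args` can be supplied through the orchestrator settings when defining a pipeline. A minimal sketch, assuming a registered local Docker orchestrator; the step and pipeline names are hypothetical:

from zenml import pipeline, step
from zenml.orchestrators.local_docker.local_docker_orchestrator import (
    LocalDockerOrchestratorSettings,
)

settings = LocalDockerOrchestratorSettings(
    run_args={"cpu_count": 3},  # forwarded to docker_client.containers.run(...)
)

@step
def trainer() -> int:
    return 42

@pipeline(settings={"orchestrator.local_docker": settings})
def my_pipeline() -> None:
    trainer()
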
Functions

Modules

output_utils

Utilities for outputs.

Classes
Functions
generate_artifact_uri(artifact_store: BaseArtifactStore, step_run: StepRunResponse, output_name: str) -> str

Generates a URI for an output artifact.

Parameters:

- artifact_store (BaseArtifactStore): The artifact store on which the artifact will be stored. Required.
- step_run (StepRunResponse): The step run that created the artifact. Required.
- output_name (str): The name of the output in the step run for this artifact. Required.

Returns:

- str: The URI of the output artifact.

Source code in src/zenml/orchestrators/output_utils.py
def generate_artifact_uri(
    artifact_store: "BaseArtifactStore",
    step_run: "StepRunResponse",
    output_name: str,
) -> str:
    """Generates a URI for an output artifact.

    Args:
        artifact_store: The artifact store on which the artifact will be stored.
        step_run: The step run that created the artifact.
        output_name: The name of the output in the step run for this artifact.

    Returns:
        The URI of the output artifact.
    """
    for banned_character in ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]:
        output_name = output_name.replace(banned_character, "_")
    return os.path.join(
        artifact_store.path,
        step_run.name,
        output_name,
        str(step_run.id),
        str(uuid4())[:8],  # add random subfolder to avoid collisions
    )
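The character replacement above keeps generated URIs safe across filesystems. A quick standalone illustration of the sanitization step:

output_name = 'weights:v1/"final"'
for banned_character in ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]:
    output_name = output_name.replace(banned_character, "_")
print(output_name)  # weights_v1__final_
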
prepare_output_artifact_uris(step_run: StepRunResponse, stack: Stack, step: Step) -> Dict[str, str]

Prepares the output artifact URIs to run the current step.

Parameters:

- step_run (StepRunResponse): The step run for which to prepare the artifact URIs. Required.
- stack (Stack): The stack on which the pipeline is running. Required.
- step (Step): The step configuration. Required.

Raises:

- RuntimeError: If an artifact URI already exists.

Returns:

- Dict[str, str]: A dictionary mapping output names to artifact URIs.

Source code in src/zenml/orchestrators/output_utils.py
def prepare_output_artifact_uris(
    step_run: "StepRunResponse", stack: "Stack", step: "Step"
) -> Dict[str, str]:
    """Prepares the output artifact URIs to run the current step.

    Args:
        step_run: The step run for which to prepare the artifact URIs.
        stack: The stack on which the pipeline is running.
        step: The step configuration.

    Raises:
        RuntimeError: If an artifact URI already exists.

    Returns:
        A dictionary mapping output names to artifact URIs.
    """
    artifact_store = stack.artifact_store
    output_artifact_uris: Dict[str, str] = {}
    for output_name in step.config.outputs.keys():
        substituted_output_name = string_utils.format_name_template(
            output_name, substitutions=step_run.config.substitutions
        )
        artifact_uri = generate_artifact_uri(
            artifact_store=stack.artifact_store,
            step_run=step_run,
            output_name=substituted_output_name,
        )
        if artifact_store.exists(artifact_uri):
            raise RuntimeError("Artifact already exists")
        artifact_store.makedirs(artifact_uri)
        output_artifact_uris[output_name] = artifact_uri
    return output_artifact_uris
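A hedged sketch of the call pattern around a step execution: prepare fresh output URIs before the step runs and remove them again if it fails. The `step_run`, `stack`, and `step` objects are assumed to come from the launcher's context.

from zenml.orchestrators.output_utils import (
    prepare_output_artifact_uris,
    remove_artifact_dirs,
)

uris = prepare_output_artifact_uris(step_run=step_run, stack=stack, step=step)
try:
    ...  # run the step and materialize its outputs into `uris`
except BaseException:
    # Don't leave half-written artifact directories behind.
    remove_artifact_dirs(artifact_uris=list(uris.values()))
    raise
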
remove_artifact_dirs(artifact_uris: Sequence[str]) -> None

Removes the artifact directories.

Parameters:

- artifact_uris (Sequence[str]): URIs of the artifacts to remove the directories for. Required.
Source code in src/zenml/orchestrators/output_utils.py
def remove_artifact_dirs(artifact_uris: Sequence[str]) -> None:
    """Removes the artifact directories.

    Args:
        artifact_uris: URIs of the artifacts to remove the directories for.
    """
    artifact_store = Client().active_stack.artifact_store
    for artifact_uri in artifact_uris:
        if artifact_store.isdir(artifact_uri):
            artifact_store.rmtree(artifact_uri)
Modules

publish_utils

Utilities to publish pipeline and step runs.

Classes
Functions
get_pipeline_run_status(step_statuses: List[ExecutionStatus], num_steps: int) -> ExecutionStatus

Gets the pipeline run status for the given step statuses.

Parameters:

- step_statuses (List[ExecutionStatus]): The status of steps in this run. Required.
- num_steps (int): The total number of steps in this run. Required.

Returns:

- ExecutionStatus: The run status.

Source code in src/zenml/orchestrators/publish_utils.py
def get_pipeline_run_status(
    step_statuses: List[ExecutionStatus], num_steps: int
) -> ExecutionStatus:
    """Gets the pipeline run status for the given step statuses.

    Args:
        step_statuses: The status of steps in this run.
        num_steps: The total number of steps in this run.

    Returns:
        The run status.
    """
    if ExecutionStatus.FAILED in step_statuses:
        return ExecutionStatus.FAILED
    if (
        ExecutionStatus.RUNNING in step_statuses
        or len(step_statuses) < num_steps
    ):
        return ExecutionStatus.RUNNING

    return ExecutionStatus.COMPLETED
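A short illustration of the precedence implemented above: a single failed step fails the whole run, and the run stays running until every step has reported a status.

from zenml.enums import ExecutionStatus
from zenml.orchestrators.publish_utils import get_pipeline_run_status

statuses = [ExecutionStatus.COMPLETED, ExecutionStatus.RUNNING]
assert get_pipeline_run_status(statuses, num_steps=3) == ExecutionStatus.RUNNING

statuses = [ExecutionStatus.COMPLETED] * 3
assert get_pipeline_run_status(statuses, num_steps=3) == ExecutionStatus.COMPLETED
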
publish_failed_pipeline_run(pipeline_run_id: UUID) -> PipelineRunResponse

Publishes a failed pipeline run.

Parameters:

- pipeline_run_id (UUID): The ID of the pipeline run to update. Required.

Returns:

- PipelineRunResponse: The updated pipeline run.

Source code in src/zenml/orchestrators/publish_utils.py
def publish_failed_pipeline_run(
    pipeline_run_id: "UUID",
) -> "PipelineRunResponse":
    """Publishes a failed pipeline run.

    Args:
        pipeline_run_id: The ID of the pipeline run to update.

    Returns:
        The updated pipeline run.
    """
    return Client().zen_store.update_run(
        run_id=pipeline_run_id,
        run_update=PipelineRunUpdate(
            status=ExecutionStatus.FAILED,
            end_time=utc_now(),
        ),
    )
publish_failed_step_run(step_run_id: UUID) -> StepRunResponse

Publishes a failed step run.

Parameters:

- step_run_id (UUID): The ID of the step run to update. Required.

Returns:

- StepRunResponse: The updated step run.

Source code in src/zenml/orchestrators/publish_utils.py
def publish_failed_step_run(step_run_id: "UUID") -> "StepRunResponse":
    """Publishes a failed step run.

    Args:
        step_run_id: The ID of the step run to update.

    Returns:
        The updated step run.
    """
    return Client().zen_store.update_run_step(
        step_run_id=step_run_id,
        step_run_update=StepRunUpdate(
            status=ExecutionStatus.FAILED,
            end_time=utc_now(),
        ),
    )
publish_pipeline_run_metadata(pipeline_run_id: UUID, pipeline_run_metadata: Dict[UUID, Dict[str, MetadataType]]) -> None

Publishes the given pipeline run metadata.

Parameters:

- pipeline_run_id (UUID): The ID of the pipeline run. Required.
- pipeline_run_metadata (Dict[UUID, Dict[str, MetadataType]]): A dictionary mapping stack component IDs to the metadata they created. Required.
Source code in src/zenml/orchestrators/publish_utils.py
def publish_pipeline_run_metadata(
    pipeline_run_id: "UUID",
    pipeline_run_metadata: Dict["UUID", Dict[str, "MetadataType"]],
) -> None:
    """Publishes the given pipeline run metadata.

    Args:
        pipeline_run_id: The ID of the pipeline run.
        pipeline_run_metadata: A dictionary mapping stack component IDs to the
            metadata they created.
    """
    client = Client()
    for stack_component_id, metadata in pipeline_run_metadata.items():
        client.create_run_metadata(
            metadata=metadata,
            resources=[
                RunMetadataResource(
                    id=pipeline_run_id, type=MetadataResourceTypes.PIPELINE_RUN
                )
            ],
            stack_component_id=stack_component_id,
        )
publish_step_run_metadata(step_run_id: UUID, step_run_metadata: Dict[UUID, Dict[str, MetadataType]]) -> None

Publishes the given step run metadata.

Parameters:

- step_run_id (UUID): The ID of the step run. Required.
- step_run_metadata (Dict[UUID, Dict[str, MetadataType]]): A dictionary mapping stack component IDs to the metadata they created. Required.
Source code in src/zenml/orchestrators/publish_utils.py
def publish_step_run_metadata(
    step_run_id: "UUID",
    step_run_metadata: Dict["UUID", Dict[str, "MetadataType"]],
) -> None:
    """Publishes the given step run metadata.

    Args:
        step_run_id: The ID of the step run.
        step_run_metadata: A dictionary mapping stack component IDs to the
            metadata they created.
    """
    client = Client()
    for stack_component_id, metadata in step_run_metadata.items():
        client.create_run_metadata(
            metadata=metadata,
            resources=[
                RunMetadataResource(
                    id=step_run_id, type=MetadataResourceTypes.STEP_RUN
                )
            ],
            stack_component_id=stack_component_id,
        )
publish_successful_step_run(step_run_id: UUID, output_artifact_ids: Dict[str, List[UUID]]) -> StepRunResponse

Publishes a successful step run.

Parameters:

- step_run_id (UUID): The ID of the step run to update. Required.
- output_artifact_ids (Dict[str, List[UUID]]): The output artifact IDs for the step run. Required.

Returns:

- StepRunResponse: The updated step run.

Source code in src/zenml/orchestrators/publish_utils.py
def publish_successful_step_run(
    step_run_id: "UUID", output_artifact_ids: Dict[str, List["UUID"]]
) -> "StepRunResponse":
    """Publishes a successful step run.

    Args:
        step_run_id: The ID of the step run to update.
        output_artifact_ids: The output artifact IDs for the step run.

    Returns:
        The updated step run.
    """
    return Client().zen_store.update_run_step(
        step_run_id=step_run_id,
        step_run_update=StepRunUpdate(
            status=ExecutionStatus.COMPLETED,
            end_time=utc_now(),
            outputs=output_artifact_ids,
        ),
    )

step_launcher

Class to launch (run directly or using a step operator) steps.

Classes
StepLauncher(deployment: PipelineDeploymentResponse, step: Step, orchestrator_run_id: str)

A class responsible for launching a step of a ZenML pipeline.

This class follows these steps to launch and publish a ZenML step:

1. Publish or reuse a PipelineRun.
2. Resolve the input artifacts of the step.
3. Generate a cache key for the step.
4. Check whether the step can be cached.
5. Publish a new StepRun.
6. If the step can't be cached, execute it in one of two ways depending on its configuration:
   - calling a step operator to run the step in a different environment, or
   - calling a step runner to run the step in the current environment.
7. Update the status of the previously published StepRun.
8. Update the status of the PipelineRun.

Initializes the launcher.

Parameters:

- deployment (PipelineDeploymentResponse): The pipeline deployment. Required.
- step (Step): The step to launch. Required.
- orchestrator_run_id (str): The orchestrator pipeline run ID. Required.

Raises:

- RuntimeError: If the deployment has no associated stack.

Source code in src/zenml/orchestrators/step_launcher.py
def __init__(
    self,
    deployment: PipelineDeploymentResponse,
    step: Step,
    orchestrator_run_id: str,
):
    """Initializes the launcher.

    Args:
        deployment: The pipeline deployment.
        step: The step to launch.
        orchestrator_run_id: The orchestrator pipeline run id.

    Raises:
        RuntimeError: If the deployment has no associated stack.
    """
    self._deployment = deployment
    self._step = step
    self._orchestrator_run_id = orchestrator_run_id

    if not deployment.stack:
        raise RuntimeError(
            f"Missing stack for deployment {deployment.id}. This is "
            "probably because the stack was manually deleted."
        )

    self._stack = Stack.from_model(deployment.stack)
    self._step_name = step.spec.pipeline_parameter_name
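A hedged sketch of how an orchestrator typically drives the launcher for a single step; `deployment` and `step` are assumed to come from the orchestrator's `prepare_or_run_pipeline(...)` arguments.

from uuid import uuid4

from zenml.orchestrators.step_launcher import StepLauncher

launcher = StepLauncher(
    deployment=deployment,  # PipelineDeploymentResponse from the orchestrator
    step=step,  # the Step configuration to run
    orchestrator_run_id=str(uuid4()),
)
launcher.launch()
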
Functions
launch() -> None

Launches the step.

Raises:

- BaseException: If the step failed to launch, run, or publish.

Source code in src/zenml/orchestrators/step_launcher.py
def launch(self) -> None:
    """Launches the step.

    Raises:
        BaseException: If the step failed to launch, run, or publish.
    """
    pipeline_run, run_was_created = self._create_or_reuse_run()

    # Enable or disable step logs storage
    if handle_bool_env_var(ENV_ZENML_DISABLE_STEP_LOGS_STORAGE, False):
        step_logging_enabled = False
    else:
        step_logging_enabled = orchestrator_utils.is_setting_enabled(
            is_enabled_on_step=self._step.config.enable_step_logs,
            is_enabled_on_pipeline=self._deployment.pipeline_configuration.enable_step_logs,
        )

    logs_context = nullcontext()
    logs_model = None

    if step_logging_enabled:
        # Configure the logs
        logs_uri = step_logging.prepare_logs_uri(
            self._stack.artifact_store,
            self._step.config.name,
        )

        logs_context = step_logging.StepLogsStorageContext(
            logs_uri=logs_uri, artifact_store=self._stack.artifact_store
        )  # type: ignore[assignment]

        logs_model = LogsRequest(
            uri=logs_uri,
            artifact_store_id=self._stack.artifact_store.id,
        )

    try:
        with logs_context:
            if run_was_created:
                pipeline_run_metadata = (
                    self._stack.get_pipeline_run_metadata(
                        run_id=pipeline_run.id
                    )
                )
                publish_utils.publish_pipeline_run_metadata(
                    pipeline_run_id=pipeline_run.id,
                    pipeline_run_metadata=pipeline_run_metadata,
                )
                if model_version := pipeline_run.model_version:
                    step_run_utils.log_model_version_dashboard_url(
                        model_version=model_version
                    )

            request_factory = step_run_utils.StepRunRequestFactory(
                deployment=self._deployment,
                pipeline_run=pipeline_run,
                stack=self._stack,
            )
            step_run_request = request_factory.create_request(
                invocation_id=self._step_name
            )
            step_run_request.logs = logs_model

            try:
                request_factory.populate_request(request=step_run_request)
            except:
                logger.exception(
                    f"Failed preparing step `{self._step_name}`."
                )
                step_run_request.status = ExecutionStatus.FAILED
                step_run_request.end_time = utc_now()
                raise
            finally:
                step_run = Client().zen_store.create_run_step(
                    step_run_request
                )
                if model_version := step_run.model_version:
                    step_run_utils.log_model_version_dashboard_url(
                        model_version=model_version
                    )

            if not step_run.status.is_finished:
                logger.info(f"Step `{self._step_name}` has started.")
                retries = 0
                last_retry = True
                max_retries = (
                    step_run.config.retry.max_retries
                    if step_run.config.retry
                    else 1
                )
                delay = (
                    step_run.config.retry.delay
                    if step_run.config.retry
                    else 0
                )
                backoff = (
                    step_run.config.retry.backoff
                    if step_run.config.retry
                    else 1
                )

                while retries < max_retries:
                    last_retry = retries == max_retries - 1
                    try:
                        # here pass a forced save_to_file callable to be
                        # used as a dump function to use before starting
                        # the external jobs in step operators
                        if isinstance(
                            logs_context,
                            step_logging.StepLogsStorageContext,
                        ):
                            force_write_logs = partial(
                                logs_context.storage.save_to_file,
                                force=True,
                            )
                        else:

                            def _bypass() -> None:
                                return None

                            force_write_logs = _bypass
                        self._run_step(
                            pipeline_run=pipeline_run,
                            step_run=step_run,
                            last_retry=last_retry,
                            force_write_logs=force_write_logs,
                        )
                        break
                    except BaseException as e:  # noqa: E722
                        retries += 1
                        if retries < max_retries:
                            logger.error(
                                f"Failed to run step `{self._step_name}`. Retrying..."
                            )
                            logger.exception(e)
                            logger.info(
                                f"Sleeping for {delay} seconds before retrying."
                            )
                            time.sleep(delay)
                            delay *= backoff
                        else:
                            logger.error(
                                f"Failed to run step `{self._step_name}` after {max_retries} retries. Exiting."
                            )
                            logger.exception(e)
                            publish_utils.publish_failed_step_run(
                                step_run.id
                            )
                            raise
            else:
                logger.info(
                    f"Using cached version of step `{self._step_name}`."
                )
                if (
                    model_version := step_run.model_version
                    or pipeline_run.model_version
                ):
                    step_run_utils.link_output_artifacts_to_model_version(
                        artifacts=step_run.outputs,
                        model_version=model_version,
                    )

    except:  # noqa: E722
        logger.error(f"Pipeline run `{pipeline_run.name}` failed.")
        publish_utils.publish_failed_pipeline_run(pipeline_run.id)
        raise
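The retry loop above is driven by the step's retry configuration. A hedged sketch of setting it, assuming ZenML's `StepRetryConfig` with `max_retries`, `delay`, and `backoff` fields:

from zenml import step
from zenml.config.retry_config import StepRetryConfig

@step(retry=StepRetryConfig(max_retries=3, delay=10, backoff=2))
def flaky_step() -> None:
    # With this config the launcher attempts the step up to 3 times,
    # sleeping 10s, then 20s, between attempts.
    ...
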
Functions
Modules

step_run_utils

Utilities for creating step runs.

Classes
StepRunRequestFactory(deployment: PipelineDeploymentResponse, pipeline_run: PipelineRunResponse, stack: Stack)

Helper class to create step run requests.

Initialize the object.

Parameters:

- deployment (PipelineDeploymentResponse): The deployment for which to create step run requests. Required.
- pipeline_run (PipelineRunResponse): The pipeline run for which to create step run requests. Required.
- stack (Stack): The stack on which the pipeline run is happening. Required.
Source code in src/zenml/orchestrators/step_run_utils.py
def __init__(
    self,
    deployment: "PipelineDeploymentResponse",
    pipeline_run: "PipelineRunResponse",
    stack: "Stack",
) -> None:
    """Initialize the object.

    Args:
        deployment: The deployment for which to create step run requests.
        pipeline_run: The pipeline run for which to create step run
            requests.
        stack: The stack on which the pipeline run is happening.
    """
    self.deployment = deployment
    self.pipeline_run = pipeline_run
    self.stack = stack
Functions
create_request(invocation_id: str) -> StepRunRequest

Create a step run request.

This will only create a request with basic information and will not yet compute information like the cache key and inputs. This is separated into a different method populate_request(...) that might raise exceptions while trying to compute this information.

Parameters:

- invocation_id (str): The invocation ID for which to create the request. Required.

Returns:

- StepRunRequest: The step run request.

Source code in src/zenml/orchestrators/step_run_utils.py
def create_request(self, invocation_id: str) -> StepRunRequest:
    """Create a step run request.

    This will only create a request with basic information and will not yet
    compute information like the cache key and inputs. This is separated
    into a different method `populate_request(...)` that might raise
    exceptions while trying to compute this information.

    Args:
        invocation_id: The invocation ID for which to create the request.

    Returns:
        The step run request.
    """
    return StepRunRequest(
        name=invocation_id,
        pipeline_run_id=self.pipeline_run.id,
        status=ExecutionStatus.RUNNING,
        start_time=utc_now(),
        project=Client().active_project.id,
    )
populate_request(request: StepRunRequest) -> None

Populate a step run request with additional information.

Parameters:

- request (StepRunRequest): The request to populate. Required.
Source code in src/zenml/orchestrators/step_run_utils.py
def populate_request(self, request: StepRunRequest) -> None:
    """Populate a step run request with additional information.

    Args:
        request: The request to populate.
    """
    step = self.deployment.step_configurations[request.name]

    input_artifacts, parent_step_ids = input_utils.resolve_step_inputs(
        step=step,
        pipeline_run=self.pipeline_run,
    )
    input_artifact_ids = {
        input_name: artifact.id
        for input_name, artifact in input_artifacts.items()
    }

    request.inputs = input_artifact_ids
    request.parent_step_ids = parent_step_ids

    cache_key = cache_utils.generate_cache_key(
        step=step,
        input_artifact_ids=input_artifact_ids,
        artifact_store=self.stack.artifact_store,
        project_id=Client().active_project.id,
    )
    request.cache_key = cache_key

    (
        docstring,
        source_code,
    ) = self._get_docstring_and_source_code(invocation_id=request.name)

    request.docstring = docstring
    request.source_code = source_code
    request.code_hash = step.config.parameters.get(
        CODE_HASH_PARAMETER_NAME
    )

    cache_enabled = utils.is_setting_enabled(
        is_enabled_on_step=step.config.enable_cache,
        is_enabled_on_pipeline=self.deployment.pipeline_configuration.enable_cache,
    )

    if cache_enabled:
        if cached_step_run := cache_utils.get_cached_step_run(
            cache_key=cache_key
        ):
            request.inputs = {
                input_name: artifact.id
                for input_name, artifact in cached_step_run.inputs.items()
            }

            request.original_step_run_id = cached_step_run.id
            request.outputs = {
                output_name: [artifact.id for artifact in artifacts]
                for output_name, artifacts in cached_step_run.outputs.items()
            }

            request.status = ExecutionStatus.CACHED
            request.end_time = request.start_time
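A hedged sketch of the two-phase pattern described above, mirroring how the launcher uses the factory; `deployment`, `pipeline_run`, and `stack` are assumed to come from the surrounding context, and "trainer" is a hypothetical invocation ID.

from zenml.enums import ExecutionStatus
from zenml.orchestrators.step_run_utils import StepRunRequestFactory

factory = StepRunRequestFactory(
    deployment=deployment, pipeline_run=pipeline_run, stack=stack
)
request = factory.create_request(invocation_id="trainer")
try:
    factory.populate_request(request)  # may raise while resolving inputs
except Exception:
    # Mark the step as failed if it could not even be prepared.
    request.status = ExecutionStatus.FAILED
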
Functions
create_cached_step_runs(deployment: PipelineDeploymentResponse, pipeline_run: PipelineRunResponse, stack: Stack) -> Set[str]

Create all cached step runs for a pipeline run.

Parameters:

- deployment (PipelineDeploymentResponse): The deployment of the pipeline run. Required.
- pipeline_run (PipelineRunResponse): The pipeline run for which to create the step runs. Required.
- stack (Stack): The stack on which the pipeline run is happening. Required.

Returns:

- Set[str]: The invocation IDs of the created step runs.

Source code in src/zenml/orchestrators/step_run_utils.py
def create_cached_step_runs(
    deployment: "PipelineDeploymentResponse",
    pipeline_run: PipelineRunResponse,
    stack: "Stack",
) -> Set[str]:
    """Create all cached step runs for a pipeline run.

    Args:
        deployment: The deployment of the pipeline run.
        pipeline_run: The pipeline run for which to create the step runs.
        stack: The stack on which the pipeline run is happening.

    Returns:
        The invocation IDs of the created step runs.
    """
    cached_invocations: Set[str] = set()
    visited_invocations: Set[str] = set()
    request_factory = StepRunRequestFactory(
        deployment=deployment, pipeline_run=pipeline_run, stack=stack
    )

    while (
        cache_candidates := find_cacheable_invocation_candidates(
            deployment=deployment,
            finished_invocations=cached_invocations,
        )
        # We've already checked these invocations and were not able to cache
        # them -> no need to check them again
        - visited_invocations
    ):
        for invocation_id in cache_candidates:
            visited_invocations.add(invocation_id)

            # Make sure the request factory has the most up to date pipeline
            # run to avoid hydration calls
            request_factory.pipeline_run = Client().get_pipeline_run(
                pipeline_run.id
            )
            try:
                step_run_request = request_factory.create_request(
                    invocation_id
                )
                request_factory.populate_request(step_run_request)
            except Exception as e:
                # We failed to create/populate the step run. This might be due
                # to some input resolution error, or an error importing the step
                # source (there might be some missing dependencies). We just
                # treat the step as not cached and let the orchestrator try
                # again in a potentially different environment.
                logger.debug(
                    "Failed preparing step run `%s`: %s", invocation_id, str(e)
                )
                continue

            if step_run_request.status != ExecutionStatus.CACHED:
                # If we're not able to cache the step run, the orchestrator
                # will run the step later which will create the step run
                # -> We don't need to do anything here
                continue

            step_run = Client().zen_store.create_run_step(step_run_request)

            if (
                model_version := step_run.model_version
                or pipeline_run.model_version
            ):
                link_output_artifacts_to_model_version(
                    artifacts=step_run.outputs,
                    model_version=model_version,
                )

            logger.info("Using cached version of step `%s`.", invocation_id)
            cached_invocations.add(invocation_id)

    return cached_invocations
find_cacheable_invocation_candidates(deployment: PipelineDeploymentResponse, finished_invocations: Set[str]) -> Set[str]

Find invocations that can potentially be cached.

Parameters:

- deployment (PipelineDeploymentResponse): The pipeline deployment containing the invocations. Required.
- finished_invocations (Set[str]): A set of invocations that are already finished. Required.

Returns:

- Set[str]: The set of invocations that can potentially be cached.

Source code in src/zenml/orchestrators/step_run_utils.py
def find_cacheable_invocation_candidates(
    deployment: "PipelineDeploymentResponse",
    finished_invocations: Set[str],
) -> Set[str]:
    """Find invocations that can potentially be cached.

    Args:
        deployment: The pipeline deployment containing the invocations.
        finished_invocations: A set of invocations that are already finished.

    Returns:
        The set of invocations that can potentially be cached.
    """
    invocations = set()
    for invocation_id, step in deployment.step_configurations.items():
        if invocation_id in finished_invocations:
            continue

        cache_enabled = utils.is_setting_enabled(
            is_enabled_on_step=step.config.enable_cache,
            is_enabled_on_pipeline=deployment.pipeline_configuration.enable_cache,
        )

        if not cache_enabled:
            continue

        if set(step.spec.upstream_steps) - finished_invocations:
            continue

        invocations.add(invocation_id)

    return invocations

link_output_artifacts_to_model_version(artifacts: Dict[str, List[ArtifactVersionResponse]], model_version: ModelVersionResponse) -> None

Link the outputs of a step run to a model version.

Parameters:

- artifacts (Dict[str, List[ArtifactVersionResponse]]): The step output artifacts. Required.
- model_version (ModelVersionResponse): The model version to link. Required.
Source code in src/zenml/orchestrators/step_run_utils.py
def link_output_artifacts_to_model_version(
    artifacts: Dict[str, List[ArtifactVersionResponse]],
    model_version: ModelVersionResponse,
) -> None:
    """Link the outputs of a step run to a model version.

    Args:
        artifacts: The step output artifacts.
        model_version: The model version to link.
    """
    for output_artifacts in artifacts.values():
        for output_artifact in output_artifacts:
            link_artifact_version_to_model_version(
                artifact_version=output_artifact,
                model_version=model_version,
            )
log_model_version_dashboard_url(model_version: ModelVersionResponse) -> None

Log the dashboard URL for a model version.

If the current server is not a ZenML Pro workspace, a fallback message is logged instead.

Parameters:

- model_version (ModelVersionResponse): The model version for which to log the dashboard URL. Required.
Source code in src/zenml/orchestrators/step_run_utils.py
def log_model_version_dashboard_url(
    model_version: ModelVersionResponse,
) -> None:
    """Log the dashboard URL for a model version.

    If the current server is not a ZenML Pro workspace, a fallback message is
    logged instead.

    Args:
        model_version: The model version for which to log the dashboard URL.
    """
    from zenml.utils.dashboard_utils import get_model_version_url

    if model_version_url := get_model_version_url(model_version):
        logger.info(
            "Dashboard URL for Model Version `%s (%s)`:\n%s",
            model_version.model.name,
            model_version.name,
            model_version_url,
        )
    else:
        logger.info(
            "Models can be viewed in the dashboard using ZenML Pro. Sign up "
            "for a free trial at https://www.zenml.io/pro/"
        )
Modules

step_runner

Class to run steps.

Classes
StepRunner(step: Step, stack: Stack)

Class to run steps.

Initializes the step runner.

Parameters:

- step (Step): The step to run. Required.
- stack (Stack): The stack on which the step should run. Required.
Source code in src/zenml/orchestrators/step_runner.py
def __init__(self, step: "Step", stack: "Stack"):
    """Initializes the step runner.

    Args:
        step: The step to run.
        stack: The stack on which the step should run.
    """
    self._step = step
    self._stack = stack
Attributes
configuration: StepConfiguration property

Configuration of the step to run.

Returns:

- StepConfiguration: The step configuration.

Functions
load_and_run_hook(hook_source: Source, step_exception: Optional[BaseException]) -> None

Loads hook source and runs the hook.

Parameters:

- hook_source (Source): The source of the hook function. Required.
- step_exception (Optional[BaseException]): The exception of the original step. Required.
Source code in src/zenml/orchestrators/step_runner.py
def load_and_run_hook(
    self,
    hook_source: "Source",
    step_exception: Optional[BaseException],
) -> None:
    """Loads hook source and runs the hook.

    Args:
        hook_source: The source of the hook function.
        step_exception: The exception of the original step.
    """
    try:
        hook = source_utils.load(hook_source)
        hook_spec = inspect.getfullargspec(inspect.unwrap(hook))

        function_params = self._parse_hook_inputs(
            args=hook_spec.args,
            annotations=hook_spec.annotations,
            step_exception=step_exception,
        )
        logger.debug(f"Running hook {hook} with params: {function_params}")
        hook(**function_params)
    except Exception as e:
        logger.error(
            f"Failed to load hook source with exception: '{hook_source}': "
            f"{e}"
        )
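For illustration, a failure hook compatible with the input parsing above can declare a BaseException-annotated parameter to receive the step's exception; the hook name below is hypothetical.

def notify_on_failure(exception: BaseException) -> None:
    # Called by `load_and_run_hook` with the original step exception.
    print(f"Step failed: {exception}")

Such a hook would typically be attached to a step via `@step(on_failure=notify_on_failure)`.
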
run(pipeline_run: PipelineRunResponse, step_run: StepRunResponse, input_artifacts: Dict[str, StepRunInputResponse], output_artifact_uris: Dict[str, str], step_run_info: StepRunInfo) -> None

Runs the step.

Parameters:

- pipeline_run (PipelineRunResponse): The model of the current pipeline run. Required.
- step_run (StepRunResponse): The model of the current step run. Required.
- input_artifacts (Dict[str, StepRunInputResponse]): The input artifact versions of the step. Required.
- output_artifact_uris (Dict[str, str]): The URIs of the output artifacts of the step. Required.
- step_run_info (StepRunInfo): The step run info. Required.

Raises:

- BaseException: A general exception if the step fails.

Source code in src/zenml/orchestrators/step_runner.py
def run(
    self,
    pipeline_run: "PipelineRunResponse",
    step_run: "StepRunResponse",
    input_artifacts: Dict[str, StepRunInputResponse],
    output_artifact_uris: Dict[str, str],
    step_run_info: StepRunInfo,
) -> None:
    """Runs the step.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        input_artifacts: The input artifact versions of the step.
        output_artifact_uris: The URIs of the output artifacts of the step.
        step_run_info: The step run info.

    Raises:
        BaseException: A general exception if the step fails.
    """
    if handle_bool_env_var(ENV_ZENML_DISABLE_STEP_LOGS_STORAGE, False):
        step_logging_enabled = False
    else:
        enabled_on_step = step_run.config.enable_step_logs
        enabled_on_pipeline = pipeline_run.config.enable_step_logs

        step_logging_enabled = is_setting_enabled(
            is_enabled_on_step=enabled_on_step,
            is_enabled_on_pipeline=enabled_on_pipeline,
        )

    logs_context = nullcontext()
    if step_logging_enabled and not redirected.get():
        if step_run.logs:
            logs_context = StepLogsStorageContext(  # type: ignore[assignment]
                logs_uri=step_run.logs.uri,
                artifact_store=self._stack.artifact_store,
            )
        else:
            logger.debug(
                "There is no LogsResponseModel prepared for the step. The"
                "step logging storage is disabled."
            )

    with logs_context:
        step_instance = self._load_step()
        output_materializers = self._load_output_materializers()
        spec = inspect.getfullargspec(
            inspect.unwrap(step_instance.entrypoint)
        )

        output_annotations = parse_return_type_annotations(
            func=step_instance.entrypoint
        )

        self._evaluate_artifact_names_in_collections(
            step_run,
            output_annotations,
            [
                output_artifact_uris,
                output_materializers,
            ],
        )

        self._stack.prepare_step_run(info=step_run_info)

        # Initialize the step context singleton
        StepContext._clear()
        step_context = StepContext(
            pipeline_run=pipeline_run,
            step_run=step_run,
            output_materializers=output_materializers,
            output_artifact_uris=output_artifact_uris,
            output_artifact_configs={
                k: v.artifact_config for k, v in output_annotations.items()
            },
        )

        # Parse the inputs for the entrypoint function.
        function_params = self._parse_inputs(
            args=spec.args,
            annotations=spec.annotations,
            input_artifacts=input_artifacts,
        )

        step_failed = False
        try:
            return_values = step_instance.call_entrypoint(
                **function_params
            )
        except BaseException as step_exception:  # noqa: E722
            step_failed = True
            if not handle_bool_env_var(
                ENV_ZENML_IGNORE_FAILURE_HOOK, False
            ):
                if (
                    failure_hook_source
                    := self.configuration.failure_hook_source
                ):
                    logger.info("Detected failure hook. Running...")
                    self.load_and_run_hook(
                        failure_hook_source,
                        step_exception=step_exception,
                    )
            raise
        finally:
            try:
                step_run_metadata = self._stack.get_step_run_metadata(
                    info=step_run_info,
                )
                publish_step_run_metadata(
                    step_run_id=step_run_info.step_run_id,
                    step_run_metadata=step_run_metadata,
                )
                self._stack.cleanup_step_run(
                    info=step_run_info, step_failed=step_failed
                )
                if not step_failed:
                    if (
                        success_hook_source
                        := self.configuration.success_hook_source
                    ):
                        logger.info("Detected success hook. Running...")
                        self.load_and_run_hook(
                            success_hook_source,
                            step_exception=None,
                        )

                    # Store and publish the output artifacts of the step function.
                    output_data = self._validate_outputs(
                        return_values, output_annotations
                    )
                    artifact_metadata_enabled = is_setting_enabled(
                        is_enabled_on_step=step_run_info.config.enable_artifact_metadata,
                        is_enabled_on_pipeline=step_run_info.pipeline.enable_artifact_metadata,
                    )
                    artifact_visualization_enabled = is_setting_enabled(
                        is_enabled_on_step=step_run_info.config.enable_artifact_visualization,
                        is_enabled_on_pipeline=step_run_info.pipeline.enable_artifact_visualization,
                    )
                    output_artifacts = self._store_output_artifacts(
                        output_data=output_data,
                        output_artifact_uris=output_artifact_uris,
                        output_materializers=output_materializers,
                        output_annotations=output_annotations,
                        artifact_metadata_enabled=artifact_metadata_enabled,
                        artifact_visualization_enabled=artifact_visualization_enabled,
                    )

                    if (
                        model_version := step_run.model_version
                        or pipeline_run.model_version
                    ):
                        from zenml.orchestrators import step_run_utils

                        step_run_utils.link_output_artifacts_to_model_version(
                            artifacts={
                                k: [v] for k, v in output_artifacts.items()
                            },
                            model_version=model_version,
                        )
            finally:
                step_context._cleanup_registry.execute_callbacks(
                    raise_on_exception=False
                )
                StepContext._clear()  # Remove the step context singleton

        # Update the status and output artifacts of the step run.
        output_artifact_ids = {
            output_name: [
                artifact.id,
            ]
            for output_name, artifact in output_artifacts.items()
        }
        publish_successful_step_run(
            step_run_id=step_run_info.step_run_id,
            output_artifact_ids=output_artifact_ids,
        )
Functions
Modules

topsort

Utilities for topological sort.

Implementation heavily inspired by TFX: https://github.com/tensorflow/tfx/blob/master/tfx/utils/topsort.py

Functions
topsorted_layers(nodes: Sequence[NodeT], get_node_id_fn: Callable[[NodeT], str], get_parent_nodes: Callable[[NodeT], List[NodeT]], get_child_nodes: Callable[[NodeT], List[NodeT]]) -> List[List[NodeT]]

Sorts the DAG of nodes in topological order.

Parameters:

- nodes (Sequence[NodeT]): A sequence of nodes. Required.
- get_node_id_fn (Callable[[NodeT], str]): Callable that returns a unique text identifier for a node. Required.
- get_parent_nodes (Callable[[NodeT], List[NodeT]]): Callable that returns a list of parent nodes for a node. If a parent node's id is not found in the list of node ids, that parent node will be omitted. Required.
- get_child_nodes (Callable[[NodeT], List[NodeT]]): Callable that returns a list of child nodes for a node. If a child node's id is not found in the list of node ids, that child node will be omitted. Required.

Returns:

- List[List[NodeT]]: A list of topologically ordered node layers. Each layer of nodes is sorted by its node id given by get_node_id_fn.

Raises:

- RuntimeError: If the input nodes don't form a DAG.
- ValueError: If the nodes are not unique.

Source code in src/zenml/orchestrators/topsort.py
def topsorted_layers(
    nodes: Sequence[NodeT],
    get_node_id_fn: Callable[[NodeT], str],
    get_parent_nodes: Callable[[NodeT], List[NodeT]],
    get_child_nodes: Callable[[NodeT], List[NodeT]],
) -> List[List[NodeT]]:
    """Sorts the DAG of nodes in topological order.

    Args:
        nodes: A sequence of nodes.
        get_node_id_fn: Callable that returns a unique text identifier for a node.
        get_parent_nodes: Callable that returns a list of parent nodes for a node.
            If a parent node's id is not found in the list of node ids, that parent
            node will be omitted.
        get_child_nodes: Callable that returns a list of child nodes for a node.
            If a child node's id is not found in the list of node ids, that child
            node will be omitted.

    Returns:
        A list of topologically ordered node layers. Each layer of nodes is sorted
        by its node id given by `get_node_id_fn`.

    Raises:
        RuntimeError: If the input nodes don't form a DAG.
        ValueError: If the nodes are not unique.
    """
    # Make sure the nodes are unique.
    node_ids = set(get_node_id_fn(n) for n in nodes)
    if len(node_ids) != len(nodes):
        raise ValueError("Nodes must have unique ids.")

    # The outputs of get_(parent|child)_nodes should always be deduplicated,
    # and references to unknown nodes should be removed.
    def _apply_and_clean(
        func: Callable[[NodeT], List[NodeT]], func_name: str, node: NodeT
    ) -> List[NodeT]:
        seen_inner_node_ids = set()
        result = []
        for inner_node in func(node):
            inner_node_id = get_node_id_fn(inner_node)
            if inner_node_id in seen_inner_node_ids:
                logger.warning(
                    "Duplicate node_id %s found when calling %s on node %s. "
                    "This entry will be ignored.",
                    inner_node_id,
                    func_name,
                    node,
                )
            elif inner_node_id not in node_ids:
                logger.warning(
                    "node_id %s found when calling %s on node %s, but this node_id is "
                    "not found in the set of input nodes %s. This entry will be "
                    "ignored.",
                    inner_node_id,
                    func_name,
                    node,
                    node_ids,
                )
            else:
                seen_inner_node_ids.add(inner_node_id)
                result.append(inner_node)

        return result

    def get_clean_parent_nodes(node: NodeT) -> List[NodeT]:
        return _apply_and_clean(get_parent_nodes, "get_parent_nodes", node)

    def get_clean_child_nodes(node: NodeT) -> List[NodeT]:
        return _apply_and_clean(get_child_nodes, "get_child_nodes", node)

    # The first layer contains nodes with no incoming edges.
    layer = [node for node in nodes if not get_clean_parent_nodes(node)]

    visited_node_ids = set()
    layers = []
    while layer:
        layer = sorted(layer, key=get_node_id_fn)
        layers.append(layer)

        next_layer = []
        for node in layer:
            visited_node_ids.add(get_node_id_fn(node))
            for child_node in get_clean_child_nodes(node):
                # Include the child node if all its parents are visited. If the child
                # node is part of a cycle, it will never be included since it will have
                # at least one unvisited parent node which is also part of the cycle.
                parent_node_ids = set(
                    get_node_id_fn(p)
                    for p in get_clean_parent_nodes(child_node)
                )
                if parent_node_ids.issubset(visited_node_ids):
                    next_layer.append(child_node)
        layer = next_layer

    num_output_nodes = sum(len(layer) for layer in layers)
    # Nodes in cycles are not included in layers; raise an error if this happens.
    if num_output_nodes < len(nodes):
        raise RuntimeError("Cannot sort graph because it contains a cycle.")
    # This should never happen; raise an error if this occurs.
    if num_output_nodes > len(nodes):
        raise RuntimeError("Unknown error occurred while sorting DAG.")

    return layers
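
For illustration, here is a minimal usage sketch of topsorted_layers on a hypothetical diamond-shaped DAG. The adjacency dicts and lambda callables below are assumptions made up for this example, not part of the ZenML API:

from zenml.orchestrators.topsort import topsorted_layers

# Hypothetical diamond DAG: a -> b, a -> c, b -> d, c -> d
parents = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
children = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}

layers = topsorted_layers(
    nodes=list(parents),
    get_node_id_fn=lambda node: node,
    get_parent_nodes=lambda node: parents[node],
    get_child_nodes=lambda node: children[node],
)
assert layers == [["a"], ["b", "c"], ["d"]]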

utils

Utility functions for the orchestrator.

Classes
register_artifact_store_filesystem(target_artifact_store_id: Optional[UUID])

Context manager for the artifact_store/filesystem_registry dependency.

Even though it is rare, we sometimes need to load artifacts that belong to an artifact store other than the active artifact store.

In cases like this, we try to instantiate the target artifact store by creating the corresponding artifact store Python object, which ends up registering the right filesystem in the filesystem registry.

The problem is that the keys in the filesystem registry are schemes (such as "s3://" or "gcs://"). If two artifact stores share the same set of supported schemes, instantiating the target store can overwrite the filesystem that belongs to the active artifact store (and its authentication). That is why the active artifact store has to be re-instantiated on exit, so that the correct filesystem is restored.

Initialization of the context manager.

Parameters:

target_artifact_store_id (Optional[UUID], required): the ID of the artifact store to load.
Source code in src/zenml/orchestrators/utils.py
def __init__(self, target_artifact_store_id: Optional[UUID]) -> None:
    """Initialization of the context manager.

    Args:
        target_artifact_store_id: the ID of the artifact store to load.
    """
    self.target_artifact_store_id = target_artifact_store_id
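
A minimal usage sketch, assuming the class is used as a context manager as described above; the UUID below is a made-up placeholder (in practice it would come from the artifact version being loaded):

from uuid import UUID

from zenml.orchestrators.utils import register_artifact_store_filesystem

# Hypothetical ID of a non-active artifact store
target_store_id = UUID("12345678-1234-5678-1234-567812345678")

with register_artifact_store_filesystem(target_store_id):
    # Inside the block, scheme lookups (e.g. "s3://") resolve to the
    # target artifact store's filesystem.
    ...
# On exit, the active artifact store's filesystem is restored.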
Functions
get_config_environment_vars(schedule_id: Optional[UUID] = None, pipeline_run_id: Optional[UUID] = None, step_run_id: Optional[UUID] = None) -> Dict[str, str]

Gets environment variables to set for mirroring the active config.

If a schedule ID, pipeline run ID or step run ID is given, and the current client is not authenticated to a server with an API key, the environment variables will be updated to include a newly generated workload API token that will be valid for the duration of the schedule, pipeline run, or step run instead of the current API token used to authenticate the client.

Parameters:

schedule_id (Optional[UUID], default None): Optional schedule ID to use to generate a new API token.
pipeline_run_id (Optional[UUID], default None): Optional pipeline run ID to use to generate a new API token.
step_run_id (Optional[UUID], default None): Optional step run ID to use to generate a new API token.

Returns:

Dict[str, str]: Environment variable dict.

Source code in src/zenml/orchestrators/utils.py
def get_config_environment_vars(
    schedule_id: Optional[UUID] = None,
    pipeline_run_id: Optional[UUID] = None,
    step_run_id: Optional[UUID] = None,
) -> Dict[str, str]:
    """Gets environment variables to set for mirroring the active config.

    If a schedule ID, pipeline run ID or step run ID is given, and the current
    client is not authenticated to a server with an API key, the environment
    variables will be updated to include a newly generated workload API token
    that will be valid for the duration of the schedule, pipeline run, or step
    run instead of the current API token used to authenticate the client.

    Args:
        schedule_id: Optional schedule ID to use to generate a new API token.
        pipeline_run_id: Optional pipeline run ID to use to generate a new API
            token.
        step_run_id: Optional step run ID to use to generate a new API token.

    Returns:
        Environment variable dict.
    """
    from zenml.login.credentials_store import get_credentials_store
    from zenml.zen_stores.rest_zen_store import RestZenStore

    global_config = GlobalConfiguration()
    environment_vars = global_config.get_config_environment_vars()

    if (
        global_config.store_configuration.type == StoreType.REST
        and global_config.zen_store.get_store_info().auth_scheme
        != AuthScheme.NO_AUTH
    ):
        credentials_store = get_credentials_store()
        url = global_config.store_configuration.url
        api_token = credentials_store.get_token(url, allow_expired=False)
        if schedule_id or pipeline_run_id or step_run_id:
            assert isinstance(global_config.zen_store, RestZenStore)

            # The user has the option to manually set an expiration for the API
            # token generated for a pipeline run. In this case, we generate a new
            # generic API token that will be valid for the indicated duration.
            if (
                pipeline_run_id
                and ZENML_PIPELINE_RUN_API_TOKEN_EXPIRATION != 0
            ):
                logger.warning(
                    f"An unscoped API token will be generated for this pipeline "
                    f"run that will expire after "
                    f"{ZENML_PIPELINE_RUN_API_TOKEN_EXPIRATION} "
                    f"seconds instead of being scoped to the pipeline run "
                    f"and not having an expiration time. This is more insecure "
                    f"because the API token will remain valid even after the "
                    f"pipeline run completes its execution. This option has "
                    "been explicitly enabled by setting the "
                    f"{ENV_ZENML_PIPELINE_RUN_API_TOKEN_EXPIRATION} environment "
                    f"variable"
                )
                new_api_token = global_config.zen_store.get_api_token(
                    token_type=APITokenType.GENERIC,
                    expires_in=ZENML_PIPELINE_RUN_API_TOKEN_EXPIRATION,
                )

            else:
                # If a schedule ID, pipeline run ID or step run ID is supplied,
                # we need to fetch a new workload API token scoped to the
                # schedule, pipeline run or step run.

                # If only a schedule is given, the pipeline run credentials will
                # be valid for the entire duration of the schedule.
                api_key = credentials_store.get_api_key(url)
                if not api_key and not pipeline_run_id and not step_run_id:
                    logger.warning(
                        "An API token without an expiration time will be generated "
                        "and used to run this pipeline on a schedule. This is very "
                        "insecure because the API token will be valid for the "
                        "entire lifetime of the schedule and can be used to access "
                        "your user account if accidentally leaked. When deploying "
                        "a pipeline on a schedule, it is strongly advised to use a "
                        "service account API key to authenticate to the ZenML "
                        "server instead of your regular user account. For more "
                        "information, see "
                        "https://docs.zenml.io/how-to/manage-zenml-server/connecting-to-zenml/connect-with-a-service-account"
                    )

                # The schedule, pipeline run or step run credentials are scoped to
                # the schedule, pipeline run or step run and will only be valid for
                # the duration of the schedule/pipeline run/step run.
                new_api_token = global_config.zen_store.get_api_token(
                    token_type=APITokenType.WORKLOAD,
                    schedule_id=schedule_id,
                    pipeline_run_id=pipeline_run_id,
                    step_run_id=step_run_id,
                )

            environment_vars[ENV_ZENML_STORE_PREFIX + "API_TOKEN"] = (
                new_api_token
            )
        elif api_token:
            # For all other cases, the pipeline run environment is configured
            # with the current access token.
            environment_vars[ENV_ZENML_STORE_PREFIX + "API_TOKEN"] = (
                api_token.access_token
            )

    # Disable credentials caching to avoid storing sensitive information
    # in the pipeline run environment
    environment_vars[ENV_ZENML_DISABLE_CREDENTIALS_DISK_CACHING] = "true"

    # Make sure to use the correct active stack/project which might come
    # from a .zen repository and not the global config
    environment_vars[ENV_ZENML_ACTIVE_STACK_ID] = str(
        Client().active_stack_model.id
    )
    environment_vars[ENV_ZENML_ACTIVE_PROJECT_ID] = str(
        Client().active_project.id
    )

    return environment_vars
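
A minimal sketch of how an orchestrator might consume this function. The uuid4() run ID is a placeholder assumption; a real orchestrator would pass the ID of the pipeline run it is about to launch:

from uuid import uuid4

from zenml.orchestrators.utils import get_config_environment_vars

env = get_config_environment_vars(pipeline_run_id=uuid4())
# The returned dict can be merged into the environment of whatever
# process or container ends up executing the pipeline steps.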
get_orchestrator_run_name(pipeline_name: str, max_length: Optional[int] = None) -> str

Gets an orchestrator run name.

This run name is not the same as the ZenML run name; it is intended for display in the orchestrator's own UI.

Parameters:

pipeline_name (str, required): Name of the pipeline that will run.
max_length (Optional[int], default None): Maximum length of the generated name.

Raises:

ValueError: If the max length is below 8 characters.

Returns:

str: The orchestrator run name.

Source code in src/zenml/orchestrators/utils.py
def get_orchestrator_run_name(
    pipeline_name: str, max_length: Optional[int] = None
) -> str:
    """Gets an orchestrator run name.

    This run name is not the same as the ZenML run name but can instead be
    used to display in the orchestrator UI.

    Args:
        pipeline_name: Name of the pipeline that will run.
        max_length: Maximum length of the generated name.

    Raises:
        ValueError: If the max length is below 8 characters.

    Returns:
        The orchestrator run name.
    """
    suffix_length = 32
    pipeline_name = f"{pipeline_name}_"

    if max_length:
        if max_length < 8:
            raise ValueError(
                "Maximum length for orchestrator run name must be 8 or above."
            )

        # Make sure we always have a certain suffix to guarantee no overlap
        # with other runs
        suffix_length = min(32, max(8, max_length - len(pipeline_name)))
        pipeline_name = pipeline_name[: (max_length - suffix_length)]

    suffix = "".join(random.choices("0123456789abcdef", k=suffix_length))

    return f"{pipeline_name}{suffix}"
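
For example (the suffix is random, so exact outputs vary):

from zenml.orchestrators.utils import get_orchestrator_run_name

name = get_orchestrator_run_name("training_pipeline")
# -> "training_pipeline_" followed by 32 random hex characters

short = get_orchestrator_run_name("training_pipeline", max_length=30)
assert len(short) <= 30  # truncated name plus a suffix of at least 8 hex characters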
is_setting_enabled(is_enabled_on_step: Optional[bool], is_enabled_on_pipeline: Optional[bool]) -> bool

Checks if a certain setting is enabled within a step run.

This is the case if:

- the setting is explicitly enabled for the step, or
- the setting is neither explicitly disabled for the step nor the pipeline.

Parameters:

is_enabled_on_step (Optional[bool], required): The setting of the step.
is_enabled_on_pipeline (Optional[bool], required): The setting of the pipeline.

Returns:

bool: True if the setting is enabled within the step run, False otherwise.

Source code in src/zenml/orchestrators/utils.py
def is_setting_enabled(
    is_enabled_on_step: Optional[bool],
    is_enabled_on_pipeline: Optional[bool],
) -> bool:
    """Checks if a certain setting is enabled within a step run.

    This is the case if:
    - the setting is explicitly enabled for the step, or
    - the setting is neither explicitly disabled for the step nor the pipeline.

    Args:
        is_enabled_on_step: The setting of the step.
        is_enabled_on_pipeline: The setting of the pipeline.

    Returns:
        True if the setting is enabled within the step run, False otherwise.
    """
    if is_enabled_on_step is not None:
        return is_enabled_on_step
    if is_enabled_on_pipeline is not None:
        return is_enabled_on_pipeline
    return True
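
The precedence is easiest to see as a few concrete cases:

from zenml.orchestrators.utils import is_setting_enabled

assert is_setting_enabled(True, False) is True    # step setting wins
assert is_setting_enabled(False, True) is False   # step setting wins again
assert is_setting_enabled(None, False) is False   # falls back to the pipeline
assert is_setting_enabled(None, None) is True     # enabled by default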

wheeled_orchestrator

Wheeled orchestrator class.

Classes
WheeledOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseOrchestrator, ABC

Base class for wheeled orchestrators.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Functions
copy_repository_to_temp_dir_and_add_setup_py() -> str

Copy the repository to a temporary directory and add a setup.py file.

Returns:

str: Path to the temporary directory containing the copied repository.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def copy_repository_to_temp_dir_and_add_setup_py(self) -> str:
    """Copy the repository to a temporary directory and add a setup.py file.

    Returns:
        Path to the temporary directory containing the copied repository.
    """
    repo_path = get_source_root()

    self.package_name = f"{DEFAULT_PACKAGE_NAME}_{self.sanitize_name(os.path.basename(repo_path))}"

    # Create a temporary folder
    temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")

    # Create a folder within the temporary directory
    temp_repo_path = os.path.join(temp_dir, self.package_name)
    fileio.mkdir(temp_repo_path)

    # Copy the repository to the temporary directory
    copy_dir(repo_path, temp_repo_path)

    # Create init file in the copied directory
    init_file_path = os.path.join(temp_repo_path, "__init__.py")
    with fileio.open(init_file_path, "w") as f:
        f.write("")

    # Create a setup.py file
    setup_py_content = f"""
from setuptools import setup, find_packages

setup(
    name="{self.package_name}",
    version="{self.package_version}",
    packages=find_packages(),
)
"""
    setup_py_path = os.path.join(temp_dir, "setup.py")
    with fileio.open(setup_py_path, "w") as f:
        f.write(setup_py_content)

    return temp_dir
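
The resulting layout of the returned temporary directory looks roughly like this (shown schematically, since DEFAULT_PACKAGE_NAME and the sanitized repository name depend on the environment):

<temp_dir>/
    setup.py
    <DEFAULT_PACKAGE_NAME>_<sanitized-repo-name>/
        __init__.py
        ... copied repository contents ...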
create_wheel(temp_dir: str) -> str

Create a wheel for the package in the given temporary directory.

Parameters:

temp_dir (str, required): Path to the temporary directory containing the package.

Raises:

RuntimeError: If the wheel file could not be created.

Returns:

str: Path to the created wheel file.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def create_wheel(self, temp_dir: str) -> str:
    """Create a wheel for the package in the given temporary directory.

    Args:
        temp_dir (str): Path to the temporary directory containing the package.

    Raises:
        RuntimeError: If the wheel file could not be created.

    Returns:
        str: Path to the created wheel file.
    """
    # Change to the temporary directory
    original_dir = os.getcwd()
    os.chdir(temp_dir)

    try:
        # Run the `pip wheel` command to create the wheel
        result = subprocess.run(
            ["pip", "wheel", "."], check=True, capture_output=True
        )
        logger.debug(f"Wheel creation stdout: {result.stdout.decode()}")
        logger.debug(f"Wheel creation stderr: {result.stderr.decode()}")

        # Find the created wheel file
        wheel_file = next(
            (
                file
                for file in os.listdir(temp_dir)
                if file.endswith(".whl")
            ),
            None,
        )

        if wheel_file is None:
            raise RuntimeError("Failed to create wheel file.")

        wheel_path = os.path.join(temp_dir, wheel_file)

        # Verify the wheel file is a valid zip file
        import zipfile

        if not zipfile.is_zipfile(wheel_path):
            raise RuntimeError(
                f"The file {wheel_path} is not a valid zip file."
            )

        return wheel_path
    finally:
        # Change back to the original directory
        os.chdir(original_dir)
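
Taken together with copy_repository_to_temp_dir_and_add_setup_py, a subclass would typically build the code wheel in two steps. A hedged sketch, assuming it runs inside a method of a WheeledOrchestrator subclass:

# Inside a WheeledOrchestrator subclass method:
temp_dir = self.copy_repository_to_temp_dir_and_add_setup_py()
wheel_path = self.create_wheel(temp_dir)
# wheel_path can then be uploaded to wherever the remote
# orchestrator expects the code package.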
sanitize_name(name: str) -> str

Sanitize the value to be used in a cluster name.

Parameters:

name (str, required): Arbitrary input cluster name.

Returns:

str: Sanitized cluster name.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def sanitize_name(self, name: str) -> str:
    """Sanitize the value to be used in a cluster name.

    Args:
        name: Arbitrary input cluster name.

    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
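
For example (assuming orchestrator is an instance of a WheeledOrchestrator subclass):

assert orchestrator.sanitize_name("My Repo!!") == "my-repo"
assert orchestrator.sanitize_name("--data_pipeline--") == "data-pipeline"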
Functions
Modules