Orchestrators

Initialization for ZenML orchestrators.

An orchestrator is a special kind of backend that manages the execution of each step of a pipeline. Orchestrators administer the actual pipeline runs; you can think of an orchestrator as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.

BaseOrchestrator

Bases: StackComponent, ABC

Base class for all orchestrators.

In order to implement an orchestrator you will need to subclass from this class.

How it works:

The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)).

This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
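
For illustration, a minimal orchestrator that executes every step sequentially in the current process could look like the sketch below. The class name and run-id bookkeeping are assumptions made for this example; only the two overridden methods are required by the base class.

from typing import Any, Dict, Optional
from uuid import uuid4

from zenml.orchestrators.base_orchestrator import BaseOrchestrator


class SequentialOrchestrator(BaseOrchestrator):
    """Illustrative orchestrator that runs all steps in-process."""

    _orchestrator_run_id: Optional[str] = None

    def get_orchestrator_run_id(self) -> str:
        # Must be unique per pipeline run and identical for all of its steps.
        if not self._orchestrator_run_id:
            raise RuntimeError("No active orchestrator run.")
        return self._orchestrator_run_id

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        # Simple case: run the steps directly instead of deploying them.
        # `environment` is not needed here since steps run in this process.
        self._orchestrator_run_id = str(uuid4())
        try:
            for step in deployment.step_configurations.values():
                self.run_step(step=step)
        finally:
            self._orchestrator_run_id = None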

Source code in src/zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.

    In order to implement an orchestrator you will need to subclass from this
    class.

    How it works:
    -------------
    The `run(...)` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run(...)`).

    This method will do some internal preparation and then call the
    `prepare_or_run_pipeline(...)` method. BaseOrchestrator subclasses must
    implement this method and either run the pipeline steps directly or deploy
    the pipeline to some remote infrastructure.
    """

    _active_deployment: Optional["PipelineDeploymentResponse"] = None

    @property
    def config(self) -> BaseOrchestratorConfig:
        """Returns the `BaseOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseOrchestratorConfig, self._config)

    @abstractmethod
    def get_orchestrator_run_id(self) -> str:
        """Returns the run id of the active orchestrator run.

        Important: This needs to be a unique ID and return the same value for
        all steps of a pipeline run.

        Returns:
            The orchestrator run id.
        """

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Optional[Iterator[Dict[str, MetadataType]]]:
        """The method needs to be implemented by the respective orchestrator.

        Depending on the type of orchestrator you'll have to perform slightly
        different operations.

        Simple Case:
        ------------
        The Steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through the steps and finally call
        `self.run_step(...)` to execute each step.

        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and deploy it.

        Regardless of the implementation details, the orchestrator will need
        to run each step in the target environment. For this the
        `self.run_step(...)` method should be used.

        The easiest way to make this work is by using an entrypoint
        configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
        or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment. These don't need to be set if running locally.

        Yields:
            Metadata for the pipeline run.
        """

    def run(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        placeholder_run: Optional["PipelineRunResponse"] = None,
    ) -> None:
        """Runs a pipeline on a stack.

        Args:
            deployment: The pipeline deployment.
            stack: The stack on which to run the pipeline.
            placeholder_run: An optional placeholder run for the deployment.
                This will be deleted in case the pipeline deployment failed.
        """
        self._prepare_run(deployment=deployment)

        pipeline_run_id: Optional[UUID] = None
        schedule_id: Optional[UUID] = None
        if deployment.schedule:
            schedule_id = deployment.schedule.id
        if placeholder_run:
            pipeline_run_id = placeholder_run.id

        environment = get_config_environment_vars(
            schedule_id=schedule_id,
            pipeline_run_id=pipeline_run_id,
        )

        prevent_client_side_caching = handle_bool_env_var(
            ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING, default=False
        )

        if (
            placeholder_run
            and self.config.supports_client_side_caching
            and not deployment.schedule
            and not prevent_client_side_caching
        ):
            from zenml.orchestrators import step_run_utils

            cached_invocations = step_run_utils.create_cached_step_runs(
                deployment=deployment,
                pipeline_run=placeholder_run,
                stack=stack,
            )

            for invocation_id in cached_invocations:
                # Remove the cached step invocations from the deployment so
                # the orchestrator does not try to run them
                deployment.step_configurations.pop(invocation_id)

            for step in deployment.step_configurations.values():
                for invocation_id in cached_invocations:
                    if invocation_id in step.spec.upstream_steps:
                        step.spec.upstream_steps.remove(invocation_id)

            if len(deployment.step_configurations) == 0:
                # All steps were cached, we update the pipeline run status and
                # don't actually use the orchestrator to run the pipeline
                self._cleanup_run()
                logger.info("All steps of the pipeline run were cached.")
                return
        else:
            logger.debug("Skipping client-side caching.")

        try:
            if metadata_iterator := self.prepare_or_run_pipeline(
                deployment=deployment,
                stack=stack,
                environment=environment,
            ):
                for metadata_dict in metadata_iterator:
                    try:
                        if placeholder_run:
                            publish_pipeline_run_metadata(
                                pipeline_run_id=placeholder_run.id,
                                pipeline_run_metadata={self.id: metadata_dict},
                            )
                    except Exception as e:
                        logger.debug(
                            "Something went went wrong trying to publish the"
                            f"run metadata: {e}"
                        )
        finally:
            self._cleanup_run()

    def run_step(self, step: "Step") -> None:
        """Runs the given step.

        Args:
            step: The step to run.
        """
        assert self._active_deployment
        launcher = StepLauncher(
            deployment=self._active_deployment,
            step=step,
            orchestrator_run_id=self.get_orchestrator_run_id(),
        )
        launcher.launch()

    @staticmethod
    def requires_resources_in_orchestration_environment(
        step: "Step",
    ) -> bool:
        """Checks if the orchestrator should run this step on special resources.

        Args:
            step: The step that will be checked.

        Returns:
            True if the step requires special resources in the orchestration
            environment, False otherwise.
        """
        # If the step requires custom resources and doesn't run with a step
        # operator, it would need these requirements in the orchestrator
        # environment
        if step.config.step_operator:
            return False

        return not step.config.resource_settings.empty

    def _prepare_run(self, deployment: "PipelineDeploymentResponse") -> None:
        """Prepares a run.

        Args:
            deployment: The deployment to prepare.
        """
        self._active_deployment = deployment

    def _cleanup_run(self) -> None:
        """Cleans up the active run."""
        self._active_deployment = None

    def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
        """Refreshes the status of a specific pipeline run.

        Args:
            run: A pipeline run response to fetch its status.

        Raises:
            NotImplementedError: If any orchestrator inheriting from the base
                class does not implement this logic.
        """
        raise NotImplementedError(
            "The fetch status functionality is not implemented for the "
            f"'{self.__class__.__name__}' orchestrator."
        )

config property

Returns the BaseOrchestratorConfig config.

Returns:

Type Description
BaseOrchestratorConfig

The configuration.

fetch_status(run)

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

A pipeline run response to fetch its status.

required

Raises:

Type Description
NotImplementedError

If any orchestrator inheriting from the base class does not implement this logic.
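
Orchestrator implementations that can query their backend may override this method to map backend run states onto ZenML statuses. A hedged sketch of such an override, where backend_client and its state names are purely illustrative and not part of ZenML:

from zenml.enums import ExecutionStatus

# Illustrative mapping from a hypothetical backend's states to ZenML statuses.
_STATUS_MAP = {
    "running": ExecutionStatus.RUNNING,
    "succeeded": ExecutionStatus.COMPLETED,
    "failed": ExecutionStatus.FAILED,
}


def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Sketch of a subclass override; `backend_client` is not a real ZenML API."""
    # Looking up the backend run that corresponds to the ZenML run is
    # orchestrator-specific; here a hypothetical client does it by run id.
    backend_state = self.backend_client.get_state(str(run.id))
    return _STATUS_MAP.get(backend_state, ExecutionStatus.RUNNING)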

Source code in src/zenml/orchestrators/base_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: A pipeline run response to fetch its status.

    Raises:
        NotImplementedError: If any orchestrator inheriting from the base
            class does not implement this logic.
    """
    raise NotImplementedError(
        "The fetch status functionality is not implemented for the "
        f"'{self.__class__.__name__}' orchestrator."
    )

get_orchestrator_run_id() abstractmethod

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/orchestrators/base_orchestrator.py
@abstractmethod
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run.

    Returns:
        The orchestrator run id.
    """

prepare_or_run_pipeline(deployment, stack, environment) abstractmethod

The method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The Steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through the steps and finally call self.run_step(...) to execute each step.

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.

Regardless of the implementation details, the orchestrator will need to run each step in the target environment. For this the self.run_step(...) method should be used.

The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).
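
For the advanced case, a short sketch of how an orchestrator could assemble the per-step container commands from the step entrypoint configuration (the helper function itself is illustrative):

from typing import List, Tuple

from zenml.entrypoints.step_entrypoint_configuration import (
    StepEntrypointConfiguration,
)


def build_step_commands(
    deployment: "PipelineDeploymentResponse",
) -> List[Tuple[str, List[str]]]:
    """Illustrative helper returning one (invocation id, command) pair per step."""
    specs = []
    for step_name in deployment.step_configurations:
        command = StepEntrypointConfiguration.get_entrypoint_command()
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        # The orchestrator would pass command + arguments, the container image
        # and the `environment` variables to its backend's task definition.
        specs.append((step_name, command + arguments))
    return specs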

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment. These don't need to be set if running locally.

required

Yields:

Type Description
Optional[Iterator[Dict[str, MetadataType]]]

Metadata for the pipeline run.

Source code in src/zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Optional[Iterator[Dict[str, MetadataType]]]:
    """The method needs to be implemented by the respective orchestrator.

    Depending on the type of orchestrator you'll have to perform slightly
    different operations.

    Simple Case:
    ------------
    The Steps are run directly from within the same environment in which
    the orchestrator code is executed. In this case you will need to
    deal with implementation-specific runtime configurations (like the
    schedule) and then iterate through the steps and finally call
    `self.run_step(...)` to execute each step.

    Advanced Case:
    --------------
    Most orchestrators will not run the steps directly. Instead, they
    build some intermediate representation of the pipeline that is then
    used to create and run the pipeline and its steps on the target
    environment. For such orchestrators this method will have to build
    this representation and deploy it.

    Regardless of the implementation details, the orchestrator will need
    to run each step in the target environment. For this the
    `self.run_step(...)` method should be used.

    The easiest way to make this work is by using an entrypoint
    configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
    or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. These don't need to be set if running locally.

    Yields:
        Metadata for the pipeline run.
    """

requires_resources_in_orchestration_environment(step) staticmethod

Checks if the orchestrator should run this step on special resources.

Parameters:

Name Type Description Default
step Step

The step that will be checked.

required

Returns:

Type Description
bool

True if the step requires special resources in the orchestration environment, False otherwise.

Source code in src/zenml/orchestrators/base_orchestrator.py
@staticmethod
def requires_resources_in_orchestration_environment(
    step: "Step",
) -> bool:
    """Checks if the orchestrator should run this step on special resources.

    Args:
        step: The step that will be checked.

    Returns:
        True if the step requires special resources in the orchestration
        environment, False otherwise.
    """
    # If the step requires custom resources and doesn't run with a step
    # operator, it would need these requirements in the orchestrator
    # environment
    if step.config.step_operator:
        return False

    return not step.config.resource_settings.empty

run(deployment, stack, placeholder_run=None)

Runs a pipeline on a stack.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment.

required
stack Stack

The stack on which to run the pipeline.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment. This will be deleted in case the pipeline deployment failed.

None
Source code in src/zenml/orchestrators/base_orchestrator.py
def run(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Runs a pipeline on a stack.

    Args:
        deployment: The pipeline deployment.
        stack: The stack on which to run the pipeline.
        placeholder_run: An optional placeholder run for the deployment.
            This will be deleted in case the pipeline deployment failed.
    """
    self._prepare_run(deployment=deployment)

    pipeline_run_id: Optional[UUID] = None
    schedule_id: Optional[UUID] = None
    if deployment.schedule:
        schedule_id = deployment.schedule.id
    if placeholder_run:
        pipeline_run_id = placeholder_run.id

    environment = get_config_environment_vars(
        schedule_id=schedule_id,
        pipeline_run_id=pipeline_run_id,
    )

    prevent_client_side_caching = handle_bool_env_var(
        ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING, default=False
    )

    if (
        placeholder_run
        and self.config.supports_client_side_caching
        and not deployment.schedule
        and not prevent_client_side_caching
    ):
        from zenml.orchestrators import step_run_utils

        cached_invocations = step_run_utils.create_cached_step_runs(
            deployment=deployment,
            pipeline_run=placeholder_run,
            stack=stack,
        )

        for invocation_id in cached_invocations:
            # Remove the cached step invocations from the deployment so
            # the orchestrator does not try to run them
            deployment.step_configurations.pop(invocation_id)

        for step in deployment.step_configurations.values():
            for invocation_id in cached_invocations:
                if invocation_id in step.spec.upstream_steps:
                    step.spec.upstream_steps.remove(invocation_id)

        if len(deployment.step_configurations) == 0:
            # All steps were cached, we update the pipeline run status and
            # don't actually use the orchestrator to run the pipeline
            self._cleanup_run()
            logger.info("All steps of the pipeline run were cached.")
            return
    else:
        logger.debug("Skipping client-side caching.")

    try:
        if metadata_iterator := self.prepare_or_run_pipeline(
            deployment=deployment,
            stack=stack,
            environment=environment,
        ):
            for metadata_dict in metadata_iterator:
                try:
                    if placeholder_run:
                        publish_pipeline_run_metadata(
                            pipeline_run_id=placeholder_run.id,
                            pipeline_run_metadata={self.id: metadata_dict},
                        )
                except Exception as e:
                    logger.debug(
                        "Something went went wrong trying to publish the"
                        f"run metadata: {e}"
                    )
    finally:
        self._cleanup_run()

run_step(step)

Runs the given step.

Parameters:

Name Type Description Default
step Step

The step to run.

required
Source code in src/zenml/orchestrators/base_orchestrator.py
def run_step(self, step: "Step") -> None:
    """Runs the given step.

    Args:
        step: The step to run.
    """
    assert self._active_deployment
    launcher = StepLauncher(
        deployment=self._active_deployment,
        step=step,
        orchestrator_run_id=self.get_orchestrator_run_id(),
    )
    launcher.launch()

BaseOrchestratorConfig

Bases: StackComponentConfig

Base orchestrator config.

Source code in src/zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorConfig(StackComponentConfig):
    """Base orchestrator config."""

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def _deprecations(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and/or remove deprecated fields.

        Args:
            data: The values to validate.

        Returns:
            The validated values.
        """
        if "custom_docker_base_image_name" in data:
            image_name = data.pop("custom_docker_base_image_name", None)
            if image_name:
                logger.warning(
                    "The 'custom_docker_base_image_name' field has been "
                    "deprecated. To use a custom base container image with your "
                    "orchestrators, please use the DockerSettings in your "
                    "pipeline (see https://docs.zenml.io/how-to/infrastructure-deployment/customize-docker-builds)."
                )

        return data

    @property
    def is_synchronous(self) -> bool:
        """Whether the orchestrator runs synchronous or not.

        Returns:
            Whether the orchestrator runs synchronous or not.
        """
        return False

    @property
    def is_schedulable(self) -> bool:
        """Whether the orchestrator is schedulable or not.

        Returns:
            Whether the orchestrator is schedulable or not.
        """
        return False

    @property
    def supports_client_side_caching(self) -> bool:
        """Whether the orchestrator supports client side caching.

        Returns:
            Whether the orchestrator supports client side caching.
        """
        return True

is_schedulable property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

supports_client_side_caching property

Whether the orchestrator supports client side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client side caching.
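
A custom orchestrator config can override these properties to describe its behavior, for example (an illustrative subclass, not part of ZenML):

from zenml.orchestrators.base_orchestrator import BaseOrchestratorConfig


class MyOrchestratorConfig(BaseOrchestratorConfig):
    """Illustrative config for an orchestrator that waits for runs to finish
    and supports scheduled pipelines.
    """

    @property
    def is_synchronous(self) -> bool:
        return True

    @property
    def is_schedulable(self) -> bool:
        return True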

BaseOrchestratorFlavor

Bases: Flavor

Base orchestrator flavor class.

Source code in src/zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator flavor class."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.ORCHESTRATOR

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return BaseOrchestratorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """

config_class property

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[BaseOrchestratorConfig]

The config class.

implementation_class abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[BaseOrchestrator]

The implementation class.

type property

Returns the flavor type.

Returns:

Type Description
StackComponentType

The flavor type.
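
To make a custom orchestrator usable in ZenML, a flavor ties the config and implementation classes together. A minimal sketch, in which all My* names and the my_package module are illustrative:

from typing import Type

from zenml.orchestrators.base_orchestrator import (
    BaseOrchestrator,
    BaseOrchestratorConfig,
    BaseOrchestratorFlavor,
)


class MyOrchestratorFlavor(BaseOrchestratorFlavor):
    """Illustrative flavor for a custom orchestrator."""

    @property
    def name(self) -> str:
        return "my_orchestrator"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        from my_package.my_orchestrator import MyOrchestratorConfig  # illustrative

        return MyOrchestratorConfig

    @property
    def implementation_class(self) -> Type[BaseOrchestrator]:
        from my_package.my_orchestrator import MyOrchestrator  # illustrative

        return MyOrchestrator

After registering such a flavor with ZenML, the new orchestrator can be added to a stack like any other component; see the ZenML documentation on implementing custom flavors for the registration workflow.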

ContainerizedOrchestrator

Bases: BaseOrchestrator, ABC

Base class for containerized orchestrators.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
class ContainerizedOrchestrator(BaseOrchestrator, ABC):
    """Base class for containerized orchestrators."""

    @staticmethod
    def get_image(
        deployment: "PipelineDeploymentResponse",
        step_name: Optional[str] = None,
    ) -> str:
        """Gets the Docker image for the pipeline/a step.

        Args:
            deployment: The deployment from which to get the image.
            step_name: Pipeline step name for which to get the image. If not
                given the generic pipeline image will be returned.

        Raises:
            RuntimeError: If the deployment does not have an associated build.

        Returns:
            The image name or digest.
        """
        if not deployment.build:
            raise RuntimeError(
                f"Missing build for deployment {deployment.id}. This is "
                "probably because the build was manually deleted."
            )

        return deployment.build.get_image(
            component_key=ORCHESTRATOR_DOCKER_IMAGE_KEY, step=step_name
        )

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        pipeline_settings = deployment.pipeline_configuration.docker_settings

        included_pipeline_build = False
        builds = []

        for name, step in deployment.step_configurations.items():
            step_settings = step.config.docker_settings

            if step_settings != pipeline_settings:
                build = BuildConfiguration(
                    key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                    settings=step_settings,
                    step_name=name,
                )
                builds.append(build)
            elif not included_pipeline_build:
                pipeline_build = BuildConfiguration(
                    key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                    settings=pipeline_settings,
                )
                builds.append(pipeline_build)
                included_pipeline_build = True

        return builds

get_docker_builds(deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    pipeline_settings = deployment.pipeline_configuration.docker_settings

    included_pipeline_build = False
    builds = []

    for name, step in deployment.step_configurations.items():
        step_settings = step.config.docker_settings

        if step_settings != pipeline_settings:
            build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=step_settings,
                step_name=name,
            )
            builds.append(build)
        elif not included_pipeline_build:
            pipeline_build = BuildConfiguration(
                key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
                settings=pipeline_settings,
            )
            builds.append(pipeline_build)
            included_pipeline_build = True

    return builds

get_image(deployment, step_name=None) staticmethod

Gets the Docker image for the pipeline/a step.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment from which to get the image.

required
step_name Optional[str]

Pipeline step name for which to get the image. If not given the generic pipeline image will be returned.

None

Raises:

Type Description
RuntimeError

If the deployment does not have an associated build.

Returns:

Type Description
str

The image name or digest.

Source code in src/zenml/orchestrators/containerized_orchestrator.py
@staticmethod
def get_image(
    deployment: "PipelineDeploymentResponse",
    step_name: Optional[str] = None,
) -> str:
    """Gets the Docker image for the pipeline/a step.

    Args:
        deployment: The deployment from which to get the image.
        step_name: Pipeline step name for which to get the image. If not
            given the generic pipeline image will be returned.

    Raises:
        RuntimeError: If the deployment does not have an associated build.

    Returns:
        The image name or digest.
    """
    if not deployment.build:
        raise RuntimeError(
            f"Missing build for deployment {deployment.id}. This is "
            "probably because the build was manually deleted."
        )

    return deployment.build.get_image(
        component_key=ORCHESTRATOR_DOCKER_IMAGE_KEY, step=step_name
    )

LocalDockerOrchestrator

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines locally using Docker.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
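
Additional arguments for the step containers can be supplied via LocalDockerOrchestratorSettings.run_args, which are forwarded to the Docker client's containers.run(...) call (see the source below). A hedged sketch, assuming the active stack uses this orchestrator and that "orchestrator.local_docker" is the applicable settings key:

from zenml import pipeline
from zenml.orchestrators.local_docker.local_docker_orchestrator import (
    LocalDockerOrchestratorSettings,
)

docker_settings = LocalDockerOrchestratorSettings(
    run_args={"cpu_count": 3},  # forwarded to docker_client.containers.run(...)
)


@pipeline(settings={"orchestrator.local_docker": docker_settings})
def my_pipeline() -> None:
    ...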

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestrator(ContainerizedOrchestrator):
    """Orchestrator responsible for running pipelines locally using Docker.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Local Docker orchestrator.

        Returns:
            The settings class.
        """
        return LocalDockerOrchestratorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Ensures there is an image builder in the stack.

        Returns:
            A `StackValidator` instance.
        """
        return StackValidator(
            required_components={StackComponentType.IMAGE_BUILDER}
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Sequentially runs all pipeline steps in local Docker containers.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.

        Raises:
            RuntimeError: If a step fails.
        """
        if deployment.schedule:
            logger.warning(
                "Local Docker Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        docker_client = docker_utils._try_get_docker_client_from_env()

        entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

        # Add the local stores path as a volume mount
        stack.check_local_paths()
        local_stores_path = GlobalConfiguration().local_stores_path
        volumes = {
            local_stores_path: {
                "bind": local_stores_path,
                "mode": "rw",
            }
        }
        orchestrator_run_id = str(uuid4())
        environment[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
        environment[ENV_ZENML_LOCAL_STORES_PATH] = local_stores_path
        start_time = time.time()

        # Run each step
        for step_name, step in deployment.step_configurations.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "Docker orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step_name,
                )

            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            )

            settings = cast(
                LocalDockerOrchestratorSettings,
                self.get_settings(step),
            )
            image = self.get_image(deployment=deployment, step_name=step_name)

            user = None
            if sys.platform != "win32":
                user = os.getuid()
            logger.info("Running step `%s` in Docker:", step_name)

            run_args = copy.deepcopy(settings.run_args)
            docker_environment = run_args.pop("environment", {})
            docker_environment.update(environment)

            docker_volumes = run_args.pop("volumes", {})
            docker_volumes.update(volumes)

            extra_hosts = run_args.pop("extra_hosts", {})
            extra_hosts["host.docker.internal"] = "host-gateway"

            try:
                logs = docker_client.containers.run(
                    image=image,
                    entrypoint=entrypoint,
                    command=arguments,
                    user=user,
                    volumes=docker_volumes,
                    environment=docker_environment,
                    stream=True,
                    extra_hosts=extra_hosts,
                    **run_args,
                )

                for line in logs:
                    logger.info(line.strip().decode())
            except ContainerError as e:
                error_message = e.stderr.decode()
                raise RuntimeError(error_message)

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run has finished in `%s`.",
            string_utils.get_human_readable_time(run_duration),
        )

settings_class property

Settings class for the Local Docker orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator property

Ensures there is an image builder in the stack.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

get_orchestrator_run_id()

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
        )

prepare_or_run_pipeline(deployment, stack, environment)

Sequentially runs all pipeline steps in local Docker containers.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Raises:

Type Description
RuntimeError

If a step fails.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Sequentially runs all pipeline steps in local Docker containers.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If a step fails.
    """
    if deployment.schedule:
        logger.warning(
            "Local Docker Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    docker_client = docker_utils._try_get_docker_client_from_env()

    entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

    # Add the local stores path as a volume mount
    stack.check_local_paths()
    local_stores_path = GlobalConfiguration().local_stores_path
    volumes = {
        local_stores_path: {
            "bind": local_stores_path,
            "mode": "rw",
        }
    }
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
    environment[ENV_ZENML_LOCAL_STORES_PATH] = local_stores_path
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "Docker orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )

        settings = cast(
            LocalDockerOrchestratorSettings,
            self.get_settings(step),
        )
        image = self.get_image(deployment=deployment, step_name=step_name)

        user = None
        if sys.platform != "win32":
            user = os.getuid()
        logger.info("Running step `%s` in Docker:", step_name)

        run_args = copy.deepcopy(settings.run_args)
        docker_environment = run_args.pop("environment", {})
        docker_environment.update(environment)

        docker_volumes = run_args.pop("volumes", {})
        docker_volumes.update(volumes)

        extra_hosts = run_args.pop("extra_hosts", {})
        extra_hosts["host.docker.internal"] = "host-gateway"

        try:
            logs = docker_client.containers.run(
                image=image,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=docker_volumes,
                environment=docker_environment,
                stream=True,
                extra_hosts=extra_hosts,
                **run_args,
            )

            for line in logs:
                logger.info(line.strip().decode())
        except ContainerError as e:
            error_message = e.stderr.decode()
            raise RuntimeError(error_message)

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )

LocalDockerOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the local Docker orchestrator.

Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local Docker orchestrator."""

    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        return "local_docker"

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/docker.png"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalDockerOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LocalDockerOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            Implementation class for this flavor.
        """
        return LocalDockerOrchestrator

config_class property

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[BaseOrchestratorConfig]

The config class.

docs_url property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class property

Implementation class for this flavor.

Returns:

Type Description
Type[LocalDockerOrchestrator]

Implementation class for this flavor.

logo_url property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name property

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

LocalOrchestrator

Bases: BaseOrchestrator

Orchestrator responsible for running pipelines locally.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
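
This is the orchestrator that executes a pipeline when the active stack uses it (the default ZenML stack does). A small sketch, assuming an initialized ZenML repository with such a stack:

from zenml import pipeline, step


@step
def trainer() -> int:
    """Toy step used only for illustration."""
    return 42


@pipeline
def my_pipeline() -> None:
    trainer()


if __name__ == "__main__":
    # Every step runs sequentially in the current Python process.
    my_pipeline()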

Source code in src/zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    _orchestrator_run_id: Optional[str] = None

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Iterates through all steps and executes them sequentially.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack on which the pipeline is deployed.
            environment: Environment variables to set in the orchestration
                environment.
        """
        if deployment.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        self._orchestrator_run_id = str(uuid4())
        start_time = time.time()

        # Run each step
        for step_name, step in deployment.step_configurations.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step_name,
                )

            self.run_step(
                step=step,
            )

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run has finished in `%s`.",
            string_utils.get_human_readable_time(run_duration),
        )
        self._orchestrator_run_id = None

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If no run id exists. This happens when this method
                gets called while the orchestrator is not running a pipeline.

        Returns:
            The orchestrator run id.
        """
        if not self._orchestrator_run_id:
            raise RuntimeError("No run id set.")

        return self._orchestrator_run_id

get_orchestrator_run_id()

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/orchestrators/local/local_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists. This happens when this method
            gets called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    if not self._orchestrator_run_id:
        raise RuntimeError("No run id set.")

    return self._orchestrator_run_id

prepare_or_run_pipeline(deployment, stack, environment)

Iterates through all steps and executes them sequentially.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack on which the pipeline is deployed.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
Source code in src/zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Iterates through all steps and executes them sequentially.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack on which the pipeline is deployed.
        environment: Environment variables to set in the orchestration
            environment.
    """
    if deployment.schedule:
        logger.warning(
            "Local Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    self._orchestrator_run_id = str(uuid4())
    start_time = time.time()

    # Run each step
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the local "
                "orchestrator, ignoring resource configuration for "
                "step %s.",
                step_name,
            )

        self.run_step(
            step=step,
        )

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run has finished in `%s`.",
        string_utils.get_human_readable_time(run_duration),
    )
    self._orchestrator_run_id = None

LocalOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Class for the LocalOrchestratorFlavor.

Source code in src/zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestratorFlavor(BaseOrchestratorFlavor):
    """Class for the `LocalOrchestratorFlavor`."""

    @property
    def name(self) -> str:
        """The flavor name.

        Returns:
            The flavor name.
        """
        return "local"

    @property
    def docs_url(self) -> Optional[str]:
        """A URL to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A URL to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A URL to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/local.png"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalOrchestratorConfig

    @property
    def implementation_class(self) -> Type[LocalOrchestrator]:
        """Implementation class for this flavor.

        Returns:
            The implementation class for this flavor.
        """
        return LocalOrchestrator

config_class property

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[BaseOrchestratorConfig]

The config class.

docs_url property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class property

Implementation class for this flavor.

Returns:

Type Description
Type[LocalOrchestrator]

The implementation class for this flavor.

logo_url property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

WheeledOrchestrator

Bases: BaseOrchestrator, ABC

Base class for wheeled orchestrators.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
class WheeledOrchestrator(BaseOrchestrator, ABC):
    """Base class for wheeled orchestrators."""

    package_name = DEFAULT_PACKAGE_NAME
    package_version = __version__

    def copy_repository_to_temp_dir_and_add_setup_py(self) -> str:
        """Copy the repository to a temporary directory and add a setup.py file.

        Returns:
            Path to the temporary directory containing the copied repository.
        """
        repo_path = get_source_root()

        self.package_name = f"{DEFAULT_PACKAGE_NAME}_{self.sanitize_name(os.path.basename(repo_path))}"

        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")

        # Create a folder within the temporary directory
        temp_repo_path = os.path.join(temp_dir, self.package_name)
        fileio.mkdir(temp_repo_path)

        # Copy the repository to the temporary directory
        copy_dir(repo_path, temp_repo_path)

        # Create init file in the copied directory
        init_file_path = os.path.join(temp_repo_path, "__init__.py")
        with fileio.open(init_file_path, "w") as f:
            f.write("")

        # Create a setup.py file
        setup_py_content = f"""
from setuptools import setup, find_packages

setup(
    name="{self.package_name}",
    version="{self.package_version}",
    packages=find_packages(),
)
"""
        setup_py_path = os.path.join(temp_dir, "setup.py")
        with fileio.open(setup_py_path, "w") as f:
            f.write(setup_py_content)

        return temp_dir

    def create_wheel(self, temp_dir: str) -> str:
        """Create a wheel for the package in the given temporary directory.

        Args:
            temp_dir (str): Path to the temporary directory containing the package.

        Raises:
            RuntimeError: If the wheel file could not be created.

        Returns:
            str: Path to the created wheel file.
        """
        # Change to the temporary directory
        original_dir = os.getcwd()
        os.chdir(temp_dir)

        try:
            # Run the `pip wheel` command to create the wheel
            result = subprocess.run(
                ["pip", "wheel", "."], check=True, capture_output=True
            )
            logger.debug(f"Wheel creation stdout: {result.stdout.decode()}")
            logger.debug(f"Wheel creation stderr: {result.stderr.decode()}")

            # Find the created wheel file
            wheel_file = next(
                (
                    file
                    for file in os.listdir(temp_dir)
                    if file.endswith(".whl")
                ),
                None,
            )

            if wheel_file is None:
                raise RuntimeError("Failed to create wheel file.")

            wheel_path = os.path.join(temp_dir, wheel_file)

            # Verify the wheel file is a valid zip file
            import zipfile

            if not zipfile.is_zipfile(wheel_path):
                raise RuntimeError(
                    f"The file {wheel_path} is not a valid zip file."
                )

            return wheel_path
        finally:
            # Change back to the original directory
            os.chdir(original_dir)

    def sanitize_name(self, name: str) -> str:
        """Sanitize the value to be used in a cluster name.

        Args:
            name: Arbitrary input cluster name.

        Returns:
            Sanitized cluster name.
        """
        name = re.sub(
            r"[^a-z0-9-]", "-", name.lower()
        )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
        name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
        name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
        return name

copy_repository_to_temp_dir_and_add_setup_py()

Copy the repository to a temporary directory and add a setup.py file.

Returns:

Type Description
str

Path to the temporary directory containing the copied repository.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
    def copy_repository_to_temp_dir_and_add_setup_py(self) -> str:
        """Copy the repository to a temporary directory and add a setup.py file.

        Returns:
            Path to the temporary directory containing the copied repository.
        """
        repo_path = get_source_root()

        self.package_name = f"{DEFAULT_PACKAGE_NAME}_{self.sanitize_name(os.path.basename(repo_path))}"

        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")

        # Create a folder within the temporary directory
        temp_repo_path = os.path.join(temp_dir, self.package_name)
        fileio.mkdir(temp_repo_path)

        # Copy the repository to the temporary directory
        copy_dir(repo_path, temp_repo_path)

        # Create init file in the copied directory
        init_file_path = os.path.join(temp_repo_path, "__init__.py")
        with fileio.open(init_file_path, "w") as f:
            f.write("")

        # Create a setup.py file
        setup_py_content = f"""
from setuptools import setup, find_packages

setup(
    name="{self.package_name}",
    version="{self.package_version}",
    packages=find_packages(),
)
"""
        setup_py_path = os.path.join(temp_dir, "setup.py")
        with fileio.open(setup_py_path, "w") as f:
            f.write(setup_py_content)

        return temp_dir

create_wheel(temp_dir)

Create a wheel for the package in the given temporary directory.

Parameters:

Name Type Description Default
temp_dir str

Path to the temporary directory containing the package.

required

Raises:

Type Description
RuntimeError

If the wheel file could not be created.

Returns:

Name Type Description
str str

Path to the created wheel file.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def create_wheel(self, temp_dir: str) -> str:
    """Create a wheel for the package in the given temporary directory.

    Args:
        temp_dir (str): Path to the temporary directory containing the package.

    Raises:
        RuntimeError: If the wheel file could not be created.

    Returns:
        str: Path to the created wheel file.
    """
    # Change to the temporary directory
    original_dir = os.getcwd()
    os.chdir(temp_dir)

    try:
        # Run the `pip wheel` command to create the wheel
        result = subprocess.run(
            ["pip", "wheel", "."], check=True, capture_output=True
        )
        logger.debug(f"Wheel creation stdout: {result.stdout.decode()}")
        logger.debug(f"Wheel creation stderr: {result.stderr.decode()}")

        # Find the created wheel file
        wheel_file = next(
            (
                file
                for file in os.listdir(temp_dir)
                if file.endswith(".whl")
            ),
            None,
        )

        if wheel_file is None:
            raise RuntimeError("Failed to create wheel file.")

        wheel_path = os.path.join(temp_dir, wheel_file)

        # Verify the wheel file is a valid zip file
        import zipfile

        if not zipfile.is_zipfile(wheel_path):
            raise RuntimeError(
                f"The file {wheel_path} is not a valid zip file."
            )

        return wheel_path
    finally:
        # Change back to the original directory
        os.chdir(original_dir)

sanitize_name(name)

Sanitize the value to be used in a cluster name.

Parameters:

Name Type Description Default
name str

Arbitrary input cluster name.

required

Returns:

Type Description
str

Sanitized cluster name.

Source code in src/zenml/orchestrators/wheeled_orchestrator.py
def sanitize_name(self, name: str) -> str:
    """Sanitize the value to be used in a cluster name.

    Args:
        name: Arbitrary input cluster name.

    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name