
Skypilot

zenml.integrations.skypilot

Modules

flavors

Modules
skypilot_orchestrator_base_vm_config

Skypilot orchestrator base config and settings.

Classes
SkypilotBaseOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings

Skypilot orchestrator base config.

Attributes:

Name Type Description
disable_step_based_settings bool

whether to disable step-based settings. If True, the orchestrator runs all steps with the pipeline settings in a single VM. If False, each step that provides its own settings runs in a separate VM.
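The effect of this flag can be sketched in plain Python (a hypothetical illustration of the decision logic, not ZenML source):

```python
from typing import Any, Dict


def use_separate_vms(
    disable_step_based_settings: bool,
    pipeline_settings: Dict[str, Any],
    step_settings: Dict[str, Dict[str, Any]],
) -> bool:
    """Return True if at least one step should run in its own VM."""
    if disable_step_based_settings:
        # All steps share one VM configured with the pipeline settings.
        return False
    # Separate VMs are only used when some step overrides the pipeline settings.
    return any(s != pipeline_settings for s in step_settings.values())


pipeline = {"instance_type": "t3.large"}
steps = {"train": {"instance_type": "g5.xlarge"}, "eval": pipeline}
print(use_separate_vms(False, pipeline, steps))  # True: 'train' differs
print(use_separate_vms(True, pipeline, steps))   # False: forced single VM
```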

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
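The `{{secret_name.key}}` reference syntax mentioned in the docstring can be illustrated with a minimal check (a hypothetical re-implementation for illustration; ZenML's real logic lives in `zenml.utils.secret_utils`):

```python
import re

# Matches strings of the form {{secret_name.key}}, e.g. {{docker_creds.password}}.
_SECRET_REF = re.compile(r"^\{\{\s*\w+\.\w+\s*\}\}$")


def is_secret_reference(value: object) -> bool:
    """Return True if `value` looks like a `{{secret_name.key}}` reference."""
    return isinstance(value, str) and bool(_SECRET_REF.match(value))


print(is_secret_reference("{{docker_creds.password}}"))  # True
print(is_secret_reference("plain-text-password"))        # False
```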
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

supports_client_side_caching: bool property

Whether the orchestrator supports client side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client side caching.

SkypilotBaseOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Skypilot orchestrator base settings.

Attributes:

Name Type Description
instance_type Optional[str]

the instance type to use.

cpus Union[None, int, float, str]

the number of CPUs required for the task. If a str, must be a string of the form '2' or '2+', where the + indicates that the task requires at least 2 CPUs.

memory Union[None, int, float, str]

the amount of memory in GiB required. If a str, must be a string of the form '16' or '16+', where the + indicates that the task requires at least 16 GB of memory.

accelerators Union[None, str, Dict[str, int]]

the accelerators required. If a str, must be a string of the form 'V100' or 'V100:2', where the :2 indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form {'V100': 2} or {'tpu-v2-8': 1}.

accelerator_args Optional[Dict[str, str]]

accelerator-specific arguments. For example, {'tpu_vm': True, 'runtime_version': 'tpu-vm-base'} for TPUs.

use_spot Optional[bool]

whether to use spot instances. If None, defaults to False.

job_recovery Union[None, str, Dict[str, Any]]

the spot recovery strategy to use for managed spot jobs to recover the cluster from preemption. Refer to the recovery_strategy module (https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details.

region Optional[str]

the region to use.

zone Optional[str]

the zone to use.

image_id Union[Dict[str, str], str, None]

the image ID to use. If a str, must be the image ID from the cloud, such as AWS: 'ami-1234567890abcdef0', GCP: 'projects/my-project-id/global/images/my-image-name'; or an image tag provided by SkyPilot, such as AWS: 'skypilot:gpu-ubuntu-2004'. If a dict, must map regions to image IDs, such as:

{
    'us-west1': 'ami-1234567890abcdef0',
    'us-east1': 'ami-1234567890abcdef0'
}
disk_size Optional[int]

the size of the OS disk in GiB.

disk_tier Optional[Literal['high', 'medium', 'low', 'ultra', 'best']]

the disk performance tier to use. If None, defaults to 'medium'.

ports Union[None, int, str, List[Union[int, str]]]

Ports to expose. Could be an integer, a range, or a list of integers and ranges. All ports will be exposed to the public internet.

labels Optional[Dict[str, str]]

Labels to apply to instances as key-value pairs. These are mapped to cloud-specific implementations (instance tags in AWS, instance labels in GCP, etc.)

any_of Optional[List[Dict[str, Any]]]

List of candidate resources to try in order of preference based on cost (determined by the optimizer).

ordered Optional[List[Dict[str, Any]]]

List of candidate resources to try in the specified order.

cluster_name Optional[str]

name of the cluster to create/reuse. If None, auto-generate a name.

retry_until_up bool

whether to retry launching the cluster until it is up.

idle_minutes_to_autostop Optional[int]

automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag is equivalent to running sky.launch(..., detach_run=True, ...) and then sky.autostop(idle_minutes=<minutes>). If not set, the cluster will not be autostopped.

down bool

Tear down the cluster after all jobs finish (successfully or abnormally). If idle_minutes_to_autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes.

stream_logs bool

if True, show the logs in the terminal.

docker_run_args List[str]

Optional arguments to pass to the docker run command running inside the VM.

workdir Optional[str]

Working directory to sync to the VM. Synced to ~/sky_workdir.

task_name Optional[str]

Task name used for display purposes.

file_mounts Optional[Dict[str, Any]]

File and storage mounts configuration for remote cluster.

envs Optional[Dict[str, str]]

Environment variables for the task.

task_settings Dict[str, Any]

Dictionary of arbitrary settings to pass to sky.Task(). This allows passing future parameters added by SkyPilot without requiring updates to ZenML.

resources_settings Dict[str, Any]

Dictionary of arbitrary settings to pass to sky.Resources(). This allows passing future parameters added by SkyPilot without requiring updates to ZenML.

launch_settings Dict[str, Any]

Dictionary of arbitrary settings to pass to sky.launch(). This allows passing future parameters added by SkyPilot without requiring updates to ZenML.
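Several of the settings above (cpus, memory) accept the SkyPilot quantity syntax '2' or '2+'. A minimal parser for that syntax might look like this (a hypothetical helper for illustration, not part of ZenML or SkyPilot):

```python
from typing import Optional, Tuple, Union


def parse_quantity(
    value: Union[None, int, float, str],
) -> Optional[Tuple[float, bool]]:
    """Parse a SkyPilot-style quantity such as 2, '2', or '2+'.

    Returns (amount, at_least), where at_least is True for the '2+'
    form, or None if no value was given.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value), False
    at_least = value.endswith("+")
    return float(value.rstrip("+")), at_least


print(parse_quantity("2+"))  # (2.0, True): at least 2 CPUs
print(parse_quantity(16))    # (16.0, False): exactly 16 GiB
```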

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Functions

orchestrators

Initialization of the Skypilot ZenML orchestrators.

Classes
SkypilotBaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Base class for orchestrators responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
cloud: sky.clouds.Cloud abstractmethod property

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotBaseOrchestratorConfig property

Returns the SkypilotBaseOrchestratorConfig config.

Returns:

Type Description
SkypilotBaseOrchestratorConfig

The configuration.

validator: Optional[StackValidator] property

Validates the stack.

In the remote case, checks that the stack contains a container registry, an image builder, and only remote components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        )
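The pattern above, reading a well-known environment variable and converting a missing key into a descriptive RuntimeError, can be reproduced standalone (the variable name here is illustrative; the real one is ZenML's `ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID` constant):

```python
import os

ENV_RUN_ID = "ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID"  # illustrative name


def get_orchestrator_run_id() -> str:
    """Return the run id from the environment, or raise if it is unset."""
    try:
        return os.environ[ENV_RUN_ID]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id from environment variable {ENV_RUN_ID}."
        )


os.environ[ENV_RUN_ID] = "run-1234"
print(get_orchestrator_run_id())  # run-1234
```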
prepare_environment_variable(set: bool = True) -> None abstractmethod

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up Environment variables that are required for the orchestrator.

    Args:
        set: Whether to set the environment variables or not.
    """
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Runs each pipeline step in a separate Skypilot container.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
Exception

If the pipeline run fails.

RuntimeError

If the code is running in a notebook.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Runs each pipeline step in a separate Skypilot container.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        Exception: If the pipeline run fails.
        RuntimeError: If the code is running in a notebook.
    """
    # First check whether the code is running in a notebook.
    if Environment.in_notebook():
        raise RuntimeError(
            "The Skypilot orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )
    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
        orchestrator_run_id
    )

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    different_settings_found = False

    if not self.config.disable_step_based_settings:
        for _, step in deployment.step_configurations.items():
            step_settings = cast(
                SkypilotBaseOrchestratorSettings,
                self.get_settings(step),
            )
            if step_settings != settings:
                different_settings_found = True
                logger.info(
                    "At least one step has different settings than the "
                    "pipeline. The step with different settings will be "
                    "run in a separate VM.\n"
                    "You can configure the orchestrator to disable this "
                    "behavior by updating the `disable_step_based_settings` "
                    "in your orchestrator configuration "
                    "by running the following command: "
                    "`zenml orchestrator update --disable-step-based-settings=True`"
                )
                break

    # Decide which configuration to use based on whether different settings were found
    if (
        not self.config.disable_step_based_settings
        and different_settings_found
    ):
        # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
        command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
        args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
        )
    else:
        # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
        command = PipelineEntrypointConfiguration.get_entrypoint_command()
        args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )

    entrypoint_str = " ".join(command)
    arguments_str = " ".join(args)

    task_envs = environment.copy()

    # Set up credentials
    self.setup_credentials()

    # Prepare Docker setup
    setup, docker_creds_envs = prepare_docker_setup(
        container_registry_uri=stack.container_registry.config.uri,
        credentials=stack.container_registry.credentials,
        use_sudo=True,  # Base orchestrator uses sudo
    )

    # Update task_envs with Docker credentials
    if docker_creds_envs:
        task_envs.update(docker_creds_envs)

    # Run the entire pipeline

    # Set the service connector AWS profile ENV variable
    self.prepare_environment_variable(set=True)

    try:
        if isinstance(self.cloud, sky.clouds.Kubernetes):
            run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
            setup = None
            down = False
            idle_minutes_to_autostop = None
        else:
            run_command = create_docker_run_command(
                image=image,
                entrypoint_str=entrypoint_str,
                arguments_str=arguments_str,
                environment=task_envs,
                docker_run_args=settings.docker_run_args,
                use_sudo=True,  # Base orchestrator uses sudo
            )
            down = settings.down
            idle_minutes_to_autostop = settings.idle_minutes_to_autostop

        # Create the Task with all parameters and task settings
        task_kwargs = prepare_task_kwargs(
            settings=settings,
            run_command=run_command,
            setup=setup,
            task_envs=task_envs,
            task_name=f"{orchestrator_run_name}",
        )

        task = sky.Task(**task_kwargs)
        logger.debug(f"Running run: {run_command}")

        # Set resources with all parameters and resource settings
        resources_kwargs = prepare_resources_kwargs(
            cloud=self.cloud,
            settings=settings,
            default_instance_type=self.DEFAULT_INSTANCE_TYPE,
            kubernetes_image=image
            if isinstance(self.cloud, sky.clouds.Kubernetes)
            else None,
        )

        task = task.set_resources(sky.Resources(**resources_kwargs))

        launch_new_cluster = True
        if settings.cluster_name:
            status_request_id = sky.status(
                refresh=StatusRefreshMode.AUTO,
                cluster_names=[settings.cluster_name],
            )
            cluster_info = sky.stream_and_get(status_request_id)

            if cluster_info:
                logger.info(
                    f"Found existing cluster {settings.cluster_name}. Reusing..."
                )
                launch_new_cluster = False

            else:
                logger.info(
                    f"Cluster {settings.cluster_name} not found. Launching a new one..."
                )
                cluster_name = settings.cluster_name
        else:
            cluster_name = sanitize_cluster_name(
                f"{orchestrator_run_name}"
            )
            logger.info(
                f"No cluster name provided. Launching a new cluster with name {cluster_name}..."
            )

        if launch_new_cluster:
            # Prepare launch parameters with additional launch settings
            launch_kwargs = prepare_launch_kwargs(
                settings=settings,
                down=down,
                idle_minutes_to_autostop=idle_minutes_to_autostop,
            )
            logger.info(
                f"Launching the task on a new cluster: {cluster_name}"
            )
            launch_job_id = sky.launch(
                task,
                cluster_name,
                **launch_kwargs,
            )
            sky_job_get(launch_job_id, settings.stream_logs, cluster_name)

        else:
            # Prepare exec parameters with additional launch settings
            exec_kwargs = {
                "down": down,
                "backend": None,
                **settings.launch_settings,  # Can reuse same settings for exec
            }

            # Remove None values to avoid overriding SkyPilot defaults
            exec_kwargs = {
                k: v for k, v in exec_kwargs.items() if v is not None
            }

            # Make sure the cluster is up
            start_request_id = sky.start(
                settings.cluster_name,
                down=down,
                idle_minutes_to_autostop=idle_minutes_to_autostop,
                retry_until_up=settings.retry_until_up,
            )
            sky.stream_and_get(start_request_id)

            logger.info(
                f"Executing the task on the cluster: {settings.cluster_name}"
            )
            exec_job_id = sky.exec(
                task,
                cluster_name=settings.cluster_name,
                **exec_kwargs,
            )
            assert settings.cluster_name is not None
            sky_job_get(
                exec_job_id, settings.stream_logs, settings.cluster_name
            )

    except Exception as e:
        logger.error(f"Pipeline run failed: {e}")
        raise

    finally:
        # Unset the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=False)
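The cluster reuse logic in the method above, exec on an existing named cluster, otherwise launch a new one under a generated name, can be summarized in isolation (a hypothetical sketch of the decision, not ZenML source):

```python
from typing import Optional, Set, Tuple


def decide_cluster_action(
    configured_name: Optional[str],
    existing_clusters: Set[str],
    generated_name: str,
) -> Tuple[str, str]:
    """Mirror the launch-vs-exec choice in prepare_or_run_pipeline.

    Reuse ("exec") a configured cluster if it already exists; otherwise
    launch a new one, named either by the user or by the generated name.
    """
    if configured_name and configured_name in existing_clusters:
        return ("exec", configured_name)
    return ("launch", configured_name or generated_name)


print(decide_cluster_action("ml-cluster", {"ml-cluster"}, "zenml-run-1"))
# ('exec', 'ml-cluster')
print(decide_cluster_action(None, set(), "zenml-run-1"))
# ('launch', 'zenml-run-1')
```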
setup_credentials() -> None

Set up credentials for the orchestrator.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()
Modules
skypilot_base_vm_orchestrator

Implementation of the Skypilot base VM orchestrator.

Classes
SkypilotBaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Base class for Orchestrator responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
cloud: sky.clouds.Cloud abstractmethod property

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotBaseOrchestratorConfig property

Returns the SkypilotBaseOrchestratorConfig config.

Returns:

Type Description
SkypilotBaseOrchestratorConfig

The configuration.

validator: Optional[StackValidator] property

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        )
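The read-with-clear-error pattern above can be exercised standalone. This is a minimal sketch; the variable name below is an assumption about the value of `ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID`, whose real value is defined in the ZenML integration:

```python
import os

# Assumed value of ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID; the real constant
# lives in the ZenML Skypilot integration and may differ.
RUN_ID_ENV_VAR = "ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID"


def get_orchestrator_run_id() -> str:
    """Read the run id set by the orchestrator, failing loudly if absent."""
    try:
        return os.environ[RUN_ID_ENV_VAR]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id from environment variable {RUN_ID_ENV_VAR}."
        )


# The orchestrator sets this variable before the entrypoint runs.
os.environ[RUN_ID_ENV_VAR] = "run-1234"
print(get_orchestrator_run_id())  # run-1234
```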
prepare_environment_variable(set: bool = True) -> None abstractmethod

Set up environment variables that are required for the orchestrator.

Parameters:

    set (bool, default True): Whether to set the environment variables or not.
Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up Environment variables that are required for the orchestrator.

    Args:
        set: Whether to set the environment variables or not.
    """
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Runs each pipeline step in a separate Skypilot container.

Parameters:

    deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run.
    stack (Stack): The stack the pipeline will run on.
    environment (Dict[str, str]): Environment variables to set in the orchestration environment.
    placeholder_run (Optional[PipelineRunResponse], default None): An optional placeholder run for the deployment.

Raises:

    Exception: If the pipeline run fails.
    RuntimeError: If the code is running in a notebook.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Runs each pipeline step in a separate Skypilot container.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        Exception: If the pipeline run fails.
        RuntimeError: If the code is running in a notebook.
    """
    # First check whether the code is running in a notebook.
    if Environment.in_notebook():
        raise RuntimeError(
            "The Skypilot orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )
    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
        orchestrator_run_id
    )

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    different_settings_found = False

    if not self.config.disable_step_based_settings:
        for _, step in deployment.step_configurations.items():
            step_settings = cast(
                SkypilotBaseOrchestratorSettings,
                self.get_settings(step),
            )
            if step_settings != settings:
                different_settings_found = True
                logger.info(
                    "At least one step has different settings than the "
                    "pipeline. The step with different settings will be "
                    "run in a separate VM.\n"
                    "You can configure the orchestrator to disable this "
                    "behavior by updating the `disable_step_based_settings` "
                    "in your orchestrator configuration "
                    "by running the following command: "
                    "`zenml orchestrator update --disable-step-based-settings=True`"
                )
                break

    # Decide which configuration to use based on whether different settings were found
    if (
        not self.config.disable_step_based_settings
        and different_settings_found
    ):
        # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
        command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
        args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
        )
    else:
        # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
        command = PipelineEntrypointConfiguration.get_entrypoint_command()
        args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )

    entrypoint_str = " ".join(command)
    arguments_str = " ".join(args)

    task_envs = environment.copy()

    # Set up credentials
    self.setup_credentials()

    # Prepare Docker setup
    setup, docker_creds_envs = prepare_docker_setup(
        container_registry_uri=stack.container_registry.config.uri,
        credentials=stack.container_registry.credentials,
        use_sudo=True,  # Base orchestrator uses sudo
    )

    # Update task_envs with Docker credentials
    if docker_creds_envs:
        task_envs.update(docker_creds_envs)

    # Run the entire pipeline

    # Set the service connector AWS profile ENV variable
    self.prepare_environment_variable(set=True)

    try:
        if isinstance(self.cloud, sky.clouds.Kubernetes):
            run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
            setup = None
            down = False
            idle_minutes_to_autostop = None
        else:
            run_command = create_docker_run_command(
                image=image,
                entrypoint_str=entrypoint_str,
                arguments_str=arguments_str,
                environment=task_envs,
                docker_run_args=settings.docker_run_args,
                use_sudo=True,  # Base orchestrator uses sudo
            )
            down = settings.down
            idle_minutes_to_autostop = settings.idle_minutes_to_autostop

        # Create the Task with all parameters and task settings
        task_kwargs = prepare_task_kwargs(
            settings=settings,
            run_command=run_command,
            setup=setup,
            task_envs=task_envs,
            task_name=f"{orchestrator_run_name}",
        )

        task = sky.Task(**task_kwargs)
        logger.debug(f"Running run: {run_command}")

        # Set resources with all parameters and resource settings
        resources_kwargs = prepare_resources_kwargs(
            cloud=self.cloud,
            settings=settings,
            default_instance_type=self.DEFAULT_INSTANCE_TYPE,
            kubernetes_image=image
            if isinstance(self.cloud, sky.clouds.Kubernetes)
            else None,
        )

        task = task.set_resources(sky.Resources(**resources_kwargs))

        launch_new_cluster = True
        if settings.cluster_name:
            status_request_id = sky.status(
                refresh=StatusRefreshMode.AUTO,
                cluster_names=[settings.cluster_name],
            )
            cluster_info = sky.stream_and_get(status_request_id)

            if cluster_info:
                logger.info(
                    f"Found existing cluster {settings.cluster_name}. Reusing..."
                )
                launch_new_cluster = False

            else:
                logger.info(
                    f"Cluster {settings.cluster_name} not found. Launching a new one..."
                )
                cluster_name = settings.cluster_name
        else:
            cluster_name = sanitize_cluster_name(
                f"{orchestrator_run_name}"
            )
            logger.info(
                f"No cluster name provided. Launching a new cluster with name {cluster_name}..."
            )

        if launch_new_cluster:
            # Prepare launch parameters with additional launch settings
            launch_kwargs = prepare_launch_kwargs(
                settings=settings,
                down=down,
                idle_minutes_to_autostop=idle_minutes_to_autostop,
            )
            logger.info(
                f"Launching the task on a new cluster: {cluster_name}"
            )
            launch_job_id = sky.launch(
                task,
                cluster_name,
                **launch_kwargs,
            )
            sky_job_get(launch_job_id, settings.stream_logs, cluster_name)

        else:
            # Prepare exec parameters with additional launch settings
            exec_kwargs = {
                "down": down,
                "backend": None,
                **settings.launch_settings,  # Can reuse same settings for exec
            }

            # Remove None values to avoid overriding SkyPilot defaults
            exec_kwargs = {
                k: v for k, v in exec_kwargs.items() if v is not None
            }

            # Make sure the cluster is up
            start_request_id = sky.start(
                settings.cluster_name,
                down=down,
                idle_minutes_to_autostop=idle_minutes_to_autostop,
                retry_until_up=settings.retry_until_up,
            )
            sky.stream_and_get(start_request_id)

            logger.info(
                f"Executing the task on the cluster: {settings.cluster_name}"
            )
            exec_job_id = sky.exec(
                task,
                cluster_name=settings.cluster_name,
                **exec_kwargs,
            )
            assert settings.cluster_name is not None
            sky_job_get(
                exec_job_id, settings.stream_logs, settings.cluster_name
            )

    except Exception as e:
        logger.error(f"Pipeline run failed: {e}")
        raise

    finally:
        # Unset the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=False)
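The cluster-reuse logic above has three outcomes: exec on an existing named cluster, launch a new cluster under the configured name, or launch under a generated name. It can be sketched as a pure decision function (a simplification of the orchestrator's actual control flow, with hypothetical argument names):

```python
from typing import Optional, Tuple


def choose_cluster_action(
    configured_name: Optional[str],
    cluster_exists: bool,
    generated_name: str,
) -> Tuple[str, str]:
    """Mirror the launch-vs-exec decision in prepare_or_run_pipeline.

    Returns an (action, cluster_name) pair: "exec" when a configured
    cluster already exists and can be reused, "launch" otherwise.
    """
    if configured_name and cluster_exists:
        # Found existing cluster -> reuse it with sky.exec.
        return "exec", configured_name
    if configured_name:
        # Configured name, but no running cluster -> launch under that name.
        return "launch", configured_name
    # No name configured -> launch under a sanitized generated name.
    return "launch", generated_name


print(choose_cluster_action("my-cluster", True, "zenml-run-abc"))  # ('exec', 'my-cluster')
print(choose_cluster_action(None, False, "zenml-run-abc"))         # ('launch', 'zenml-run-abc')
```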
setup_credentials() -> None

Set up credentials for the orchestrator.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()
Functions
skypilot_orchestrator_entrypoint

Entrypoint of the Skypilot master/orchestrator VM.

Classes Functions
main() -> None

Entrypoint of the Skypilot master/orchestrator VM.

This is the entrypoint of the Skypilot master/orchestrator VM. It is responsible for provisioning the VM and running the pipeline steps in separate VMs.

The VM is provisioned using the sky library. The pipeline steps are run using the sky library as well.

Raises:

    TypeError: If the active stack's orchestrator is not an instance of SkypilotBaseOrchestrator.
    ValueError: If the active stack's container registry is None.
    Exception: If the orchestration or one of the steps fails.
Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the Skypilot master/orchestrator VM.

    This is the entrypoint of the Skypilot master/orchestrator VM. It is
    responsible for provisioning the VM and running the pipeline steps in
    separate VMs.

    The VM is provisioned using the `sky` library. The pipeline steps are run
    using the `sky` library as well.

    Raises:
        TypeError: If the active stack's orchestrator is not an instance of
            SkypilotBaseOrchestrator.
        ValueError: If the active stack's container registry is None.
        Exception: If the orchestration or one of the steps fails.
    """
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Skypilot orchestrator VM started.")

    # Parse / extract args.
    args = parse_args()
    orchestrator_run_id = socket.gethostname()

    run = None

    try:
        deployment = Client().get_deployment(args.deployment_id)

        pipeline_dag = {
            step_name: step.spec.upstream_steps
            for step_name, step in deployment.step_configurations.items()
        }
        step_command = StepEntrypointConfiguration.get_entrypoint_command()
        entrypoint_str = " ".join(step_command)

        active_stack = Client().active_stack

        orchestrator = active_stack.orchestrator
        if not isinstance(orchestrator, SkypilotBaseOrchestrator):
            raise TypeError(
                "The active stack's orchestrator is not an instance of SkypilotBaseOrchestrator."
            )

        # Set up credentials
        orchestrator.setup_credentials()

        # Set the service connector AWS profile ENV variable
        orchestrator.prepare_environment_variable(set=True)

        # get active container registry
        container_registry = active_stack.container_registry
        if container_registry is None:
            raise ValueError("Container registry cannot be None.")

        # Prepare Docker setup
        setup, task_envs = prepare_docker_setup(
            container_registry_uri=container_registry.config.uri,
            credentials=container_registry.credentials,
            use_sudo=False,  # Entrypoint doesn't use sudo
        )

        unique_resource_configs: Dict[str, str] = {}
        for step_name, step in deployment.step_configurations.items():
            settings = cast(
                SkypilotBaseOrchestratorSettings,
                orchestrator.get_settings(step),
            )
            # Handle both str and Dict[str, int] types for accelerators
            if isinstance(settings.accelerators, dict):
                accelerators_hashable = frozenset(
                    settings.accelerators.items()
                )
            elif isinstance(settings.accelerators, str):
                accelerators_hashable = frozenset({(settings.accelerators, 1)})
            else:
                accelerators_hashable = None
            resource_config = (
                settings.instance_type,
                settings.cpus,
                settings.memory,
                settings.disk_size,  # Assuming disk_size is part of the settings
                settings.disk_tier,  # Assuming disk_tier is part of the settings
                settings.use_spot,
                settings.job_recovery,
                settings.region,
                settings.zone,
                accelerators_hashable,
            )
            cluster_name_parts = [
                sanitize_cluster_name(str(part))
                for part in resource_config
                if part is not None
            ]
            cluster_name = f"cluster-{orchestrator_run_id}" + "-".join(
                cluster_name_parts
            )
            unique_resource_configs[step_name] = cluster_name

        run = Client().list_pipeline_runs(
            sort_by="asc:created",
            size=1,
            deployment_id=args.deployment_id,
            status=ExecutionStatus.INITIALIZING,
        )[0]

        logger.info("Fetching pipeline run: %s", run.id)

        def run_step_on_skypilot_vm(step_name: str) -> None:
            """Run a pipeline step in a separate Skypilot VM.

            Args:
                step_name: Name of the step.

            Raises:
                Exception: If the step execution fails.
            """
            logger.info(f"Running step `{step_name}` on a VM...")
            try:
                cluster_name = unique_resource_configs[step_name]

                image = SkypilotBaseOrchestrator.get_image(
                    deployment=deployment, step_name=step_name
                )

                step_args = (
                    StepEntrypointConfiguration.get_entrypoint_arguments(
                        step_name=step_name, deployment_id=deployment.id
                    )
                )
                arguments_str = " ".join(step_args)

                step = deployment.step_configurations[step_name]
                settings = cast(
                    SkypilotBaseOrchestratorSettings,
                    orchestrator.get_settings(step),
                )
                env = get_config_environment_vars()
                env[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
                    orchestrator_run_id
                )

                # Create the Docker run command
                run_command = create_docker_run_command(
                    image=image,
                    entrypoint_str=entrypoint_str,
                    arguments_str=arguments_str,
                    environment=env,
                    docker_run_args=settings.docker_run_args,
                    use_sudo=False,  # Entrypoint doesn't use sudo
                )

                task_name = f"{deployment.id}-{step_name}-{time.time()}"

                # Create task kwargs
                task_kwargs = prepare_task_kwargs(
                    settings=settings,
                    run_command=run_command,
                    setup=setup,
                    task_envs=task_envs,
                    task_name=task_name,
                )

                task = sky.Task(**task_kwargs)

                # Set resources
                resources_kwargs = prepare_resources_kwargs(
                    cloud=orchestrator.cloud,
                    settings=settings,
                    default_instance_type=orchestrator.DEFAULT_INSTANCE_TYPE,
                )

                task = task.set_resources(sky.Resources(**resources_kwargs))

                # Prepare launch parameters
                launch_kwargs = prepare_launch_kwargs(
                    settings=settings,
                )

                # sky.launch now returns a request ID (async). Capture it so we can
                # optionally stream logs and block until completion when desired.
                launch_request_id = sky.launch(
                    task,
                    cluster_name,
                    **launch_kwargs,
                )
                sky_job_get(launch_request_id, True, cluster_name)

                # Pop the resource configuration for this step
                unique_resource_configs.pop(step_name)

                if cluster_name in unique_resource_configs.values():
                    # If there are more steps using this configuration, skip deprovisioning the cluster
                    logger.info(
                        f"Resource configuration for cluster '{cluster_name}' "
                        "is used by subsequent steps. Skipping the deprovisioning of "
                        "the cluster."
                    )
                else:
                    # If there are no more steps using this configuration, down the cluster
                    logger.info(
                        f"Resource configuration for cluster '{cluster_name}' "
                        "is not used by subsequent steps. deprovisioning the cluster."
                    )
                    down_request_id = sky.down(cluster_name)
                    # Wait for the cluster to be terminated
                    sky.stream_and_get(down_request_id)

                logger.info(
                    f"Running step `{step_name}` on a VM is completed."
                )

            except Exception as e:
                logger.error(f"Failed while launching step `{step_name}`: {e}")
                raise

        dag_runner = ThreadedDagRunner(
            dag=pipeline_dag, run_fn=run_step_on_skypilot_vm
        )
        dag_runner.run()

        failed_nodes = []
        for node in dag_runner.nodes:
            if dag_runner.node_states[node] == NodeStatus.FAILED:
                failed_nodes.append(node)

        if failed_nodes:
            raise Exception(f"One or more steps failed: {failed_nodes}")

    except Exception as e:
        logger.error(f"Orchestrator failed: {e}")

        # Try to mark the pipeline run as failed
        if run:
            publish_failed_pipeline_run(run.id)
            logger.info("Marked pipeline run as failed in ZenML.")
        raise
parse_args() -> argparse.Namespace

Parse entrypoint arguments.

Returns:

    Namespace: Parsed args.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args()
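The entrypoint expects exactly these two required flags. A quick standalone check of the argument shape, passing an explicit argv list instead of reading `sys.argv` (the flag values are made up for illustration):

```python
import argparse


def parse_args(argv):
    """Same parser as the entrypoint, but parsing an explicit argv list."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args(argv)


args = parse_args(["--run_name", "my-run", "--deployment_id", "1234"])
print(args.run_name, args.deployment_id)  # my-run 1234
```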
skypilot_orchestrator_entrypoint_configuration

Entrypoint configuration for the Skypilot master/orchestrator VM.

Classes
SkypilotOrchestratorEntrypointConfiguration

Entrypoint configuration for the Skypilot master/orchestrator VM.

Functions
get_entrypoint_arguments(run_name: str, deployment_id: UUID) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

    run_name (str): Name of the ZenML run.
    deployment_id (UUID): ID of the deployment.

Returns:

    List[str]: List of entrypoint arguments.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]

    return args
get_entrypoint_command() -> List[str] classmethod

Returns a command that runs the entrypoint module.

Returns:

    List[str]: Entrypoint command.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() -> Set[str] classmethod

Gets all the options required for running this entrypoint.

Returns:

    Set[str]: Entrypoint options.

Source code in src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
    }
    return options

utils

Utility functions for Skypilot orchestrators.

Classes
Functions
create_docker_run_command(image: str, entrypoint_str: str, arguments_str: str, environment: Dict[str, str], docker_run_args: List[str], use_sudo: bool = True) -> str

Create a Docker run command string.

Parameters:

    image (str): Docker image to run.
    entrypoint_str (str): Entrypoint command.
    arguments_str (str): Command arguments.
    environment (Dict[str, str]): Environment variables.
    docker_run_args (List[str]): Additional Docker run arguments.
    use_sudo (bool, default True): Whether to use sudo prefix in docker commands.

Returns:

    str: Docker run command as string.

Source code in src/zenml/integrations/skypilot/utils.py
def create_docker_run_command(
    image: str,
    entrypoint_str: str,
    arguments_str: str,
    environment: Dict[str, str],
    docker_run_args: List[str],
    use_sudo: bool = True,
) -> str:
    """Create a Docker run command string.

    Args:
        image: Docker image to run.
        entrypoint_str: Entrypoint command.
        arguments_str: Command arguments.
        environment: Environment variables.
        docker_run_args: Additional Docker run arguments.
        use_sudo: Whether to use sudo prefix in docker commands.

    Returns:
        Docker run command as string.
    """
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    custom_run_args = " ".join(docker_run_args)
    if custom_run_args:
        custom_run_args += " "

    sudo_prefix = "sudo " if use_sudo else ""
    return f"{sudo_prefix}docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
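To see the shape of the command this utility produces, here is a self-contained sketch with the same string assembly, called with made-up example values (the image name, entrypoint, and environment variable are illustrative, not real defaults):

```python
from typing import Dict, List


def create_docker_run_command(
    image: str,
    entrypoint_str: str,
    arguments_str: str,
    environment: Dict[str, str],
    docker_run_args: List[str],
    use_sudo: bool = True,
) -> str:
    """Same string assembly as the utility above."""
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    custom_run_args = " ".join(docker_run_args)
    if custom_run_args:
        custom_run_args += " "
    sudo_prefix = "sudo " if use_sudo else ""
    return (
        f"{sudo_prefix}docker run --rm {custom_run_args}"
        f"{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
    )


# Illustrative values only.
cmd = create_docker_run_command(
    image="zenml-pipeline:latest",
    entrypoint_str="python -m zenml.entrypoint",
    arguments_str="--deployment_id 1234",
    environment={"ZENML_LOGGING_VERBOSITY": "INFO"},
    docker_run_args=["--gpus=all"],
    use_sudo=False,
)
print(cmd)
# docker run --rm --gpus=all -e ZENML_LOGGING_VERBOSITY=INFO zenml-pipeline:latest python -m zenml.entrypoint --deployment_id 1234
```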
prepare_docker_setup(container_registry_uri: str, credentials: Optional[Tuple[str, str]] = None, use_sudo: bool = True) -> Tuple[Optional[str], Dict[str, str]]

Prepare Docker login setup command and environment variables.

Parameters:

    container_registry_uri (str): URI of the container registry.
    credentials (Optional[Tuple[str, str]], default None): Optional (username, password) tuple.
    use_sudo (bool, default True): Whether to use sudo prefix in docker commands.

Returns:

    Tuple[Optional[str], Dict[str, str]]: Tuple of (setup command, environment variables).

Source code in src/zenml/integrations/skypilot/utils.py
def prepare_docker_setup(
    container_registry_uri: str,
    credentials: Optional[Tuple[str, str]] = None,
    use_sudo: bool = True,
) -> Tuple[Optional[str], Dict[str, str]]:
    """Prepare Docker login setup command and environment variables.

    Args:
        container_registry_uri: URI of the container registry.
        credentials: Optional credentials (username, password) tuple.
        use_sudo: Whether to use sudo prefix in docker commands.

    Returns:
        Tuple of (setup command, environment variables)
    """
    if credentials:
        docker_username, docker_password = credentials
        sudo_prefix = "sudo " if use_sudo else ""
        setup = (
            f"{sudo_prefix}docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {container_registry_uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = {}

    return setup, task_envs
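Note that the credentials themselves are never interpolated into the setup command; they travel as task environment variables that the `docker login` command reads at runtime. A self-contained sketch with the same logic, called on hypothetical registry values:

```python
from typing import Dict, Optional, Tuple


def prepare_docker_setup(
    container_registry_uri: str,
    credentials: Optional[Tuple[str, str]] = None,
    use_sudo: bool = True,
) -> Tuple[Optional[str], Dict[str, str]]:
    """Same logic as the utility above: the login command references
    env vars, so the secrets stay out of the command string."""
    if credentials:
        docker_username, docker_password = credentials
        sudo_prefix = "sudo " if use_sudo else ""
        setup = (
            f"{sudo_prefix}docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {container_registry_uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = {}
    return setup, task_envs


# Hypothetical registry and credentials, for illustration only.
setup, task_envs = prepare_docker_setup(
    container_registry_uri="gcr.io/my-project",
    credentials=("ci-user", "s3cret"),
    use_sudo=False,
)
print(setup)
# docker login --username $DOCKER_USERNAME --password $DOCKER_PASSWORD gcr.io/my-project
```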
prepare_launch_kwargs(settings: SkypilotBaseOrchestratorSettings, down: Optional[bool] = None, idle_minutes_to_autostop: Optional[int] = None) -> Dict[str, Any]

Prepare launch keyword arguments for sky.launch.

Parameters:

    settings (SkypilotBaseOrchestratorSettings): Skypilot orchestrator settings.
    down (Optional[bool], default None): Whether to tear down the cluster after job completion.
    idle_minutes_to_autostop (Optional[int], default None): Minutes to autostop after idleness.

Returns:

    Dict[str, Any]: Launch keyword arguments dictionary.

Source code in src/zenml/integrations/skypilot/utils.py
def prepare_launch_kwargs(
    settings: SkypilotBaseOrchestratorSettings,
    down: Optional[bool] = None,
    idle_minutes_to_autostop: Optional[int] = None,
) -> Dict[str, Any]:
    """Prepare launch keyword arguments for sky.launch.

    Args:
        settings: Skypilot orchestrator settings.
        down: Whether to tear down the cluster after job completion.
        idle_minutes_to_autostop: Minutes to autostop after idleness.

    Returns:
        Launch keyword arguments dictionary.
    """
    # Determine values falling back to settings where applicable
    down_value = down if down is not None else settings.down
    idle_value = (
        idle_minutes_to_autostop
        if idle_minutes_to_autostop is not None
        else settings.idle_minutes_to_autostop
    )

    # The following parameters were removed from sky.launch in versions > 0.8.
    # We therefore no longer include them in the kwargs passed to the call.
    # • stream_logs – handled by explicitly calling sky.stream_and_get
    # • detach_setup / detach_run – setup/run are now detached by default

    launch_kwargs = {
        "retry_until_up": settings.retry_until_up,
        "idle_minutes_to_autostop": idle_value,
        "down": down_value,
        "backend": None,
        **settings.launch_settings,  # Keep user-provided extras
    }

    # Remove keys that are no longer supported by sky.launch.
    for _deprecated in (
        "stream_logs",
        "detach_setup",
        "detach_run",
        "num_nodes",
    ):
        launch_kwargs.pop(_deprecated, None)

    # Remove None values to avoid overriding SkyPilot defaults
    return {k: v for k, v in launch_kwargs.items() if v is not None}
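The assemble-override-filter pattern above can be sketched standalone. The `build_launch_kwargs` helper and its argument values below are illustrative stand-ins (plain values instead of the settings object), not ZenML or SkyPilot API:

```python
from typing import Any, Dict, Optional

def build_launch_kwargs(
    retry_until_up: bool,
    down: Optional[bool],
    idle_minutes: Optional[int],
    extras: Dict[str, Any],
) -> Dict[str, Any]:
    # Assemble defaults, let user-provided extras override them, then drop
    # deprecated keys and None values -- mirroring prepare_launch_kwargs above.
    kwargs = {
        "retry_until_up": retry_until_up,
        "idle_minutes_to_autostop": idle_minutes,
        "down": down,
        "backend": None,
        **extras,
    }
    for deprecated in ("stream_logs", "detach_setup", "detach_run", "num_nodes"):
        kwargs.pop(deprecated, None)
    return {k: v for k, v in kwargs.items() if v is not None}

# User extras override defaults; deprecated and None-valued keys are dropped.
print(build_launch_kwargs(True, None, 30, {"retry_until_up": False, "stream_logs": True}))
```

Filtering out `None` values matters because passing `down=None` or `backend=None` explicitly could otherwise override SkyPilot's own defaults.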
prepare_resources_kwargs(cloud: Cloud, settings: SkypilotBaseOrchestratorSettings, default_instance_type: Optional[str] = None, kubernetes_image: Optional[str] = None) -> Dict[str, Any]

Prepare resources keyword arguments for sky.Resources.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `cloud` | `Cloud` | Skypilot cloud. | required |
| `settings` | `SkypilotBaseOrchestratorSettings` | Skypilot orchestrator settings. | required |
| `default_instance_type` | `Optional[str]` | Default instance type. | `None` |
| `kubernetes_image` | `Optional[str]` | Image to use for Kubernetes (if applicable). | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Resources keyword arguments dictionary. |

Source code in src/zenml/integrations/skypilot/utils.py
def prepare_resources_kwargs(
    cloud: "Cloud",
    settings: SkypilotBaseOrchestratorSettings,
    default_instance_type: Optional[str] = None,
    kubernetes_image: Optional[str] = None,
) -> Dict[str, Any]:
    """Prepare resources keyword arguments for sky.Resources.

    Args:
        cloud: Skypilot cloud.
        settings: Skypilot orchestrator settings.
        default_instance_type: Default instance type.
        kubernetes_image: Image to use for Kubernetes (if applicable).

    Returns:
        Resources keyword arguments dictionary.
    """
    resources_kwargs = {
        "cloud": cloud,
        "instance_type": settings.instance_type or default_instance_type,
        "cpus": settings.cpus,
        "memory": settings.memory,
        "accelerators": settings.accelerators,
        "accelerator_args": settings.accelerator_args,
        "use_spot": settings.use_spot,
        "job_recovery": settings.job_recovery,
        "region": settings.region,
        "zone": settings.zone,
        "image_id": kubernetes_image
        if kubernetes_image
        else settings.image_id,
        "disk_size": settings.disk_size,
        "disk_tier": settings.disk_tier,
        "ports": settings.ports,
        "labels": settings.labels,
        "any_of": settings.any_of,
        "ordered": settings.ordered,
        **settings.resources_settings,  # Add any arbitrary resource settings
    }

    # Remove None values to avoid overriding SkyPilot defaults
    return {k: v for k, v in resources_kwargs.items() if v is not None}
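The two fallback rules above (instance type falling back to the orchestrator default, and a Kubernetes image taking precedence over the settings image) can be shown in isolation. The helper and values below are illustrative, not ZenML API:

```python
from typing import Any, Dict, Optional

def build_resources_kwargs(
    instance_type: Optional[str],
    default_instance_type: Optional[str],
    kubernetes_image: Optional[str],
    settings_image_id: Optional[str],
    extras: Dict[str, Any],
) -> Dict[str, Any]:
    # instance_type falls back to the orchestrator default; a Kubernetes
    # image, when given, wins over the image_id from settings.
    kwargs = {
        "instance_type": instance_type or default_instance_type,
        "image_id": kubernetes_image if kubernetes_image else settings_image_id,
        **extras,
    }
    # Drop None values so SkyPilot's own defaults are not overridden.
    return {k: v for k, v in kwargs.items() if v is not None}

print(build_resources_kwargs(None, "t3.large", "docker:myimg", "ami-123", {"use_spot": True}))
```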
prepare_task_kwargs(settings: SkypilotBaseOrchestratorSettings, run_command: str, setup: Optional[str], task_envs: Dict[str, str], task_name: str) -> Dict[str, Any]

Prepare task keyword arguments for sky.Task.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `settings` | `SkypilotBaseOrchestratorSettings` | Skypilot orchestrator settings. | required |
| `run_command` | `str` | Command to run. | required |
| `setup` | `Optional[str]` | Setup command. | required |
| `task_envs` | `Dict[str, str]` | Task environment variables. | required |
| `task_name` | `str` | Task name. | required |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Task keyword arguments dictionary. |

Source code in src/zenml/integrations/skypilot/utils.py
def prepare_task_kwargs(
    settings: SkypilotBaseOrchestratorSettings,
    run_command: str,
    setup: Optional[str],
    task_envs: Dict[str, str],
    task_name: str,
) -> Dict[str, Any]:
    """Prepare task keyword arguments for sky.Task.

    Args:
        settings: Skypilot orchestrator settings.
        run_command: Command to run.
        setup: Setup command.
        task_envs: Task environment variables.
        task_name: Task name.

    Returns:
        Task keyword arguments dictionary.
    """
    # Merge envs from settings with existing task_envs
    merged_envs = {}

    # First add user-provided envs
    if settings.envs:
        merged_envs.update(settings.envs)

    # Then add task_envs which take precedence
    if task_envs:
        merged_envs.update(task_envs)

    task_kwargs = {
        "run": run_command,
        "setup": setup,
        "envs": merged_envs,
        "name": settings.task_name or task_name,
        "workdir": settings.workdir,
        "file_mounts_mapping": settings.file_mounts,
        **settings.task_settings,  # Add any arbitrary task settings
    }

    # Remove None values to avoid overriding SkyPilot defaults
    return {k: v for k, v in task_kwargs.items() if v is not None}
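The environment-merge precedence above (orchestrator-generated `task_envs` win over user-provided `settings.envs`) can be sketched on its own. The `merge_envs` helper and values are illustrative, not ZenML API:

```python
from typing import Dict

def merge_envs(settings_envs: Dict[str, str], task_envs: Dict[str, str]) -> Dict[str, str]:
    # User-provided envs go in first; orchestrator-generated task_envs
    # (e.g. docker registry credentials) are applied last and take precedence.
    merged: Dict[str, str] = {}
    merged.update(settings_envs or {})
    merged.update(task_envs or {})
    return merged

print(merge_envs(
    {"LOG_LEVEL": "info", "DOCKER_USERNAME": "me"},
    {"DOCKER_USERNAME": "registry-user"},
))
```

Giving `task_envs` the last word ensures a user cannot accidentally shadow the credentials the orchestrator injects for the container registry login.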
sanitize_cluster_name(name: str) -> str

Sanitize the value to be used in a cluster name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Arbitrary input cluster name. | required |

Returns:

| Type | Description |
|------|-------------|
| `str` | Sanitized cluster name. |

Source code in src/zenml/integrations/skypilot/utils.py
def sanitize_cluster_name(name: str) -> str:
    """Sanitize the value to be used in a cluster name.

    Args:
        name: Arbitrary input cluster name.

    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
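A quick standalone check of the sanitization logic (the function body below reproduces the source above verbatim):

```python
import re

def sanitize_cluster_name(name: str) -> str:
    # Lowercase, replace any character that is not a lowercase letter,
    # digit, or hyphen with a hyphen, then trim leading/trailing hyphens.
    name = re.sub(r"[^a-z0-9-]", "-", name.lower())
    name = re.sub(r"^[-]+", "", name)
    name = re.sub(r"[-]+$", "", name)
    return name

print(sanitize_cluster_name("My_Pipeline Run#42!"))  # -> my-pipeline-run-42
```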
sky_job_get(request_id: str, stream_logs: bool, cluster_name: str) -> Any

Handle SkyPilot request results based on stream_logs setting.

SkyPilot's `exec` and `launch` API methods are asynchronous and return a request ID. This method waits for the operation to complete and returns the result. If `stream_logs` is True, it also streams the logs and waits for the job to complete.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `request_id` | `str` | The request ID returned from a SkyPilot operation. | required |
| `stream_logs` | `bool` | Whether to stream logs while waiting for completion. | required |
| `cluster_name` | `str` | The name of the cluster to tail logs for. | required |

Returns:

| Type | Description |
|------|-------------|
| `Any` | The result of the SkyPilot operation. |

Raises:

| Type | Description |
|------|-------------|
| `Exception` | If the SkyPilot job fails. |

Source code in src/zenml/integrations/skypilot/utils.py
def sky_job_get(request_id: str, stream_logs: bool, cluster_name: str) -> Any:
    """Handle SkyPilot request results based on stream_logs setting.

    SkyPilot API exec and launch methods are asynchronous and return a request ID.
    This method waits for the operation to complete and returns the result.
    If stream_logs is True, it will also stream the logs and wait for the
    job to complete.

    Args:
        request_id: The request ID returned from a SkyPilot operation.
        stream_logs: Whether to stream logs while waiting for completion.
        cluster_name: The name of the cluster to tail logs for.

    Returns:
        The result of the SkyPilot operation.

    Raises:
        Exception: If the SkyPilot job fails.
    """
    if stream_logs:
        # Stream logs and wait for completion
        job_id, _ = sky.stream_and_get(request_id)
    else:
        # Just wait for completion without streaming logs
        job_id, _ = sky.get(request_id)

    status = 0  # 0=Successful, 100=Failed
    if stream_logs:
        status = sky.tail_logs(
            cluster_name=cluster_name, job_id=job_id, follow=True
        )

    if status != 0:
        raise Exception(f"SkyPilot job {job_id} failed with status {status}")

    return job_id
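The control flow above (wait for the async request, optionally tail logs, raise on a non-zero status) can be sketched with stand-in callables. The `wait` and `tail` parameters below are hypothetical stubs for `sky.stream_and_get`/`sky.get` and `sky.tail_logs`, not the SkyPilot API itself:

```python
from typing import Any, Callable, Tuple

def job_get(
    request_id: str,
    stream_logs: bool,
    wait: Callable[[str], Tuple[int, Any]],
    tail: Callable[[int], int],
) -> int:
    # Block until the async request completes; returns (job_id, result).
    job_id, _ = wait(request_id)
    status = 0  # 0 = success (SkyPilot uses 100 for failure)
    if stream_logs:
        # Follow the job's logs to completion; returns its exit status.
        status = tail(job_id)
    if status != 0:
        raise Exception(f"SkyPilot job {job_id} failed with status {status}")
    return job_id

# A successful run returns the job id unchanged.
print(job_get("req-1", True, lambda r: (7, None), lambda j: 0))  # -> 7
```

Note that the failure check only fires when `stream_logs` is True, since only `tail_logs` reports the job's exit status; without streaming, completion of the request is all that is awaited.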