Skypilot

zenml.integrations.skypilot special

flavors special

skypilot_orchestrator_base_vm_config

Skypilot orchestrator base config and settings.

SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings)

Skypilot orchestrator base config.

Attributes:

Name Type Description
disable_step_based_settings bool

whether to disable step-based settings. If True, the orchestrator will run all steps with the pipeline settings in one single VM. If False, the orchestrator will run each step with its own settings in separate VMs if provided.
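
As a rough illustration of how this flag interacts with per-step settings, the decision can be sketched as below. This is a simplified stand-in, not the ZenML implementation: `VmSettings` and `use_separate_vms` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class VmSettings:
    """Hypothetical, trimmed-down stand-in for the orchestrator settings."""

    instance_type: Optional[str] = None
    use_spot: Optional[bool] = None


def use_separate_vms(
    disable_step_based_settings: bool,
    pipeline_settings: VmSettings,
    step_settings: Dict[str, VmSettings],
) -> bool:
    """Return True if steps should be dispatched to their own VMs."""
    if disable_step_based_settings:
        # Everything runs in one VM with the pipeline-level settings.
        return False
    # Separate VMs are only needed if some step deviates from the pipeline.
    return any(s != pipeline_settings for s in step_settings.values())


base = VmSettings(instance_type="m5.large")
gpu = VmSettings(instance_type="p3.2xlarge")
print(use_separate_vms(False, base, {"load": base, "train": gpu}))
print(use_separate_vms(True, base, {"load": base, "train": gpu}))
```

With the flag left at its default of False, a single deviating step is enough to switch to per-step VMs; with the flag set to True, the step settings are ignored for this decision.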

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(
    BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
    """Skypilot orchestrator base config.

    Attributes:
        disable_step_based_settings: whether to disable step-based settings.
            If True, the orchestrator will run all steps with the pipeline
            settings in one single VM. If False, the orchestrator will run
            each step with its own settings in separate VMs if provided.
    """

    disable_step_based_settings: bool = False

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False
is_local: bool property readonly

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

SkypilotBaseOrchestratorSettings (BaseSettings)

Skypilot orchestrator base settings.

Attributes:

Name Type Description
instance_type Optional[str]

the instance type to use.

cpus Union[NoneType, int, float, str]

the number of CPUs required for the task. If a str, must be a string of the form '2' or '2+', where the + indicates that the task requires at least 2 CPUs.

memory Union[NoneType, int, float, str]

the amount of memory in GiB required. If a str, must be a string of the form '16' or '16+', where the + indicates that the task requires at least 16 GiB of memory.

accelerators Union[NoneType, str, Dict[str, int]]

the accelerators required. If a str, must be a string of the form 'V100' or 'V100:2', where the :2 indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form {'V100': 2} or {'tpu-v2-8': 1}.

accelerator_args Optional[Dict[str, str]]

accelerator-specific arguments. For example, {'tpu_vm': True, 'runtime_version': 'tpu-vm-base'} for TPUs.

use_spot Optional[bool]

whether to use spot instances. If None, defaults to False.

job_recovery Optional[str]

the spot recovery strategy to use for the managed spot to recover the cluster from preemption. Refer to the recovery_strategy module (https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details.

region Optional[str]

the region to use.

zone Optional[str]

the zone to use.

image_id Union[Dict[str, str], str]

the image ID to use. If a str, must be the image ID from the cloud, such as AWS: 'ami-1234567890abcdef0' or GCP: 'projects/my-project-id/global/images/my-image-name'; or an image tag provided by SkyPilot, such as AWS: 'skypilot:gpu-ubuntu-2004'. If a dict, must be a dict mapping from region to image ID, such as:

    {
        'us-west1': 'ami-1234567890abcdef0',
        'us-east1': 'ami-1234567890abcdef0'
    }
disk_size Optional[int]

the size of the OS disk in GiB.

disk_tier Optional[Literal['high', 'medium', 'low']]

the disk performance tier to use. If None, defaults to 'medium'.

cluster_name Optional[str]

name of the cluster to create/reuse. If None, auto-generate a name.

retry_until_up bool

whether to retry launching the cluster until it is up.

idle_minutes_to_autostop Optional[int]

automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag is equivalent to running sky.launch(..., detach_run=True, ...) and then sky.autostop(idle_minutes=<minutes>). If not set, the cluster will not be autostopped.

down bool

Tear down the cluster after all jobs finish (successfully or abnormally). If idle_minutes_to_autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning, data syncing, or setup, the cluster will not be torn down, so that it can be debugged.

stream_logs bool

if True, show the logs in the terminal.

docker_run_args List[str]

Optional arguments to pass to the docker run command running inside the VM.
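
The string forms accepted by `cpus`, `memory` and `accelerators` can be read as follows. This is only a sketch of the documented formats, not SkyPilot's own parser: `parse_resource_spec` and `normalize_accelerators` are hypothetical helpers, and treating a bare accelerator name such as 'V100' as a count of one is an assumption.

```python
from typing import Dict, Optional, Tuple, Union


def parse_resource_spec(
    value: Union[None, int, float, str],
) -> Optional[Tuple[float, bool]]:
    """Return (amount, is_minimum), or None when the setting is unset."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value), False
    text = value.strip()
    is_minimum = text.endswith("+")  # a trailing '+' means "at least"
    if is_minimum:
        text = text[:-1]
    return float(text), is_minimum


def normalize_accelerators(
    value: Union[None, str, Dict[str, int]],
) -> Optional[Dict[str, int]]:
    """Return accelerators as a {name: count} dict, or None when unset."""
    if value is None:
        return None
    if isinstance(value, dict):
        return dict(value)
    name, sep, count = value.partition(":")
    # Assumption: a bare name such as 'V100' means one accelerator.
    return {name: int(count) if sep else 1}


print(parse_resource_spec("2+"))
print(normalize_accelerators("V100:2"))
```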

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Skypilot orchestrator base settings.

    Attributes:
        instance_type: the instance type to use.
        cpus: the number of CPUs required for the task.
            If a str, must be a string of the form `'2'` or `'2+'`, where
            the `+` indicates that the task requires at least 2 CPUs.
        memory: the amount of memory in GiB required. If a
            str, must be a string of the form `'16'` or `'16+'`, where
            the `+` indicates that the task requires at least 16 GiB of memory.
        accelerators: the accelerators required. If a str, must be
            a string of the form `'V100'` or `'V100:2'`, where the `:2`
            indicates that the task requires 2 V100 GPUs. If a dict, must be a
            dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
        accelerator_args: accelerator-specific arguments. For example,
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
        use_spot: whether to use spot instances. If None, defaults to
            False.
        job_recovery: the spot recovery strategy to use for the managed
            spot to recover the cluster from preemption. Refer to
            `recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__ # pylint: disable=line-too-long
            for more details.
        region: the region to use.
        zone: the zone to use.
        image_id: the image ID to use. If a str, must be a string
            of the image id from the cloud, such as AWS:
            ``'ami-1234567890abcdef0'``, GCP:
            ``'projects/my-project-id/global/images/my-image-name'``;
            or an image tag provided by SkyPilot, such as AWS:
            ``'skypilot:gpu-ubuntu-2004'``. If a dict, must be a dict mapping
            from region to image ID, such as:

            .. code-block:: python

                {
                'us-west1': 'ami-1234567890abcdef0',
                'us-east1': 'ami-1234567890abcdef0'
                }

        disk_size: the size of the OS disk in GiB.
        disk_tier: the disk performance tier to use. If None, defaults to
            ``'medium'``.

        cluster_name: name of the cluster to create/reuse.  If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            flag is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. If not set, the cluster
            will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If `idle_minutes_to_autostop` is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        stream_logs: if True, show the logs in the terminal.
        docker_run_args: Optional arguments to pass to the `docker run` command
            running inside the VM.
    """

    # Resources
    instance_type: Optional[str] = None
    cpus: Union[None, int, float, str] = Field(
        default=None, union_mode="left_to_right"
    )
    memory: Union[None, int, float, str] = Field(
        default=None, union_mode="left_to_right"
    )
    accelerators: Union[None, str, Dict[str, int]] = Field(
        default=None, union_mode="left_to_right"
    )
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    job_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = Field(
        default=None, union_mode="left_to_right"
    )
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None

    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True

    docker_run_args: List[str] = []
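
Note that the defaults above (`down=True`, `idle_minutes_to_autostop=30`) are not always what ends up being passed to SkyPilot: the `prepare_or_run_pipeline` listing on this page forces `down=False` and no autostop when the cloud is Kubernetes. A minimal sketch of that selection, using a hypothetical helper name `effective_teardown`:

```python
from typing import Optional, Tuple


def effective_teardown(
    is_kubernetes: bool,
    down: bool = True,
    idle_minutes_to_autostop: Optional[int] = 30,
) -> Tuple[bool, Optional[int]]:
    """Return the (down, idle_minutes_to_autostop) pair actually used."""
    if is_kubernetes:
        # On Kubernetes the settings are overridden: no teardown, no autostop.
        return False, None
    return down, idle_minutes_to_autostop


print(effective_teardown(is_kubernetes=False))
print(effective_teardown(is_kubernetes=True))
```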

orchestrators special

Initialization of the Skypilot ZenML orchestrators.

skypilot_base_vm_orchestrator

Implementation of the Skypilot base VM orchestrator.

SkypilotBaseOrchestrator (ContainerizedOrchestrator)

Base class for Orchestrator responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
    """Base class for Orchestrator responsible for running pipelines remotely in a VM.

    This orchestrator does not support running on a schedule.
    """

    # The default instance type to use if none is specified in settings
    DEFAULT_INSTANCE_TYPE: Optional[str] = None

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue

                return False, (
                    f"The Skypilot orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the Skypilot step.\nPlease ensure that you always "
                    "use non-local stack components with the Skypilot "
                    "orchestrator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
            )

    @property
    def config(self) -> SkypilotBaseOrchestratorConfig:
        """Returns the `SkypilotBaseOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(SkypilotBaseOrchestratorConfig, self._config)

    @property
    @abstractmethod
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.Cloud` instance.
        """

    def setup_credentials(self) -> None:
        """Set up credentials for the orchestrator."""
        connector = self.get_connector()
        assert connector is not None
        connector.configure_local_client()

    @abstractmethod
    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up Environment variables that are required for the orchestrator.

        Args:
            set: Whether to set the environment variables or not.
        """

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs each pipeline step in a separate Skypilot container.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.

        Raises:
            Exception: If the pipeline run fails.
            RuntimeError: If the code is running in a notebook.
        """
        # First check whether the code is running in a notebook.
        if Environment.in_notebook():
            raise RuntimeError(
                "The Skypilot orchestrator cannot run pipelines in a notebook "
                "environment. The reason is that it is non-trivial to create "
                "a Docker image of a notebook. Please consider refactoring "
                "your notebook cells into separate scripts in a Python module "
                "and run the code outside of a notebook when using this "
                "orchestrator."
            )
        if deployment.schedule:
            logger.warning(
                "Skypilot Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        # Set up some variables for configuration
        orchestrator_run_id = str(uuid4())
        environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
            orchestrator_run_id
        )

        settings = cast(
            SkypilotBaseOrchestratorSettings,
            self.get_settings(deployment),
        )

        pipeline_name = deployment.pipeline_configuration.name
        orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

        assert stack.container_registry

        # Get Docker image for the orchestrator pod
        try:
            image = self.get_image(deployment=deployment)
        except KeyError:
            # If no generic pipeline image exists (which means all steps have
            # custom builds) we use a random step image as all of them include
            # dependencies for the active stack
            pipeline_step_name = next(iter(deployment.step_configurations))
            image = self.get_image(
                deployment=deployment, step_name=pipeline_step_name
            )

        different_settings_found = False

        if not self.config.disable_step_based_settings:
            for _, step in deployment.step_configurations.items():
                step_settings = cast(
                    SkypilotBaseOrchestratorSettings,
                    self.get_settings(step),
                )
                if step_settings != settings:
                    different_settings_found = True
                    logger.info(
                        "At least one step has different settings than the "
                        "pipeline. The step with different settings will be "
                        "run in a separate VM.\n"
                        "You can configure the orchestrator to disable this "
                        "behavior by updating the `disable_step_based_settings` "
                        "in your orchestrator configuration "
                        "by running the following command: "
                        "`zenml orchestrator update --disable-step-based-settings=True`"
                    )
                    break

        # Decide which configuration to use based on whether different settings were found
        if (
            not self.config.disable_step_based_settings
            and different_settings_found
        ):
            # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
            command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
            args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
                run_name=orchestrator_run_name,
                deployment_id=deployment.id,
            )
        else:
            # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
            command = PipelineEntrypointConfiguration.get_entrypoint_command()
            args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
                deployment_id=deployment.id
            )

        entrypoint_str = " ".join(command)
        arguments_str = " ".join(args)

        task_envs = environment
        docker_environment_str = " ".join(
            f"-e {k}={v}" for k, v in environment.items()
        )
        custom_run_args = " ".join(settings.docker_run_args)
        if custom_run_args:
            custom_run_args += " "

        instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

        # Set up credentials
        self.setup_credentials()

        # Guaranteed by stack validation
        assert stack is not None and stack.container_registry is not None

        if docker_creds := stack.container_registry.credentials:
            docker_username, docker_password = docker_creds
            setup = (
                f"sudo docker login --username $DOCKER_USERNAME --password "
                f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
            )
            task_envs["DOCKER_USERNAME"] = docker_username
            task_envs["DOCKER_PASSWORD"] = docker_password
        else:
            setup = None

        # Run the entire pipeline

        # Set the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=True)

        try:
            if isinstance(self.cloud, sky.clouds.Kubernetes):
                run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
                setup = None
                down = False
                idle_minutes_to_autostop = None
            else:
                run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
                down = settings.down
                idle_minutes_to_autostop = settings.idle_minutes_to_autostop
            task = sky.Task(
                run=run_command,
                setup=setup,
                envs=task_envs,
            )
            logger.debug(f"Running run: {run_command}")

            task = task.set_resources(
                sky.Resources(
                    cloud=self.cloud,
                    instance_type=instance_type,
                    cpus=settings.cpus,
                    memory=settings.memory,
                    accelerators=settings.accelerators,
                    accelerator_args=settings.accelerator_args,
                    use_spot=settings.use_spot,
                    job_recovery=settings.job_recovery,
                    region=settings.region,
                    zone=settings.zone,
                    image_id=image
                    if isinstance(self.cloud, sky.clouds.Kubernetes)
                    else settings.image_id,
                    disk_size=settings.disk_size,
                    disk_tier=settings.disk_tier,
                )
            )
            # Set the cluster name
            if settings.cluster_name:
                sky.exec(
                    task,
                    settings.cluster_name,
                    down=down,
                    stream_logs=settings.stream_logs,
                    backend=None,
                    detach_run=True,
                )
            else:
                # Find existing cluster
                for i in sky.status(refresh=True):
                    if isinstance(
                        i["handle"].launched_resources.cloud, type(self.cloud)
                    ):
                        cluster_name = i["handle"].cluster_name
                        logger.info(
                            f"Found existing cluster {cluster_name}. Reusing..."
                        )
                cluster_name = self.sanitize_cluster_name(
                    f"{orchestrator_run_name}"
                )
                # Launch the cluster
                sky.launch(
                    task,
                    cluster_name,
                    retry_until_up=settings.retry_until_up,
                    idle_minutes_to_autostop=idle_minutes_to_autostop,
                    down=down,
                    stream_logs=settings.stream_logs,
                    detach_setup=True,
                )

        except Exception as e:
            logger.error(f"Pipeline run failed: {e}")
            raise

        finally:
            # Unset the service connector AWS profile ENV variable
            self.prepare_environment_variable(set=False)

    def sanitize_cluster_name(self, name: str) -> str:
        """Sanitize the value to be used in a cluster name.

        Args:
            name: Arbitrary input cluster name.

        Returns:
            Sanitized cluster name.
        """
        name = re.sub(
            r"[^a-z0-9-]", "-", name.lower()
        )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
        name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
        name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
        return name
cloud: sky.clouds.Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
sky.clouds.Cloud

A sky.clouds.Cloud instance.

config: SkypilotBaseOrchestratorConfig property readonly

Returns the SkypilotBaseOrchestratorConfig config.

Returns:

Type Description
SkypilotBaseOrchestratorConfig

The configuration.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.
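
The custom validation rule can be illustrated without ZenML. In the sketch below, `Component` is a hypothetical stand-in for a stack component, and the function mirrors the shape of the check (first local component found fails the validation); the error message is shortened for the example.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass
class Component:
    """Hypothetical stand-in for a ZenML stack component."""

    name: str
    type: str
    is_local: bool


def validate_remote_components(
    components: Iterable[Component],
) -> Tuple[bool, str]:
    """Fail on the first local component, otherwise report success."""
    for component in components:
        if component.is_local:
            return False, (
                f"The '{component.name}' {component.type} is a local stack "
                "component and will not be available in the Skypilot step."
            )
    return True, ""


stack = [
    Component("s3_store", "artifact_store", is_local=False),
    Component("local_registry", "container_registry", is_local=True),
]
print(validate_remote_components(stack))
```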

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.
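
The lookup pattern is a plain environment-variable read that converts a missing key into a `RuntimeError`. A self-contained sketch follows; the variable name `EXAMPLE_ORCHESTRATOR_RUN_ID` is a placeholder for illustration, since the value of `ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID` is not shown on this page.

```python
import os

# Placeholder name for illustration only; not the real ZenML constant value.
RUN_ID_ENV = "EXAMPLE_ORCHESTRATOR_RUN_ID"


def get_run_id() -> str:
    """Read the run id from the environment or raise a RuntimeError."""
    try:
        return os.environ[RUN_ID_ENV]
    except KeyError as err:
        raise RuntimeError(
            f"Unable to read run id from environment variable {RUN_ID_ENV}."
        ) from err


os.environ[RUN_ID_ENV] = "1234"
print(get_run_id())
```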

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        )
prepare_environment_variable(self, set=True)

Set up Environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
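
Since this method is abstract, each cloud-specific orchestrator decides which variables to manage. The sketch below shows one plausible shape for an implementation and for the set/unset pairing around a run; `ExampleOrchestrator` and the variable name `EXAMPLE_CLOUD_PROFILE` are hypothetical and not part of ZenML.

```python
import os
from typing import Optional


class ExampleOrchestrator:
    """Hypothetical class illustrating the set/unset contract."""

    ENV_NAME = "EXAMPLE_CLOUD_PROFILE"  # hypothetical variable name

    def __init__(self, profile: str) -> None:
        self.profile = profile

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set the variable when `set` is True, remove it otherwise."""
        if set:
            os.environ[self.ENV_NAME] = self.profile
        else:
            os.environ.pop(self.ENV_NAME, None)

    def run(self) -> Optional[str]:
        """Set the variable, do some work, and always clean up afterwards."""
        self.prepare_environment_variable(set=True)
        try:
            # Stand-in for launching the pipeline.
            return os.environ.get(self.ENV_NAME)
        finally:
            self.prepare_environment_variable(set=False)


orchestrator = ExampleOrchestrator(profile="zenml")
print(orchestrator.run())
print(ExampleOrchestrator.ENV_NAME in os.environ)
```

The `try`/`finally` pairing ensures the variable is removed again even if the run raises.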
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up Environment variables that are required for the orchestrator.

    Args:
        set: Whether to set the environment variables or not.
    """
prepare_or_run_pipeline(self, deployment, stack, environment)

Runs the pipeline on a Skypilot cluster: either the entire pipeline in one VM, or each step in a separate VM when step-specific settings are found.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
Exception

If the pipeline run fails.

RuntimeError

If the code is running in a notebook.
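
For non-Kubernetes clouds, the command handed to SkyPilot is a single `docker run` string assembled from the image, entrypoint, arguments, environment and `docker_run_args`. The standalone sketch below reproduces that string assembly; `build_run_command` is a hypothetical helper, and the image name and entrypoint in the usage lines are made up for the example.

```python
from typing import Dict, List


def build_run_command(
    image: str,
    entrypoint: List[str],
    arguments: List[str],
    environment: Dict[str, str],
    docker_run_args: List[str],
) -> str:
    """Assemble the `docker run` command line as a single string."""
    entrypoint_str = " ".join(entrypoint)
    arguments_str = " ".join(arguments)
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    custom_run_args = " ".join(docker_run_args)
    if custom_run_args:
        # Keep a separator only when custom arguments are present.
        custom_run_args += " "
    return (
        f"sudo docker run --rm {custom_run_args}{docker_environment_str} "
        f"{image} {entrypoint_str} {arguments_str}"
    )


print(
    build_run_command(
        image="registry.example.com/zenml:demo",
        entrypoint=["python", "-m", "example.entrypoint"],
        arguments=["--deployment_id", "1234"],
        environment={"A": "1"},
        docker_run_args=["--gpus=all"],
    )
)
```

Values are interpolated without shell quoting, so in this form an environment value containing spaces would split into several arguments.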

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs each pipeline step in a separate Skypilot container.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        Exception: If the pipeline run fails.
        RuntimeError: If the code is running in a notebook.
    """
    # First check whether the code is running in a notebook.
    if Environment.in_notebook():
        raise RuntimeError(
            "The Skypilot orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )
    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
        orchestrator_run_id
    )

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    different_settings_found = False

    if not self.config.disable_step_based_settings:
        for _, step in deployment.step_configurations.items():
            step_settings = cast(
                SkypilotBaseOrchestratorSettings,
                self.get_settings(step),
            )
            if step_settings != settings:
                different_settings_found = True
                logger.info(
                    "At least one step has different settings than the "
                    "pipeline. The step with different settings will be "
                    "run in a separate VM.\n"
                    "You can configure the orchestrator to disable this "
                    "behavior by updating the `disable_step_based_settings` "
                    "in your orchestrator configuration "
                    "by running the following command: "
                    "`zenml orchestrator update --disable-step-based-settings=True`"
                )
                break

    # Decide which configuration to use based on whether different settings were found
    if (
        not self.config.disable_step_based_settings
        and different_settings_found
    ):
        # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
        command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
        args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
        )
    else:
        # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
        command = PipelineEntrypointConfiguration.get_entrypoint_command()
        args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )

    entrypoint_str = " ".join(command)
    arguments_str = " ".join(args)

    task_envs = environment
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    custom_run_args = " ".join(settings.docker_run_args)
    if custom_run_args:
        custom_run_args += " "

    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

    # Set up credentials
    self.setup_credentials()

    # Guaranteed by stack validation
    assert stack is not None and stack.container_registry is not None

    if docker_creds := stack.container_registry.credentials:
        docker_username, docker_password = docker_creds
        setup = (
            f"sudo docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
        )
        task_envs["DOCKER_USERNAME"] = docker_username
        task_envs["DOCKER_PASSWORD"] = docker_password
    else:
        setup = None

    # Run the entire pipeline

    # Set the service connector AWS profile ENV variable
    self.prepare_environment_variable(set=True)

    try:
        if isinstance(self.cloud, sky.clouds.Kubernetes):
            run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
            setup = None
            down = False
            idle_minutes_to_autostop = None
        else:
            run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
            down = settings.down
            idle_minutes_to_autostop = settings.idle_minutes_to_autostop
        task = sky.Task(
            run=run_command,
            setup=setup,
            envs=task_envs,
        )
        logger.debug(f"Running run: {run_command}")

        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                job_recovery=settings.job_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=image
                if isinstance(self.cloud, sky.clouds.Kubernetes)
                else settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )
        # Set the cluster name
        if settings.cluster_name:
            sky.exec(
                task,
                settings.cluster_name,
                down=down,
                stream_logs=settings.stream_logs,
                backend=None,
                detach_run=True,
            )
        else:
            # Reuse an existing cluster on the same cloud if there is one;
            # otherwise derive a new cluster name from the run name.
            cluster_name = None
            for i in sky.status(refresh=True):
                if isinstance(
                    i["handle"].launched_resources.cloud, type(self.cloud)
                ):
                    cluster_name = i["handle"].cluster_name
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )
                    break
            if cluster_name is None:
                cluster_name = self.sanitize_cluster_name(
                    f"{orchestrator_run_name}"
                )
            # Launch the cluster
            sky.launch(
                task,
                cluster_name,
                retry_until_up=settings.retry_until_up,
                idle_minutes_to_autostop=idle_minutes_to_autostop,
                down=down,
                stream_logs=settings.stream_logs,
                detach_setup=True,
            )

    except Exception as e:
        logger.error(f"Pipeline run failed: {e}")
        raise

    finally:
        # Unset the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=False)
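
The non-Kubernetes branch above wraps the step entrypoint in a `docker run` call. The following is a minimal sketch of how such a command string can be put together. The helper `build_docker_run_command` and all values passed to it are hypothetical and are not part of ZenML.

```python
from typing import Dict, List


def build_docker_run_command(
    image: str,
    entrypoint: List[str],
    arguments: List[str],
    env: Dict[str, str],
    docker_run_args: List[str],
) -> str:
    """Hypothetical helper mirroring the string assembly shown above."""
    # Each environment variable becomes a `-e KEY=VALUE` flag.
    docker_environment_str = " ".join(f"-e {k}={v}" for k, v in env.items())
    # Extra `docker run` arguments get a trailing space only when present.
    custom_run_args = " ".join(docker_run_args)
    if custom_run_args:
        custom_run_args += " "
    entrypoint_str = " ".join(entrypoint)
    arguments_str = " ".join(arguments)
    return (
        f"sudo docker run --rm {custom_run_args}{docker_environment_str} "
        f"{image} {entrypoint_str} {arguments_str}"
    )


command = build_docker_run_command(
    image="registry.example.com/zenml:latest",  # placeholder image name
    entrypoint=["python", "-m", "my_entrypoint"],
    arguments=["--step_name", "trainer"],
    env={"FOO": "bar"},
    docker_run_args=["--gpus=all"],
)
print(command)
```

Values are interpolated as-is in this sketch, so an environment value containing spaces or shell metacharacters would need quoting (for example with `shlex.quote`) before it is safe to pass to a shell.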
sanitize_cluster_name(self, name)

Sanitize the value to be used in a cluster name.

Parameters:

Name Type Description Default
name str

Arbitrary input cluster name.

required

Returns:

Type Description
str

Sanitized cluster name.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def sanitize_cluster_name(self, name: str) -> str:
    """Sanitize the value to be used in a cluster name.

    Args:
        name: Arbitrary input cluster name.

    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
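
To see what the sanitization does in practice, the method body can be copied into a standalone function and tried on a few inputs. This is only a sketch for experimentation; the input strings are made up.

```python
import re


def sanitize_cluster_name(name: str) -> str:
    """Standalone copy of the method above, for experimentation."""
    # Lowercase, then replace everything except a-z, 0-9 and "-" with "-".
    name = re.sub(r"[^a-z0-9-]", "-", name.lower())
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name


print(sanitize_cluster_name("My_Pipeline Run 2024!"))  # my-pipeline-run-2024
print(sanitize_cluster_name("--Already-clean--"))      # already-clean
print(sanitize_cluster_name("a__b"))                   # a--b
```

Note that runs of replaced characters are not collapsed: only leading and trailing hyphens are trimmed, so `a__b` becomes `a--b`.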
setup_credentials(self)

Set up credentials for the orchestrator.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()

skypilot_orchestrator_entrypoint

Entrypoint of the Skypilot master/orchestrator VM.

main()

Entrypoint of the Skypilot master/orchestrator VM.

This is the entrypoint of the Skypilot master/orchestrator VM. It is responsible for provisioning the VM and running the pipeline steps in separate VMs.

The VM is provisioned using the sky library. The pipeline steps are run using the sky library as well.

Exceptions:

Type Description
TypeError

If the active stack's orchestrator is not an instance of SkypilotBaseOrchestrator.

ValueError

If the active stack's container registry is None.
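
The entrypoint builds a DAG that maps every step to its upstream steps and runs each step only after its upstream steps have finished. The ordering constraint can be sketched with the standard library's `graphlib`. This is a sequential stand-in and not ZenML's `ThreadedDagRunner`; the step names and the `run_step` function are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Hypothetical pipeline: each step maps to the steps it depends on,
# the same shape as the `pipeline_dag` dictionary built in main().
pipeline_dag: Dict[str, List[str]] = {
    "load": [],
    "train": ["load"],
    "evaluate": ["train"],
}

executed: List[str] = []


def run_step(step_name: str) -> None:
    """Placeholder for launching the step on its own VM."""
    executed.append(step_name)


# static_order() yields every step after all of its upstream steps.
for step_name in TopologicalSorter(pipeline_dag).static_order():
    run_step(step_name)

print(executed)  # ['load', 'train', 'evaluate']
```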

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the Skypilot master/orchestrator VM.

    This is the entrypoint of the Skypilot master/orchestrator VM. It is
    responsible for provisioning the VM and running the pipeline steps in
    separate VMs.

    The VM is provisioned using the `sky` library. The pipeline steps are run
    using the `sky` library as well.

    Raises:
        TypeError: If the active stack's orchestrator is not an instance of
            SkypilotBaseOrchestrator.
        ValueError: If the active stack's container registry is None.
    """
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Skypilot orchestrator VM started.")

    # Parse / extract args.
    args = parse_args()

    orchestrator_run_id = socket.gethostname()

    deployment = Client().get_deployment(args.deployment_id)

    pipeline_dag = {
        step_name: step.spec.upstream_steps
        for step_name, step in deployment.step_configurations.items()
    }
    step_command = StepEntrypointConfiguration.get_entrypoint_command()
    entrypoint_str = " ".join(step_command)

    active_stack = Client().active_stack

    orchestrator = active_stack.orchestrator
    if not isinstance(orchestrator, SkypilotBaseOrchestrator):
        raise TypeError(
            "The active stack's orchestrator is not an instance of SkypilotBaseOrchestrator."
        )

    # Set up credentials
    orchestrator.setup_credentials()

    # Set the service connector AWS profile ENV variable
    orchestrator.prepare_environment_variable(set=True)

    # Get the active container registry
    container_registry = active_stack.container_registry
    if container_registry is None:
        raise ValueError("Container registry cannot be None.")

    if docker_creds := container_registry.credentials:
        docker_username, docker_password = docker_creds
        setup = (
            f"docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {container_registry.config.uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None

    unique_resource_configs: Dict[str, str] = {}
    for step_name, step in deployment.step_configurations.items():
        settings = cast(
            SkypilotBaseOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        # Handle both str and Dict[str, int] types for accelerators
        if isinstance(settings.accelerators, dict):
            accelerators_hashable = frozenset(settings.accelerators.items())
        elif isinstance(settings.accelerators, str):
            accelerators_hashable = frozenset({(settings.accelerators, 1)})
        else:
            accelerators_hashable = None
        resource_config = (
            settings.instance_type,
            settings.cpus,
            settings.memory,
            settings.disk_size,
            settings.disk_tier,
            settings.use_spot,
            settings.job_recovery,
            settings.region,
            settings.zone,
            accelerators_hashable,
        )
        cluster_name_parts = [
            orchestrator.sanitize_cluster_name(str(part))
            for part in resource_config
            if part is not None
        ]
        cluster_name = "-".join(
            [f"cluster-{orchestrator_run_id}", *cluster_name_parts]
        )
        unique_resource_configs[step_name] = cluster_name

    run = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=args.deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )[0]

    logger.info("Fetching pipeline run: %s", run.id)

    def run_step_on_skypilot_vm(step_name: str) -> None:
        """Run a pipeline step in a separate Skypilot VM.

        Args:
            step_name: Name of the step.
        """
        cluster_name = unique_resource_configs[step_name]

        image = SkypilotBaseOrchestrator.get_image(
            deployment=deployment, step_name=step_name
        )

        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        arguments_str = " ".join(step_args)

        step = deployment.step_configurations[step_name]
        settings = cast(
            SkypilotBaseOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        env = get_config_environment_vars()
        env[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id

        docker_environment_str = " ".join(
            f"-e {k}={v}" for k, v in env.items()
        )
        custom_run_args = " ".join(settings.docker_run_args)
        if custom_run_args:
            custom_run_args += " "

        # Set up the task
        run_command = f"docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
        task_name = f"{deployment.id}-{step_name}-{time.time()}"
        task = sky.Task(
            run=run_command,
            setup=setup,
            envs=task_envs,
            name=task_name,
        )
        task = task.set_resources(
            sky.Resources(
                cloud=orchestrator.cloud,
                instance_type=settings.instance_type
                or orchestrator.DEFAULT_INSTANCE_TYPE,
                cpus=settings.cpus,
                memory=settings.memory,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                job_recovery=settings.job_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
            )
        )

        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
            detach_setup=True,
            detach_run=True,
        )

        # Wait for the step to finish.
        logger.info(f"Waiting for step `{step_name}` to finish...")

        current_run = Client().get_pipeline_run(run.id)

        step_is_finished = False
        while not step_is_finished:
            time.sleep(10)
            current_run = Client().get_pipeline_run(run.id)
            try:
                step_is_finished = current_run.steps[
                    step_name
                ].status.is_finished
            except KeyError:
                # Step is not yet in the run, so we wait for it to appear
                continue

        # Pop the resource configuration for this step
        unique_resource_configs.pop(step_name)

        if cluster_name in unique_resource_configs.values():
            # If there are more steps using this configuration, skip deprovisioning the cluster
            logger.info(
                f"Resource configuration for cluster '{cluster_name}' "
                "is used by subsequent steps. Skipping the deprovisioning of "
                "the cluster."
            )
        else:
            # If there are no more steps using this configuration, down the cluster
            logger.info(
                f"Resource configuration for cluster '{cluster_name}' "
                "is not used by subsequent steps. Deprovisioning the cluster."
            )
            sky.down(cluster_name)

        logger.info(f"Running step `{step_name}` on a VM is completed.")

    ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step_on_skypilot_vm).run()

    logger.info("Orchestration VM provisioned.")
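
The entrypoint above gives steps with identical resource settings the same cluster name and tears a cluster down only when no remaining step maps to it. The sketch below illustrates that bookkeeping under simplifying assumptions: the settings tuple has only three hypothetical fields, the naming helper `cluster_name_for` is a simplified stand-in rather than the orchestrator's own logic, and the steps run in a fixed order instead of through the DAG runner.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical per-step settings: (instance_type, cpus, use_spot).
StepSettings = Tuple[Optional[str], Optional[int], bool]

step_settings: Dict[str, StepSettings] = {
    "load": ("m5.large", None, False),
    "train": ("p3.2xlarge", 8, True),
    "evaluate": ("m5.large", None, False),
}


def cluster_name_for(run_id: str, settings: StepSettings) -> str:
    """Derive a cluster name from the settings that are not None."""
    parts = [
        str(part).lower().replace(".", "-")
        for part in settings
        if part is not None
    ]
    return "-".join([f"cluster-{run_id}", *parts])


clusters = {
    step: cluster_name_for("run42", settings)
    for step, settings in step_settings.items()
}

# "load" and "evaluate" have identical settings, so they share a cluster.
print(clusters["load"] == clusters["evaluate"])  # True

# Walk the steps in execution order and tear a cluster down only once
# no remaining step maps to it.
torn_down: List[str] = []
for step in ["load", "train", "evaluate"]:
    name = clusters.pop(step)
    if name not in clusters.values():
        torn_down.append(name)

print(torn_down)
```

In this run the cluster shared by `load` and `evaluate` survives the first step and is torn down last, after `evaluate` has finished.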
parse_args()

Parse entrypoint arguments.

Returns:

Type Description
Namespace

Parsed args.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args()
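
A quick way to check which arguments the entrypoint expects is to run the same parser on an explicit argument list. The optional `argv` parameter in this sketch is an addition for testing and does not exist in the function above; the argument values are placeholders.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Same parser as above, with an optional explicit argument list."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    # With argv=None, argparse falls back to sys.argv[1:].
    return parser.parse_args(argv)


args = parse_args(
    ["--run_name", "demo-run", "--deployment_id", "1234-placeholder"]
)
print(args.run_name, args.deployment_id)  # demo-run 1234-placeholder
```

Both options are required, so omitting either one makes `argparse` print a usage message and exit.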

skypilot_orchestrator_entrypoint_configuration

Entrypoint configuration for the Skypilot master/orchestrator VM.

SkypilotOrchestratorEntrypointConfiguration

Entrypoint configuration for the Skypilot master/orchestrator VM.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
class SkypilotOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the Skypilot master/orchestrator VM."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all the options required for running this entrypoint.

        Returns:
            Entrypoint options.
        """
        options = {
            RUN_NAME_OPTION,
            DEPLOYMENT_ID_OPTION,
        }
        return options

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        Returns:
            Entrypoint command.
        """
        command = [
            "python",
            "-m",
            "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
        ]
        return command

    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.

        Returns:
            List of entrypoint arguments.
        """
        args = [
            f"--{RUN_NAME_OPTION}",
            run_name,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]

        return args
get_entrypoint_arguments(run_name, deployment_id) classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
run_name str

Name of the ZenML run.

required
deployment_id UUID

ID of the deployment.

required

Returns:

Type Description
List[str]

List of entrypoint arguments.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]

    return args
get_entrypoint_command() classmethod

Returns a command that runs the entrypoint module.

Returns:

Type Description
List[str]

Entrypoint command.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() classmethod

Gets all the options required for running this entrypoint.

Returns:

Type Description
Set[str]

Entrypoint options.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
    }
    return options
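
Taken together, the command and the arguments form the full invocation of the entrypoint module. The sketch below reproduces the two class methods as plain functions to show the resulting command line. The values of `RUN_NAME_OPTION` and `DEPLOYMENT_ID_OPTION` are assumptions, inferred from the `--run_name` and `--deployment_id` flags that `parse_args()` accepts; the UUID is a placeholder.

```python
import shlex
from typing import List
from uuid import UUID

# Assumed values; the real constants are defined elsewhere in the module.
RUN_NAME_OPTION = "run_name"
DEPLOYMENT_ID_OPTION = "deployment_id"


def get_entrypoint_command() -> List[str]:
    """Command that runs the entrypoint module."""
    return [
        "python",
        "-m",
        "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
    ]


def get_entrypoint_arguments(run_name: str, deployment_id: UUID) -> List[str]:
    """Arguments the entrypoint command should be called with."""
    return [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]


deployment_id = UUID("12345678-1234-5678-1234-567812345678")  # placeholder
full_command = get_entrypoint_command() + get_entrypoint_arguments(
    "demo-run", deployment_id
)
# shlex.join quotes each element as needed for a POSIX shell.
print(shlex.join(full_command))
```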