Skypilot
        zenml.integrations.skypilot
  
      special
  
    
        flavors
  
      special
  
    
        skypilot_orchestrator_base_vm_config
    Skypilot orchestrator base config and settings.
        
SkypilotBaseOrchestratorConfig            (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings)
        
    Skypilot orchestrator base config.
Attributes:
| Name | Type | Description | 
|---|---|---|
| disable_step_based_settings | bool | whether to disable step-based settings. If True, the orchestrator will run all steps with the pipeline settings in one single VM. If False, the orchestrator will run each step with its own settings in separate VMs if provided. | 
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
          class SkypilotBaseOrchestratorConfig(
    BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
    """Skypilot orchestrator base config.
    Attributes:
        disable_step_based_settings: whether to disable step-based settings.
            If True, the orchestrator will run all steps with the pipeline
            settings in one single VM. If False, the orchestrator will run
            each step with its own settings in separate VMs if provided.
    """
    disable_step_based_settings: bool = False
    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.
        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False
is_local: bool
  
      property
      readonly
  
    Checks if this stack component is running locally.
Returns:
| Type | Description | 
|---|---|
| bool | True if this config is for a local component, False otherwise. | 
        
SkypilotBaseOrchestratorSettings            (BaseSettings)
        
    Skypilot orchestrator base settings.
Attributes:
| Name | Type | Description | 
|---|---|---|
| instance_type | Optional[str] | the instance type to use. | 
| cpus | Union[NoneType, int, float, str] | the number of CPUs required for the task.
If a str, must be a string of the form  | 
| memory | Union[NoneType, int, float, str] | the amount of memory in GiB required. If a
str, must be a string of the form  | 
| accelerators | Union[NoneType, str, Dict[str, int]] | the accelerators required. If a str, must be
a string of the form  | 
| accelerator_args | Optional[Dict[str, str]] | accelerator-specific arguments. For example,
 | 
| use_spot | Optional[bool] | whether to use spot instances. If None, defaults to False. | 
| job_recovery | Optional[str] | the spot recovery strategy to use for the managed
spot to recover the cluster from preemption. Refer to
 | 
| region | Optional[str] | the region to use. | 
| zone | Optional[str] | the zone to use. | 
| image_id | Union[Dict[str, str], str] | the image ID to use. If a str, must be a string
of the image id from the cloud, such as AWS:
 .. code-block:: python  | 
| disk_size | Optional[int] | the size of the OS disk in GiB. | 
| disk_tier | Optional[Literal['high', 'medium', 'low']] | the disk performance tier to use. If None, defaults to
 | 
| cluster_name | Optional[str] | name of the cluster to create/reuse. If None, auto-generate a name. | 
| retry_until_up | bool | whether to retry launching the cluster until it is up. | 
| idle_minutes_to_autostop | Optional[int] | automatically stop the cluster after this
many minute of idleness, i.e., no running or pending jobs in the
cluster's job queue. Idleness gets reset whenever setting-up/
running/pending jobs are found in the job queue. Setting this
flag is equivalent to running
 | 
| down | bool | Tear down the cluster after all jobs finish (successfully or abnormally). If --idle-minutes-to-autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes. | 
| stream_logs | bool | if True, show the logs in the terminal. | 
| docker_run_args | List[str] | Optional arguments to pass to the  | 
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
          class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Skypilot orchestrator base settings.
    Attributes:
        instance_type: the instance type to use.
        cpus: the number of CPUs required for the task.
            If a str, must be a string of the form `'2'` or `'2+'`, where
            the `+` indicates that the task requires at least 2 CPUs.
        memory: the amount of memory in GiB required. If a
            str, must be a string of the form `'16'` or `'16+'`, where
            the `+` indicates that the task requires at least 16 GB of memory.
        accelerators: the accelerators required. If a str, must be
            a string of the form `'V100'` or `'V100:2'`, where the `:2`
            indicates that the task requires 2 V100 GPUs. If a dict, must be a
            dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
        accelerator_args: accelerator-specific arguments. For example,
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
        use_spot: whether to use spot instances. If None, defaults to
            False.
        job_recovery: the spot recovery strategy to use for the managed
            spot to recover the cluster from preemption. Refer to
            `recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__ # pylint: disable=line-too-long
            for more details.
        region: the region to use.
        zone: the zone to use.
        image_id: the image ID to use. If a str, must be a string
            of the image id from the cloud, such as AWS:
            ``'ami-1234567890abcdef0'``, GCP:
            ``'projects/my-project-id/global/images/my-image-name'``;
            Or, a image tag provided by SkyPilot, such as AWS:
            ``'skypilot:gpu-ubuntu-2004'``. If a dict, must be a dict mapping
            from region to image ID, such as:
            .. code-block:: python
                {
                'us-west1': 'ami-1234567890abcdef0',
                'us-east1': 'ami-1234567890abcdef0'
                }
        disk_size: the size of the OS disk in GiB.
        disk_tier: the disk performance tier to use. If None, defaults to
            ``'medium'``.
        cluster_name: name of the cluster to create/reuse.  If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minute of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            flag is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. If not set, the cluster
            will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If --idle-minutes-to-autostop is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        stream_logs: if True, show the logs in the terminal.
        docker_run_args: Optional arguments to pass to the `docker run` command
            running inside the VM.
    """
    # Resources
    instance_type: Optional[str] = None
    cpus: Union[None, int, float, str] = Field(
        default=None, union_mode="left_to_right"
    )
    memory: Union[None, int, float, str] = Field(
        default=None, union_mode="left_to_right"
    )
    accelerators: Union[None, str, Dict[str, int]] = Field(
        default=None, union_mode="left_to_right"
    )
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    job_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = Field(
        default=None, union_mode="left_to_right"
    )
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None
    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True
    docker_run_args: List[str] = []
        orchestrators
  
      special
  
    Initialization of the Skypilot ZenML orchestrators.
        skypilot_base_vm_orchestrator
    Implementation of the Skypilot base VM orchestrator.
        
SkypilotBaseOrchestrator            (ContainerizedOrchestrator)
        
    Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
    """Base class for Orchestrator responsible for running pipelines remotely in a VM.
    This orchestrator does not support running on a schedule.
    """
    # The default instance type to use if none is specified in settings
    DEFAULT_INSTANCE_TYPE: Optional[str] = None
    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.
        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.
        Returns:
            A `StackValidator` instance.
        """
        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue
                return False, (
                    f"The Skypilot orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the Skypilot step.\nPlease ensure that you always "
                    "use non-local stack components with the Skypilot "
                    "orchestrator."
                )
            return True, ""
        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )
    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.
        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.
        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
            )
    @property
    def config(self) -> SkypilotBaseOrchestratorConfig:
        """Returns the `SkypilotBaseOrchestratorConfig` config.
        Returns:
            The configuration.
        """
        return cast(SkypilotBaseOrchestratorConfig, self._config)
    @property
    @abstractmethod
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.
        Returns:
            A `sky.clouds.Cloud` instance.
        """
    def setup_credentials(self) -> None:
        """Set up credentials for the orchestrator."""
        connector = self.get_connector()
        assert connector is not None
        connector.configure_local_client()
    @abstractmethod
    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up Environment variables that are required for the orchestrator.
        Args:
            set: Whether to set the environment variables or not.
        """
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs each pipeline step in a separate Skypilot container.
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.
        Raises:
            Exception: If the pipeline run fails.
            RuntimeError: If the code is running in a notebook.
        """
        # First check whether the code is running in a notebook.
        if Environment.in_notebook():
            raise RuntimeError(
                "The Skypilot orchestrator cannot run pipelines in a notebook "
                "environment. The reason is that it is non-trivial to create "
                "a Docker image of a notebook. Please consider refactoring "
                "your notebook cells into separate scripts in a Python module "
                "and run the code outside of a notebook when using this "
                "orchestrator."
            )
        if deployment.schedule:
            logger.warning(
                "Skypilot Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        # Set up some variables for configuration
        orchestrator_run_id = str(uuid4())
        environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
            orchestrator_run_id
        )
        settings = cast(
            SkypilotBaseOrchestratorSettings,
            self.get_settings(deployment),
        )
        pipeline_name = deployment.pipeline_configuration.name
        orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
        assert stack.container_registry
        # Get Docker image for the orchestrator pod
        try:
            image = self.get_image(deployment=deployment)
        except KeyError:
            # If no generic pipeline image exists (which means all steps have
            # custom builds) we use a random step image as all of them include
            # dependencies for the active stack
            pipeline_step_name = next(iter(deployment.step_configurations))
            image = self.get_image(
                deployment=deployment, step_name=pipeline_step_name
            )
        different_settings_found = False
        if not self.config.disable_step_based_settings:
            for _, step in deployment.step_configurations.items():
                step_settings = cast(
                    SkypilotBaseOrchestratorSettings,
                    self.get_settings(step),
                )
                if step_settings != settings:
                    different_settings_found = True
                    logger.info(
                        "At least one step has different settings than the "
                        "pipeline. The step with different settings will be "
                        "run in a separate VM.\n"
                        "You can configure the orchestrator to disable this "
                        "behavior by updating the `disable_step_based_settings` "
                        "in your orchestrator configuration "
                        "by running the following command: "
                        "`zenml orchestrator update --disable-step-based-settings=True`"
                    )
                    break
        # Decide which configuration to use based on whether different settings were found
        if (
            not self.config.disable_step_based_settings
            and different_settings_found
        ):
            # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
            command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
            args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
                run_name=orchestrator_run_name,
                deployment_id=deployment.id,
            )
        else:
            # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
            command = PipelineEntrypointConfiguration.get_entrypoint_command()
            args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
                deployment_id=deployment.id
            )
        entrypoint_str = " ".join(command)
        arguments_str = " ".join(args)
        docker_environment_str = " ".join(
            f"-e {k}={v}" for k, v in environment.items()
        )
        custom_run_args = " ".join(settings.docker_run_args)
        if custom_run_args:
            custom_run_args += " "
        instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE
        # Set up credentials
        self.setup_credentials()
        # Guaranteed by stack validation
        assert stack is not None and stack.container_registry is not None
        if docker_creds := stack.container_registry.credentials:
            docker_username, docker_password = docker_creds
            setup = (
                f"sudo docker login --username $DOCKER_USERNAME --password "
                f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
            )
            task_envs = {
                "DOCKER_USERNAME": docker_username,
                "DOCKER_PASSWORD": docker_password,
            }
        else:
            setup = None
            task_envs = None
        # Run the entire pipeline
        # Set the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=True)
        try:
            task = sky.Task(
                run=f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}",
                setup=setup,
                envs=task_envs,
            )
            logger.debug(
                f"Running run: sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
            )
            logger.debug(f"Running run: {setup}")
            task = task.set_resources(
                sky.Resources(
                    cloud=self.cloud,
                    instance_type=instance_type,
                    cpus=settings.cpus,
                    memory=settings.memory,
                    accelerators=settings.accelerators,
                    accelerator_args=settings.accelerator_args,
                    use_spot=settings.use_spot,
                    job_recovery=settings.job_recovery,
                    region=settings.region,
                    zone=settings.zone,
                    image_id=settings.image_id,
                    disk_size=settings.disk_size,
                    disk_tier=settings.disk_tier,
                )
            )
            # Set the cluster name
            cluster_name = settings.cluster_name
            if cluster_name is None:
                # Find existing cluster
                for i in sky.status(refresh=True):
                    if isinstance(
                        i["handle"].launched_resources.cloud, type(self.cloud)
                    ):
                        cluster_name = i["handle"].cluster_name
                        logger.info(
                            f"Found existing cluster {cluster_name}. Reusing..."
                        )
            if cluster_name is None:
                cluster_name = self.sanitize_cluster_name(
                    f"{orchestrator_run_name}"
                )
            # Launch the cluster
            sky.launch(
                task,
                cluster_name,
                retry_until_up=settings.retry_until_up,
                idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
                down=settings.down,
                stream_logs=settings.stream_logs,
                detach_setup=True,
            )
        except Exception as e:
            logger.error(f"Pipeline run failed: {e}")
            raise
        finally:
            # Unset the service connector AWS profile ENV variable
            self.prepare_environment_variable(set=False)
    def sanitize_cluster_name(self, name: str) -> str:
        """Sanitize the value to be used in a cluster name.
        Args:
            name: Arbitrary input cluster name.
        Returns:
            Sanitized cluster name.
        """
        name = re.sub(
            r"[^a-z0-9-]", "-", name.lower()
        )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
        name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
        name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
        return name
cloud: sky.clouds.Cloud
  
      property
      readonly
  
    The type of sky cloud to use.
Returns:
| Type | Description | 
|---|---|
| sky.clouds.Cloud | A  | 
config: SkypilotBaseOrchestratorConfig
  
      property
      readonly
  
    Returns the SkypilotBaseOrchestratorConfig config.
Returns:
| Type | Description | 
|---|---|
| SkypilotBaseOrchestratorConfig | The configuration. | 
validator: Optional[zenml.stack.stack_validator.StackValidator]
  
      property
      readonly
  
    Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
| Type | Description | 
|---|---|
| Optional[zenml.stack.stack_validator.StackValidator] | A  | 
get_orchestrator_run_id(self)
    Returns the active orchestrator run id.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the environment variable specifying the run id is not set. | 
Returns:
| Type | Description | 
|---|---|
| str | The orchestrator run id. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.
    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.
    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        )
prepare_environment_variable(self, set=True)
    Set up Environment variables that are required for the orchestrator.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| set | bool | Whether to set the environment variables or not. | True | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          @abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up Environment variables that are required for the orchestrator.
    Args:
        set: Whether to set the environment variables or not.
    """
prepare_or_run_pipeline(self, deployment, stack, environment)
    Runs each pipeline step in a separate Skypilot container.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required | 
| stack | Stack | The stack the pipeline will run on. | required | 
| environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required | 
Exceptions:
| Type | Description | 
|---|---|
| Exception | If the pipeline run fails. | 
| RuntimeError | If the code is running in a notebook. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs each pipeline step in a separate Skypilot container.
    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
    Raises:
        Exception: If the pipeline run fails.
        RuntimeError: If the code is running in a notebook.
    """
    # First check whether the code is running in a notebook.
    if Environment.in_notebook():
        raise RuntimeError(
            "The Skypilot orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )
    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )
    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
        orchestrator_run_id
    )
    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )
    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
    assert stack.container_registry
    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )
    different_settings_found = False
    if not self.config.disable_step_based_settings:
        for _, step in deployment.step_configurations.items():
            step_settings = cast(
                SkypilotBaseOrchestratorSettings,
                self.get_settings(step),
            )
            if step_settings != settings:
                different_settings_found = True
                logger.info(
                    "At least one step has different settings than the "
                    "pipeline. The step with different settings will be "
                    "run in a separate VM.\n"
                    "You can configure the orchestrator to disable this "
                    "behavior by updating the `disable_step_based_settings` "
                    "in your orchestrator configuration "
                    "by running the following command: "
                    "`zenml orchestrator update --disable-step-based-settings=True`"
                )
                break
    # Decide which configuration to use based on whether different settings were found
    if (
        not self.config.disable_step_based_settings
        and different_settings_found
    ):
        # Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
        command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
        args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
        )
    else:
        # Run the entire pipeline in one VM using PipelineEntrypointConfiguration
        command = PipelineEntrypointConfiguration.get_entrypoint_command()
        args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )
    entrypoint_str = " ".join(command)
    arguments_str = " ".join(args)
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    custom_run_args = " ".join(settings.docker_run_args)
    if custom_run_args:
        custom_run_args += " "
    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE
    # Set up credentials
    self.setup_credentials()
    # Guaranteed by stack validation
    assert stack is not None and stack.container_registry is not None
    if docker_creds := stack.container_registry.credentials:
        docker_username, docker_password = docker_creds
        setup = (
            f"sudo docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None
    # Run the entire pipeline
    # Set the service connector AWS profile ENV variable
    self.prepare_environment_variable(set=True)
    try:
        task = sky.Task(
            run=f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}",
            setup=setup,
            envs=task_envs,
        )
        logger.debug(
            f"Running run: sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
        )
        logger.debug(f"Running run: {setup}")
        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                job_recovery=settings.job_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )
        # Set the cluster name
        cluster_name = settings.cluster_name
        if cluster_name is None:
            # Find existing cluster
            for i in sky.status(refresh=True):
                if isinstance(
                    i["handle"].launched_resources.cloud, type(self.cloud)
                ):
                    cluster_name = i["handle"].cluster_name
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )
        if cluster_name is None:
            cluster_name = self.sanitize_cluster_name(
                f"{orchestrator_run_name}"
            )
        # Launch the cluster
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
            detach_setup=True,
        )
    except Exception as e:
        logger.error(f"Pipeline run failed: {e}")
        raise
    finally:
        # Unset the service connector AWS profile ENV variable
        self.prepare_environment_variable(set=False)
sanitize_cluster_name(self, name)
    Sanitize the value to be used in a cluster name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| name | str | Arbitrary input cluster name. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Sanitized cluster name. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          def sanitize_cluster_name(self, name: str) -> str:
    """Sanitize the value to be used in a cluster name.
    Args:
        name: Arbitrary input cluster name.
    Returns:
        Sanitized cluster name.
    """
    name = re.sub(
        r"[^a-z0-9-]", "-", name.lower()
    )  # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
setup_credentials(self)
    Set up credentials for the orchestrator.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
          def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()
        skypilot_orchestrator_entrypoint
    Entrypoint of the Skypilot master/orchestrator VM.
main()
    Entrypoint of the Skypilot master/orchestrator VM.
This is the entrypoint of the Skypilot master/orchestrator VM. It is responsible for provisioning the VM and running the pipeline steps in separate VMs.
The VM is provisioned using the sky library. The pipeline steps are run
using the sky library as well.
Exceptions:
| Type | Description | 
|---|---|
| TypeError | If the active stack's orchestrator is not an instance of SkypilotBaseOrchestrator. | 
| ValueError | If the active stack's container registry is None. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
          def main() -> None:
    """Entrypoint of the Skypilot master/orchestrator VM.
    This is the entrypoint of the Skypilot master/orchestrator VM. It is
    responsible for provisioning the VM and running the pipeline steps in
    separate VMs.
    The VM is provisioned using the `sky` library. The pipeline steps are run
    using the `sky` library as well.
    Raises:
        TypeError: If the active stack's orchestrator is not an instance of
            SkypilotBaseOrchestrator.
        ValueError: If the active stack's container registry is None.
    """
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Skypilot orchestrator VM started.")
    # Parse / extract args.
    args = parse_args()
    orchestrator_run_id = socket.gethostname()
    deployment = Client().get_deployment(args.deployment_id)
    pipeline_dag = {
        step_name: step.spec.upstream_steps
        for step_name, step in deployment.step_configurations.items()
    }
    step_command = StepEntrypointConfiguration.get_entrypoint_command()
    entrypoint_str = " ".join(step_command)
    active_stack = Client().active_stack
    orchestrator = active_stack.orchestrator
    if not isinstance(orchestrator, SkypilotBaseOrchestrator):
        raise TypeError(
            "The active stack's orchestrator is not an instance of SkypilotBaseOrchestrator."
        )
    # Set up credentials
    orchestrator.setup_credentials()
    # Set the service connector AWS profile ENV variable
    orchestrator.prepare_environment_variable(set=True)
    # get active container registry
    container_registry = active_stack.container_registry
    if container_registry is None:
        raise ValueError("Container registry cannot be None.")
    if docker_creds := container_registry.credentials:
        docker_username, docker_password = docker_creds
        setup = (
            f"docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {container_registry.config.uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None
    unique_resource_configs: Dict[str, str] = {}
    for step_name, step in deployment.step_configurations.items():
        settings = cast(
            SkypilotBaseOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        # Handle both str and Dict[str, int] types for accelerators
        if isinstance(settings.accelerators, dict):
            accelerators_hashable = frozenset(settings.accelerators.items())
        elif isinstance(settings.accelerators, str):
            accelerators_hashable = frozenset({(settings.accelerators, 1)})
        else:
            accelerators_hashable = None
        resource_config = (
            settings.instance_type,
            settings.cpus,
            settings.memory,
            settings.disk_size,  # Assuming disk_size is part of the settings
            settings.disk_tier,  # Assuming disk_tier is part of the settings
            settings.use_spot,
            settings.job_recovery,
            settings.region,
            settings.zone,
            accelerators_hashable,
        )
        cluster_name_parts = [
            orchestrator.sanitize_cluster_name(str(part))
            for part in resource_config
            if part is not None
        ]
        cluster_name = f"cluster-{orchestrator_run_id}" + "-".join(
            cluster_name_parts
        )
        unique_resource_configs[step_name] = cluster_name
    run = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=args.deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )[0]
    logger.info("Fetching pipeline run: %s", run.id)
    def run_step_on_skypilot_vm(step_name: str) -> None:
        """Run a pipeline step in a separate Skypilot VM.
        Args:
            step_name: Name of the step.
        """
        cluster_name = unique_resource_configs[step_name]
        image = SkypilotBaseOrchestrator.get_image(
            deployment=deployment, step_name=step_name
        )
        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment.id
        )
        arguments_str = " ".join(step_args)
        step = deployment.step_configurations[step_name]
        settings = cast(
            SkypilotBaseOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        env = get_config_environment_vars()
        env[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
        docker_environment_str = " ".join(
            f"-e {k}={v}" for k, v in env.items()
        )
        custom_run_args = " ".join(settings.docker_run_args)
        if custom_run_args:
            custom_run_args += " "
        # Set up the task
        run_command = f"docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
        task_name = f"{deployment.id}-{step_name}-{time.time()}"
        task = sky.Task(
            run=run_command,
            setup=setup,
            envs=task_envs,
            name=task_name,
        )
        task = task.set_resources(
            sky.Resources(
                cloud=orchestrator.cloud,
                instance_type=settings.instance_type
                or orchestrator.DEFAULT_INSTANCE_TYPE,
                cpus=settings.cpus,
                memory=settings.memory,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                job_recovery=settings.job_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
            )
        )
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
            detach_setup=True,
            detach_run=True,
        )
        # Wait for pod to finish.
        logger.info(f"Waiting for pod of step `{step_name}` to start...")
        current_run = Client().get_pipeline_run(run.id)
        step_is_finished = False
        while not step_is_finished:
            time.sleep(10)
            current_run = Client().get_pipeline_run(run.id)
            try:
                step_is_finished = current_run.steps[
                    step_name
                ].status.is_finished
            except KeyError:
                # Step is not yet in the run, so we wait for it to appear
                continue
        # Pop the resource configuration for this step
        unique_resource_configs.pop(step_name)
        if cluster_name in unique_resource_configs.values():
            # If there are more steps using this configuration, skip deprovisioning the cluster
            logger.info(
                f"Resource configuration for cluster '{cluster_name}' "
                "is used by subsequent steps. Skipping the deprovisioning of "
                "the cluster."
            )
        else:
            # If there are no more steps using this configuration, down the cluster
            logger.info(
                f"Resource configuration for cluster '{cluster_name}' "
                "is not used by subsequent steps. deprovisioning the cluster."
            )
            sky.down(cluster_name)
        logger.info(f"Running step `{step_name}` on a VM is completed.")
    ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step_on_skypilot_vm).run()
    logger.info("Orchestration VM provisioned.")
parse_args()
    Parse entrypoint arguments.
Returns:
| Type | Description | 
|---|---|
| Namespace | Parsed args. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
          def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.
    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args()
        skypilot_orchestrator_entrypoint_configuration
    Entrypoint configuration for the Skypilot master/orchestrator VM.
        
SkypilotOrchestratorEntrypointConfiguration        
    Entrypoint configuration for the Skypilot master/orchestrator VM.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
          class SkypilotOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the Skypilot master/orchestrator VM."""
    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all the options required for running this entrypoint.
        Returns:
            Entrypoint options.
        """
        options = {
            RUN_NAME_OPTION,
            DEPLOYMENT_ID_OPTION,
        }
        return options
    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.
        Returns:
            Entrypoint command.
        """
        command = [
            "python",
            "-m",
            "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
        ]
        return command
    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.
        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.
        Returns:
            List of entrypoint arguments.
        """
        args = [
            f"--{RUN_NAME_OPTION}",
            run_name,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]
        return args
get_entrypoint_arguments(run_name, deployment_id)
  
      classmethod
  
    Gets all arguments that the entrypoint command should be called with.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_name | str | Name of the ZenML run. | required | 
| deployment_id | UUID | ID of the deployment. | required | 
Returns:
| Type | Description | 
|---|---|
| List[str] | List of entrypoint arguments. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
          @classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.
    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.
    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]
    return args
get_entrypoint_command()
  
      classmethod
  
    Returns a command that runs the entrypoint module.
Returns:
| Type | Description | 
|---|---|
| List[str] | Entrypoint command. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
          @classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.
    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options()
  
      classmethod
  
    Gets all the options required for running this entrypoint.
Returns:
| Type | Description | 
|---|---|
| Set[str] | Entrypoint options. | 
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
          @classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.
    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
    }
    return options