Skypilot
zenml.integrations.skypilot
flavors
skypilot_orchestrator_base_vm_config
Skypilot orchestrator base config and settings.
SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings)
Skypilot orchestrator base config.
Attributes:
Name | Type | Description |
---|---|---|
disable_step_based_settings | bool | Whether to disable step-based settings. If True, the orchestrator will run all steps with the pipeline settings in one single VM. If False, the orchestrator will run each step with its own settings in a separate VM, if provided. |
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(
BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
"""Skypilot orchestrator base config.
Attributes:
disable_step_based_settings: whether to disable step-based settings.
If True, the orchestrator will run all steps with the pipeline
settings in one single VM. If False, the orchestrator will run
each step with its own settings in separate VMs if provided.
"""
disable_step_based_settings: bool = False
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
Returns:
True if this config is for a local component, False otherwise.
"""
return False
@property
def supports_client_side_caching(self) -> bool:
"""Whether the orchestrator supports client side caching.
Returns:
Whether the orchestrator supports client side caching.
"""
# The Skypilot orchestrator runs the entire pipeline in a single VM, or
# starts additional VMs from the root VM. Both of those cases are
# currently not supported when using client-side caching.
return False
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool | True if this config is for a local component, False otherwise. |
supports_client_side_caching: bool
property
readonly
Whether the orchestrator supports client side caching.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator supports client side caching. |
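The effect of `disable_step_based_settings` can be sketched with a small stand-alone function. This is an illustrative sketch only, not ZenML's actual implementation; the function name `runs_steps_in_separate_vms` and the example dicts are hypothetical:

```python
# Illustrative sketch (not ZenML's actual code) of how
# `disable_step_based_settings` gates per-step VM scheduling.

def runs_steps_in_separate_vms(
    disable_step_based_settings: bool,
    pipeline_settings: dict,
    step_settings: dict,
) -> bool:
    """Return True if at least one step overrides the pipeline settings
    and step-based settings are not disabled."""
    if disable_step_based_settings:
        # Everything runs with the pipeline settings in one single VM.
        return False
    return any(s != pipeline_settings for s in step_settings.values())

pipeline = {"cpus": "2", "memory": "16"}
steps = {"train": {"cpus": "8", "memory": "32"}, "eval": dict(pipeline)}
```

With the flag unset, the differing `train` settings trigger per-step VMs; with the flag set, the override is ignored.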
SkypilotBaseOrchestratorSettings (BaseSettings)
Skypilot orchestrator base settings.
Attributes:
Name | Type | Description |
---|---|---|
instance_type | Optional[str] | The instance type to use. |
cpus | Union[None, int, float, str] | The number of CPUs required for the task. If a str, must be a string of the form `'2'` or `'2+'`, where the `+` indicates that the task requires at least 2 CPUs. |
memory | Union[None, int, float, str] | The amount of memory in GiB required. If a str, must be a string of the form `'16'` or `'16+'`, where the `+` indicates that the task requires at least 16 GB of memory. |
accelerators | Union[None, str, Dict[str, int]] | The accelerators required. If a str, must be a string of the form `'V100'` or `'V100:2'`, where the `:2` indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`. |
accelerator_args | Optional[Dict[str, str]] | Accelerator-specific arguments. For example, `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs. |
use_spot | Optional[bool] | Whether to use spot instances. If None, defaults to False. |
job_recovery | Optional[str] | The spot recovery strategy to use for the managed spot to recover the cluster from preemption. Refer to the `recovery_strategy` module in the SkyPilot repository for more details. |
region | Optional[str] | The region to use. |
zone | Optional[str] | The zone to use. |
image_id | Union[Dict[str, str], str] | The image ID to use. If a str, must be a string of the image ID from the cloud, such as AWS: `'ami-1234567890abcdef0'`, GCP: `'projects/my-project-id/global/images/my-image-name'`; or an image tag provided by SkyPilot, such as AWS: `'skypilot:gpu-ubuntu-2004'`. If a dict, must be a dict mapping from region to image ID, such as `{'us-west1': 'ami-1234567890abcdef0', 'us-east1': 'ami-1234567890abcdef0'}`. |
disk_size | Optional[int] | The size of the OS disk in GiB. |
disk_tier | Optional[Literal['high', 'medium', 'low']] | The disk performance tier to use. If None, defaults to `'medium'`. |
cluster_name | Optional[str] | Name of the cluster to create/reuse. If None, auto-generate a name. |
retry_until_up | bool | Whether to retry launching the cluster until it is up. |
idle_minutes_to_autostop | Optional[int] | Automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag is equivalent to running `sky.launch(..., detach_run=True, ...)` and then `sky.autostop(idle_minutes=<minutes>)`. If not set, the cluster will not be autostopped. |
down | bool | Tear down the cluster after all jobs finish (successfully or abnormally). If `idle_minutes_to_autostop` is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes. |
stream_logs | bool | If True, show the logs in the terminal. |
docker_run_args | List[str] | Optional arguments to pass to the `docker run` command running inside the VM. |
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
"""Skypilot orchestrator base settings.
Attributes:
instance_type: the instance type to use.
cpus: the number of CPUs required for the task.
If a str, must be a string of the form `'2'` or `'2+'`, where
the `+` indicates that the task requires at least 2 CPUs.
memory: the amount of memory in GiB required. If a
str, must be a string of the form `'16'` or `'16+'`, where
the `+` indicates that the task requires at least 16 GB of memory.
accelerators: the accelerators required. If a str, must be
a string of the form `'V100'` or `'V100:2'`, where the `:2`
indicates that the task requires 2 V100 GPUs. If a dict, must be a
dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
accelerator_args: accelerator-specific arguments. For example,
`{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
use_spot: whether to use spot instances. If None, defaults to
False.
job_recovery: the spot recovery strategy to use for the managed
spot to recover the cluster from preemption. Refer to
`recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__ # pylint: disable=line-too-long
for more details.
region: the region to use.
zone: the zone to use.
image_id: the image ID to use. If a str, must be a string
of the image id from the cloud, such as AWS:
``'ami-1234567890abcdef0'``, GCP:
``'projects/my-project-id/global/images/my-image-name'``;
Or, an image tag provided by SkyPilot, such as AWS:
``'skypilot:gpu-ubuntu-2004'``. If a dict, must be a dict mapping
from region to image ID, such as:
.. code-block:: python
{
'us-west1': 'ami-1234567890abcdef0',
'us-east1': 'ami-1234567890abcdef0'
}
disk_size: the size of the OS disk in GiB.
disk_tier: the disk performance tier to use. If None, defaults to
``'medium'``.
cluster_name: name of the cluster to create/reuse. If None,
auto-generate a name.
retry_until_up: whether to retry launching the cluster until it is
up.
idle_minutes_to_autostop: automatically stop the cluster after this
many minutes of idleness, i.e., no running or pending jobs in the
cluster's job queue. Idleness gets reset whenever setting-up/
running/pending jobs are found in the job queue. Setting this
flag is equivalent to running
``sky.launch(..., detach_run=True, ...)`` and then
``sky.autostop(idle_minutes=<minutes>)``. If not set, the cluster
will not be autostopped.
down: Tear down the cluster after all jobs finish (successfully or
abnormally). If --idle-minutes-to-autostop is also set, the
cluster will be torn down after the specified idle time.
Note that if errors occur during provisioning/data syncing/setting
up, the cluster will not be torn down for debugging purposes.
stream_logs: if True, show the logs in the terminal.
docker_run_args: Optional arguments to pass to the `docker run` command
running inside the VM.
"""
# Resources
instance_type: Optional[str] = None
cpus: Union[None, int, float, str] = Field(
default=None, union_mode="left_to_right"
)
memory: Union[None, int, float, str] = Field(
default=None, union_mode="left_to_right"
)
accelerators: Union[None, str, Dict[str, int]] = Field(
default=None, union_mode="left_to_right"
)
accelerator_args: Optional[Dict[str, str]] = None
use_spot: Optional[bool] = None
job_recovery: Optional[str] = None
region: Optional[str] = None
zone: Optional[str] = None
image_id: Union[Dict[str, str], str, None] = Field(
default=None, union_mode="left_to_right"
)
disk_size: Optional[int] = None
disk_tier: Optional[Literal["high", "medium", "low"]] = None
# Run settings
cluster_name: Optional[str] = None
retry_until_up: bool = False
idle_minutes_to_autostop: Optional[int] = 30
down: bool = True
stream_logs: bool = True
docker_run_args: List[str] = []
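The `docker_run_args` setting and the orchestration environment variables are stitched into a single `docker run` command string inside `prepare_or_run_pipeline`. A minimal, stand-alone sketch of that assembly (the helper name `build_docker_run_command` and the image/entrypoint values are hypothetical):

```python
# Sketch of the `docker run` command assembly mirrored from
# `prepare_or_run_pipeline` (illustrative; not the ZenML API itself).

def build_docker_run_command(
    image: str,
    entrypoint: str,
    arguments: str,
    environment: dict,
    docker_run_args: list,
) -> str:
    # Each env var becomes a `-e KEY=VALUE` flag.
    docker_environment_str = " ".join(
        f"-e {k}={v}" for k, v in environment.items()
    )
    # Custom args are injected right after `--rm`, separated by a space.
    custom_run_args = " ".join(docker_run_args)
    if custom_run_args:
        custom_run_args += " "
    return (
        f"sudo docker run --rm {custom_run_args}"
        f"{docker_environment_str} {image} {entrypoint} {arguments}"
    )

cmd = build_docker_run_command(
    image="myrepo/zenml:latest",
    entrypoint="python -m zenml.entrypoints.entrypoint",
    arguments="--deployment_id 1234",
    environment={"FOO": "bar"},
    docker_run_args=["--gpus=all"],
)
```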
orchestrators
Initialization of the Skypilot ZenML orchestrators.
skypilot_base_vm_orchestrator
Implementation of the Skypilot base VM orchestrator.
SkypilotBaseOrchestrator (ContainerizedOrchestrator)
Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
"""Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
"""
# The default instance type to use if none is specified in settings
DEFAULT_INSTANCE_TYPE: Optional[str] = None
@property
def validator(self) -> Optional[StackValidator]:
"""Validates the stack.
In the remote case, checks that the stack contains a container registry,
image builder and only remote components.
Returns:
A `StackValidator` instance.
"""
def _validate_remote_components(
stack: "Stack",
) -> Tuple[bool, str]:
for component in stack.components.values():
if not component.config.is_local:
continue
return False, (
f"The Skypilot orchestrator runs pipelines remotely, "
f"but the '{component.name}' {component.type.value} is "
"a local stack component and will not be available in "
"the Skypilot step.\nPlease ensure that you always "
"use non-local stack components with the Skypilot "
"orchestrator."
)
return True, ""
return StackValidator(
required_components={
StackComponentType.CONTAINER_REGISTRY,
StackComponentType.IMAGE_BUILDER,
},
custom_validation_function=_validate_remote_components,
)
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
)
@property
def config(self) -> SkypilotBaseOrchestratorConfig:
"""Returns the `SkypilotBaseOrchestratorConfig` config.
Returns:
The configuration.
"""
return cast(SkypilotBaseOrchestratorConfig, self._config)
@property
@abstractmethod
def cloud(self) -> sky.clouds.Cloud:
"""The type of sky cloud to use.
Returns:
A `sky.clouds.Cloud` instance.
"""
def setup_credentials(self) -> None:
"""Set up credentials for the orchestrator."""
connector = self.get_connector()
assert connector is not None
connector.configure_local_client()
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
"""Set up Environment variables that are required for the orchestrator.
Args:
set: Whether to set the environment variables or not.
"""
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs each pipeline step in a separate Skypilot container.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
Exception: If the pipeline run fails.
RuntimeError: If the code is running in a notebook.
"""
# First check whether the code is running in a notebook.
if Environment.in_notebook():
raise RuntimeError(
"The Skypilot orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
if deployment.schedule:
logger.warning(
"Skypilot Orchestrator currently does not support the "
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
# Set up some variables for configuration
orchestrator_run_id = str(uuid4())
environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
orchestrator_run_id
)
settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(deployment),
)
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
assert stack.container_registry
# Get Docker image for the orchestrator pod
try:
image = self.get_image(deployment=deployment)
except KeyError:
# If no generic pipeline image exists (which means all steps have
# custom builds) we use a random step image as all of them include
# dependencies for the active stack
pipeline_step_name = next(iter(deployment.step_configurations))
image = self.get_image(
deployment=deployment, step_name=pipeline_step_name
)
different_settings_found = False
if not self.config.disable_step_based_settings:
for _, step in deployment.step_configurations.items():
step_settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(step),
)
if step_settings != settings:
different_settings_found = True
logger.info(
"At least one step has different settings than the "
"pipeline. The step with different settings will be "
"run in a separate VM.\n"
"You can configure the orchestrator to disable this "
"behavior by updating the `disable_step_based_settings` "
"in your orchestrator configuration "
"by running the following command: "
"`zenml orchestrator update --disable-step-based-settings=True`"
)
break
# Decide which configuration to use based on whether different settings were found
if (
not self.config.disable_step_based_settings
and different_settings_found
):
# Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)
else:
# Run the entire pipeline in one VM using PipelineEntrypointConfiguration
command = PipelineEntrypointConfiguration.get_entrypoint_command()
args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
deployment_id=deployment.id
)
entrypoint_str = " ".join(command)
arguments_str = " ".join(args)
task_envs = environment
docker_environment_str = " ".join(
f"-e {k}={v}" for k, v in environment.items()
)
custom_run_args = " ".join(settings.docker_run_args)
if custom_run_args:
custom_run_args += " "
instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE
# Set up credentials
self.setup_credentials()
# Guaranteed by stack validation
assert stack is not None and stack.container_registry is not None
if docker_creds := stack.container_registry.credentials:
docker_username, docker_password = docker_creds
setup = (
f"sudo docker login --username $DOCKER_USERNAME --password "
f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
)
task_envs["DOCKER_USERNAME"] = docker_username
task_envs["DOCKER_PASSWORD"] = docker_password
else:
setup = None
# Run the entire pipeline
# Set the service connector AWS profile ENV variable
self.prepare_environment_variable(set=True)
try:
if isinstance(self.cloud, sky.clouds.Kubernetes):
run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
setup = None
down = False
idle_minutes_to_autostop = None
else:
run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
down = settings.down
idle_minutes_to_autostop = settings.idle_minutes_to_autostop
task = sky.Task(
run=run_command,
setup=setup,
envs=task_envs,
)
logger.debug(f"Running run: {run_command}")
task = task.set_resources(
sky.Resources(
cloud=self.cloud,
instance_type=instance_type,
cpus=settings.cpus,
memory=settings.memory,
accelerators=settings.accelerators,
accelerator_args=settings.accelerator_args,
use_spot=settings.use_spot,
job_recovery=settings.job_recovery,
region=settings.region,
zone=settings.zone,
image_id=image
if isinstance(self.cloud, sky.clouds.Kubernetes)
else settings.image_id,
disk_size=settings.disk_size,
disk_tier=settings.disk_tier,
)
)
# Set the cluster name
if settings.cluster_name:
sky.exec(
task,
settings.cluster_name,
down=down,
stream_logs=settings.stream_logs,
backend=None,
detach_run=True,
)
else:
# Find existing cluster
for i in sky.status(refresh=True):
if isinstance(
i["handle"].launched_resources.cloud, type(self.cloud)
):
cluster_name = i["handle"].cluster_name
logger.info(
f"Found existing cluster {cluster_name}. Reusing..."
)
cluster_name = self.sanitize_cluster_name(
f"{orchestrator_run_name}"
)
# Launch the cluster
sky.launch(
task,
cluster_name,
retry_until_up=settings.retry_until_up,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down,
stream_logs=settings.stream_logs,
detach_setup=True,
)
except Exception as e:
logger.error(f"Pipeline run failed: {e}")
raise
finally:
# Unset the service connector AWS profile ENV variable
self.prepare_environment_variable(set=False)
def sanitize_cluster_name(self, name: str) -> str:
"""Sanitize the value to be used in a cluster name.
Args:
name: Arbitrary input cluster name.
Returns:
Sanitized cluster name.
"""
name = re.sub(
r"[^a-z0-9-]", "-", name.lower()
) # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
name = re.sub(r"^[-]+", "", name) # trim leading hyphens
name = re.sub(r"[-]+$", "", name) # trim trailing hyphens
return name
cloud: sky.clouds.Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
sky.clouds.Cloud | A `sky.clouds.Cloud` instance. |
config: SkypilotBaseOrchestratorConfig
property
readonly
Returns the `SkypilotBaseOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotBaseOrchestratorConfig | The configuration. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] | A `StackValidator` instance. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
)
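The lookup pattern used by `get_orchestrator_run_id` can be reproduced stand-alone. A sketch under the assumption that the constant resolves to the variable name below (`ENV_RUN_ID` and `get_run_id` are hypothetical stand-ins, not ZenML names):

```python
import os

# Stand-alone sketch of the run-id lookup pattern in
# `get_orchestrator_run_id`. The variable name is an assumption
# mirroring ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID.
ENV_RUN_ID = "ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID"

def get_run_id() -> str:
    try:
        return os.environ[ENV_RUN_ID]
    except KeyError:
        # Fail loudly if the orchestrator never exported the id.
        raise RuntimeError(
            f"Unable to read run id from environment variable {ENV_RUN_ID}."
        )

os.environ[ENV_RUN_ID] = "run-123"
```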
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set | bool | Whether to set the environment variables or not. | True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
"""Set up Environment variables that are required for the orchestrator.
Args:
set: Whether to set the environment variables or not.
"""
prepare_or_run_pipeline(self, deployment, stack, environment)
Runs each pipeline step in a separate Skypilot container.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required |
Exceptions:
Type | Description |
---|---|
Exception | If the pipeline run fails. |
RuntimeError | If the code is running in a notebook. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs each pipeline step in a separate Skypilot container.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
Exception: If the pipeline run fails.
RuntimeError: If the code is running in a notebook.
"""
# First check whether the code is running in a notebook.
if Environment.in_notebook():
raise RuntimeError(
"The Skypilot orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
if deployment.schedule:
logger.warning(
"Skypilot Orchestrator currently does not support the "
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
# Set up some variables for configuration
orchestrator_run_id = str(uuid4())
environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = (
orchestrator_run_id
)
settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(deployment),
)
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
assert stack.container_registry
# Get Docker image for the orchestrator pod
try:
image = self.get_image(deployment=deployment)
except KeyError:
# If no generic pipeline image exists (which means all steps have
# custom builds) we use a random step image as all of them include
# dependencies for the active stack
pipeline_step_name = next(iter(deployment.step_configurations))
image = self.get_image(
deployment=deployment, step_name=pipeline_step_name
)
different_settings_found = False
if not self.config.disable_step_based_settings:
for _, step in deployment.step_configurations.items():
step_settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(step),
)
if step_settings != settings:
different_settings_found = True
logger.info(
"At least one step has different settings than the "
"pipeline. The step with different settings will be "
"run in a separate VM.\n"
"You can configure the orchestrator to disable this "
"behavior by updating the `disable_step_based_settings` "
"in your orchestrator configuration "
"by running the following command: "
"`zenml orchestrator update --disable-step-based-settings=True`"
)
break
# Decide which configuration to use based on whether different settings were found
if (
not self.config.disable_step_based_settings
and different_settings_found
):
# Run each step in a separate VM using SkypilotOrchestratorEntrypointConfiguration
command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)
else:
# Run the entire pipeline in one VM using PipelineEntrypointConfiguration
command = PipelineEntrypointConfiguration.get_entrypoint_command()
args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
deployment_id=deployment.id
)
entrypoint_str = " ".join(command)
arguments_str = " ".join(args)
task_envs = environment
docker_environment_str = " ".join(
f"-e {k}={v}" for k, v in environment.items()
)
custom_run_args = " ".join(settings.docker_run_args)
if custom_run_args:
custom_run_args += " "
instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE
# Set up credentials
self.setup_credentials()
# Guaranteed by stack validation
assert stack is not None and stack.container_registry is not None
if docker_creds := stack.container_registry.credentials:
docker_username, docker_password = docker_creds
setup = (
f"sudo docker login --username $DOCKER_USERNAME --password "
f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
)
task_envs["DOCKER_USERNAME"] = docker_username
task_envs["DOCKER_PASSWORD"] = docker_password
else:
setup = None
# Run the entire pipeline
# Set the service connector AWS profile ENV variable
self.prepare_environment_variable(set=True)
try:
if isinstance(self.cloud, sky.clouds.Kubernetes):
run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}"
setup = None
down = False
idle_minutes_to_autostop = None
else:
run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
down = settings.down
idle_minutes_to_autostop = settings.idle_minutes_to_autostop
task = sky.Task(
run=run_command,
setup=setup,
envs=task_envs,
)
logger.debug(f"Running run: {run_command}")
task = task.set_resources(
sky.Resources(
cloud=self.cloud,
instance_type=instance_type,
cpus=settings.cpus,
memory=settings.memory,
accelerators=settings.accelerators,
accelerator_args=settings.accelerator_args,
use_spot=settings.use_spot,
job_recovery=settings.job_recovery,
region=settings.region,
zone=settings.zone,
image_id=image
if isinstance(self.cloud, sky.clouds.Kubernetes)
else settings.image_id,
disk_size=settings.disk_size,
disk_tier=settings.disk_tier,
)
)
# Set the cluster name
if settings.cluster_name:
sky.exec(
task,
settings.cluster_name,
down=down,
stream_logs=settings.stream_logs,
backend=None,
detach_run=True,
)
else:
# Find existing cluster
for i in sky.status(refresh=True):
if isinstance(
i["handle"].launched_resources.cloud, type(self.cloud)
):
cluster_name = i["handle"].cluster_name
logger.info(
f"Found existing cluster {cluster_name}. Reusing..."
)
cluster_name = self.sanitize_cluster_name(
f"{orchestrator_run_name}"
)
# Launch the cluster
sky.launch(
task,
cluster_name,
retry_until_up=settings.retry_until_up,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down,
stream_logs=settings.stream_logs,
detach_setup=True,
)
except Exception as e:
logger.error(f"Pipeline run failed: {e}")
raise
finally:
# Unset the service connector AWS profile ENV variable
self.prepare_environment_variable(set=False)
sanitize_cluster_name(self, name)
Sanitize the value to be used in a cluster name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Arbitrary input cluster name. | required |
Returns:
Type | Description |
---|---|
str | Sanitized cluster name. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def sanitize_cluster_name(self, name: str) -> str:
"""Sanitize the value to be used in a cluster name.
Args:
name: Arbitrary input cluster name.
Returns:
Sanitized cluster name.
"""
name = re.sub(
r"[^a-z0-9-]", "-", name.lower()
) # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
name = re.sub(r"^[-]+", "", name) # trim leading hyphens
name = re.sub(r"[-]+$", "", name) # trim trailing hyphens
return name
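The sanitization rules above are pure regex and easy to exercise in isolation. A self-contained copy of the function with example inputs:

```python
import re

# Self-contained copy of the sanitization rules shown above:
# lowercase, replace anything outside [a-z0-9-] with a hyphen,
# then trim hyphens from both ends.
def sanitize_cluster_name(name: str) -> str:
    name = re.sub(r"[^a-z0-9-]", "-", name.lower())
    name = re.sub(r"^[-]+", "", name)  # trim leading hyphens
    name = re.sub(r"[-]+$", "", name)  # trim trailing hyphens
    return name
```

For example, `"My_Pipeline Run!"` becomes `"my-pipeline-run"` and surrounding hyphens such as in `"--abc--"` are stripped to `"abc"`.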
setup_credentials(self)
Set up credentials for the orchestrator.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
"""Set up credentials for the orchestrator."""
connector = self.get_connector()
assert connector is not None
connector.configure_local_client()
skypilot_orchestrator_entrypoint
Entrypoint of the Skypilot master/orchestrator VM.
main()
Entrypoint of the Skypilot master/orchestrator VM.
This is the entrypoint of the Skypilot master/orchestrator VM. It is responsible for provisioning the VM and running the pipeline steps in separate VMs.
The VM is provisioned using the `sky` library. The pipeline steps are run using the `sky` library as well.
Exceptions:
Type | Description |
---|---|
TypeError | If the active stack's orchestrator is not an instance of SkypilotBaseOrchestrator. |
ValueError | If the active stack's container registry is None. |
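To group steps with identical resource requirements onto the same cluster, `main()` first converts the `accelerators` setting (which may be a str or a dict) into a hashable key. A minimal sketch of that conversion (the function name `accelerators_key` is hypothetical):

```python
# Minimal sketch of how main() makes the `accelerators` setting
# hashable so identical resource configs can share one cluster.
from typing import Dict, Optional, Union

def accelerators_key(
    accelerators: Union[None, str, Dict[str, int]],
) -> Optional[frozenset]:
    if isinstance(accelerators, dict):
        # e.g. {'V100': 2} -> frozenset({('V100', 2)})
        return frozenset(accelerators.items())
    if isinstance(accelerators, str):
        # A bare string like 'V100' is treated as one unit of it.
        return frozenset({(accelerators, 1)})
    return None
```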
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def main() -> None:
"""Entrypoint of the Skypilot master/orchestrator VM.
This is the entrypoint of the Skypilot master/orchestrator VM. It is
responsible for provisioning the VM and running the pipeline steps in
separate VMs.
The VM is provisioned using the `sky` library. The pipeline steps are run
using the `sky` library as well.
Raises:
TypeError: If the active stack's orchestrator is not an instance of
SkypilotBaseOrchestrator.
ValueError: If the active stack's container registry is None.
"""
# Log to the container's stdout so it can be streamed by the client.
logger.info("Skypilot orchestrator VM started.")
# Parse / extract args.
args = parse_args()
orchestrator_run_id = socket.gethostname()
deployment = Client().get_deployment(args.deployment_id)
pipeline_dag = {
step_name: step.spec.upstream_steps
for step_name, step in deployment.step_configurations.items()
}
step_command = StepEntrypointConfiguration.get_entrypoint_command()
entrypoint_str = " ".join(step_command)
active_stack = Client().active_stack
orchestrator = active_stack.orchestrator
if not isinstance(orchestrator, SkypilotBaseOrchestrator):
raise TypeError(
"The active stack's orchestrator is not an instance of SkypilotBaseOrchestrator."
)
# Set up credentials
orchestrator.setup_credentials()
# Set the service connector AWS profile ENV variable
orchestrator.prepare_environment_variable(set=True)
# get active container registry
container_registry = active_stack.container_registry
if container_registry is None:
raise ValueError("Container registry cannot be None.")
if docker_creds := container_registry.credentials:
docker_username, docker_password = docker_creds
setup = (
f"docker login --username $DOCKER_USERNAME --password "
f"$DOCKER_PASSWORD {container_registry.config.uri}"
)
task_envs = {
"DOCKER_USERNAME": docker_username,
"DOCKER_PASSWORD": docker_password,
}
else:
setup = None
task_envs = None
unique_resource_configs: Dict[str, str] = {}
for step_name, step in deployment.step_configurations.items():
settings = cast(
SkypilotBaseOrchestratorSettings,
orchestrator.get_settings(step),
)
# Handle both str and Dict[str, int] types for accelerators
if isinstance(settings.accelerators, dict):
accelerators_hashable = frozenset(settings.accelerators.items())
elif isinstance(settings.accelerators, str):
accelerators_hashable = frozenset({(settings.accelerators, 1)})
else:
accelerators_hashable = None
resource_config = (
settings.instance_type,
settings.cpus,
settings.memory,
settings.disk_size, # Assuming disk_size is part of the settings
settings.disk_tier, # Assuming disk_tier is part of the settings
settings.use_spot,
settings.job_recovery,
settings.region,
settings.zone,
accelerators_hashable,
)
cluster_name_parts = [
orchestrator.sanitize_cluster_name(str(part))
for part in resource_config
if part is not None
]
cluster_name = f"cluster-{orchestrator_run_id}" + "-".join(
cluster_name_parts
)
unique_resource_configs[step_name] = cluster_name
run = Client().list_pipeline_runs(
sort_by="asc:created",
size=1,
deployment_id=args.deployment_id,
status=ExecutionStatus.INITIALIZING,
)[0]
logger.info("Fetching pipeline run: %s", run.id)
def run_step_on_skypilot_vm(step_name: str) -> None:
"""Run a pipeline step in a separate Skypilot VM.
Args:
step_name: Name of the step.
"""
cluster_name = unique_resource_configs[step_name]
image = SkypilotBaseOrchestrator.get_image(
deployment=deployment, step_name=step_name
)
step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name, deployment_id=deployment.id
)
arguments_str = " ".join(step_args)
step = deployment.step_configurations[step_name]
settings = cast(
SkypilotBaseOrchestratorSettings,
orchestrator.get_settings(step),
)
env = get_config_environment_vars()
env[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
docker_environment_str = " ".join(
f"-e {k}={v}" for k, v in env.items()
)
custom_run_args = " ".join(settings.docker_run_args)
if custom_run_args:
custom_run_args += " "
# Set up the task
run_command = f"docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}"
task_name = f"{deployment.id}-{step_name}-{time.time()}"
task = sky.Task(
run=run_command,
setup=setup,
envs=task_envs,
name=task_name,
)
task = task.set_resources(
sky.Resources(
cloud=orchestrator.cloud,
instance_type=settings.instance_type
or orchestrator.DEFAULT_INSTANCE_TYPE,
cpus=settings.cpus,
memory=settings.memory,
disk_size=settings.disk_size,
disk_tier=settings.disk_tier,
accelerators=settings.accelerators,
accelerator_args=settings.accelerator_args,
use_spot=settings.use_spot,
job_recovery=settings.job_recovery,
region=settings.region,
zone=settings.zone,
image_id=settings.image_id,
)
)
sky.launch(
task,
cluster_name,
retry_until_up=settings.retry_until_up,
idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
down=settings.down,
stream_logs=settings.stream_logs,
detach_setup=True,
detach_run=True,
)
# Wait for pod to finish.
logger.info(f"Waiting for step `{step_name}` to finish...")
current_run = Client().get_pipeline_run(run.id)
step_is_finished = False
while not step_is_finished:
time.sleep(10)
current_run = Client().get_pipeline_run(run.id)
try:
step_is_finished = current_run.steps[
step_name
].status.is_finished
except KeyError:
# Step is not yet in the run, so we wait for it to appear
continue
# Pop the resource configuration for this step
unique_resource_configs.pop(step_name)
if cluster_name in unique_resource_configs.values():
# If there are more steps using this configuration, skip deprovisioning the cluster
logger.info(
f"Resource configuration for cluster '{cluster_name}' "
"is used by subsequent steps. Skipping the deprovisioning of "
"the cluster."
)
else:
# If there are no more steps using this configuration, down the cluster
logger.info(
f"Resource configuration for cluster '{cluster_name}' "
"is not used by subsequent steps. Deprovisioning the cluster."
)
sky.down(cluster_name)
logger.info(f"Step `{step_name}` finished running on its VM.")
ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step_on_skypilot_vm).run()
logger.info("Orchestration VM provisioned.")
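The cluster de-duplication above hinges on turning each step's settings into a hashable key, so that steps with identical resource requirements map to the same cluster name and reuse one VM. A minimal, self-contained sketch of the accelerators normalization (the function name is ours, not ZenML's):

```python
from typing import Dict, Optional, Union


def accelerators_key(
    accelerators: Union[str, Dict[str, int], None],
) -> Optional[frozenset]:
    """Normalize the accelerators setting into a hashable key.

    Mirrors the logic above: a dict maps accelerator names to counts,
    a plain string like "V100" counts as one unit of that accelerator,
    and None means no accelerators were requested.
    """
    if isinstance(accelerators, dict):
        return frozenset(accelerators.items())
    if isinstance(accelerators, str):
        return frozenset({(accelerators, 1)})
    return None
```

Because `frozenset` is hashable and order-independent, two steps requesting `"V100"` and `{"V100": 1}` produce the same key and therefore share a cluster.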
parse_args()
Parse entrypoint arguments.
Returns:
| Type | Description |
|---|---|
| `Namespace` | Parsed args. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
"""Parse entrypoint arguments.
Returns:
Parsed args.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--run_name", type=str, required=True)
parser.add_argument("--deployment_id", type=str, required=True)
return parser.parse_args()
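For reference, the parser above can be exercised locally by injecting an argv list instead of reading `sys.argv` (the argument values here are made up):

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    # Same parser as in the entrypoint; argv is injectable for testing.
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args(argv)


args = parse_args(["--run_name", "my-run", "--deployment_id", "1234"])
```

Both options are declared `required=True`, so omitting either makes `parse_args` exit with an error rather than return a partially populated namespace.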
skypilot_orchestrator_entrypoint_configuration
Entrypoint configuration for the Skypilot master/orchestrator VM.
SkypilotOrchestratorEntrypointConfiguration
Entrypoint configuration for the Skypilot master/orchestrator VM.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
class SkypilotOrchestratorEntrypointConfiguration:
"""Entrypoint configuration for the Skypilot master/orchestrator VM."""
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all the options required for running this entrypoint.
Returns:
Entrypoint options.
"""
options = {
RUN_NAME_OPTION,
DEPLOYMENT_ID_OPTION,
}
return options
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
]
return command
@classmethod
def get_entrypoint_arguments(
cls,
run_name: str,
deployment_id: "UUID",
) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
run_name: Name of the ZenML run.
deployment_id: ID of the deployment.
Returns:
List of entrypoint arguments.
"""
args = [
f"--{RUN_NAME_OPTION}",
run_name,
f"--{DEPLOYMENT_ID_OPTION}",
str(deployment_id),
]
return args
get_entrypoint_arguments(run_name, deployment_id)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `run_name` | `str` | Name of the ZenML run. | required |
| `deployment_id` | `UUID` | ID of the deployment. | required |
Returns:
| Type | Description |
|---|---|
| `List[str]` | List of entrypoint arguments. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
cls,
run_name: str,
deployment_id: "UUID",
) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
run_name: Name of the ZenML run.
deployment_id: ID of the deployment.
Returns:
List of entrypoint arguments.
"""
args = [
f"--{RUN_NAME_OPTION}",
run_name,
f"--{DEPLOYMENT_ID_OPTION}",
str(deployment_id),
]
return args
get_entrypoint_command()
classmethod
Returns a command that runs the entrypoint module.
Returns:
| Type | Description |
|---|---|
| `List[str]` | Entrypoint command. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
]
return command
get_entrypoint_options()
classmethod
Gets all the options required for running this entrypoint.
Returns:
| Type | Description |
|---|---|
| `Set[str]` | Entrypoint options. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all the options required for running this entrypoint.
Returns:
Entrypoint options.
"""
options = {
RUN_NAME_OPTION,
DEPLOYMENT_ID_OPTION,
}
return options
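Putting the two classmethods together, the full invocation for the orchestrator VM is the module command followed by the option flags. A sketch of that assembly, assuming the option-name constants resolve to `run_name` and `deployment_id` (the real constants live in the ZenML source):

```python
from typing import List
from uuid import UUID

# Assumed values for the option-name constants (see the ZenML source
# for the authoritative definitions).
RUN_NAME_OPTION = "run_name"
DEPLOYMENT_ID_OPTION = "deployment_id"


def build_entrypoint_invocation(run_name: str, deployment_id: UUID) -> List[str]:
    """Combine get_entrypoint_command() and get_entrypoint_arguments()."""
    command = [
        "python",
        "-m",
        "zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint",
    ]
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),  # UUIDs are passed as strings on the CLI
    ]
    return command + args


invocation = build_entrypoint_invocation(
    "my-run", UUID("12345678-1234-5678-1234-567812345678")
)
```

This list is what the orchestrator hands to the container runtime as the process argv for the master VM.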