Skypilot
zenml.integrations.skypilot
special
Initialization of the Skypilot integration for ZenML.
The Skypilot integration sub-module provides an alternative to the local orchestrator: remote orchestration of ZenML pipelines on VMs.
SkypilotAWSIntegration (Integration)
Definition of Skypilot AWS Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAWSIntegration(Integration):
    """Skypilot AWS integration definition for ZenML."""

    NAME = SKYPILOT_AWS
    REQUIREMENTS = ["skypilot[aws]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Skypilot AWS integration.

        Returns:
            A list containing the Skypilot AWS orchestrator flavor class.
        """
        # Function-scope import, resolved when the method is called.
        from zenml.integrations.skypilot.flavors import (
            SkypilotAWSOrchestratorFlavor as aws_flavor,
        )

        return [aws_flavor]
flavors()
classmethod
Declare the stack component flavors for the Skypilot AWS integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors of the Skypilot AWS integration.

    Returns:
        A list containing the Skypilot AWS orchestrator flavor class.
    """
    # Function-scope import, resolved when the method is called.
    from zenml.integrations.skypilot.flavors import (
        SkypilotAWSOrchestratorFlavor as aws_flavor,
    )

    return [aws_flavor]
SkypilotAzureIntegration (Integration)
Definition of Skypilot Azure Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAzureIntegration(Integration):
    """Skypilot Azure integration definition for ZenML."""

    NAME = SKYPILOT_AZURE
    REQUIREMENTS = ["skypilot[azure]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Skypilot Azure integration.

        Returns:
            A list containing the Skypilot Azure orchestrator flavor class.
        """
        # Function-scope import, resolved when the method is called.
        from zenml.integrations.skypilot.flavors import (
            SkypilotAzureOrchestratorFlavor as azure_flavor,
        )

        return [azure_flavor]
flavors()
classmethod
Declare the stack component flavors for the Skypilot Azure integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors of the Skypilot Azure integration.

    Returns:
        A list containing the Skypilot Azure orchestrator flavor class.
    """
    # Function-scope import, resolved when the method is called.
    from zenml.integrations.skypilot.flavors import (
        SkypilotAzureOrchestratorFlavor as azure_flavor,
    )

    return [azure_flavor]
SkypilotGCPIntegration (Integration)
Definition of Skypilot GCP Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotGCPIntegration(Integration):
    """Skypilot GCP integration definition for ZenML."""

    NAME = SKYPILOT_GCP
    REQUIREMENTS = ["skypilot[gcp]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Skypilot GCP integration.

        Returns:
            A list containing the Skypilot GCP orchestrator flavor class.
        """
        # Function-scope import, resolved when the method is called.
        from zenml.integrations.skypilot.flavors import (
            SkypilotGCPOrchestratorFlavor as gcp_flavor,
        )

        return [gcp_flavor]
flavors()
classmethod
Declare the stack component flavors for the Skypilot GCP integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors of the Skypilot GCP integration.

    Returns:
        A list containing the Skypilot GCP orchestrator flavor class.
    """
    # Function-scope import, resolved when the method is called.
    from zenml.integrations.skypilot.flavors import (
        SkypilotGCPOrchestratorFlavor as gcp_flavor,
    )

    return [gcp_flavor]
flavors
special
Skypilot integration flavors.
skypilot_orchestrator_aws_vm_flavor
Skypilot orchestrator AWS flavor.
SkypilotAWSOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings)
pydantic-model
Skypilot orchestrator config.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings
):
    """Config of the Skypilot AWS orchestrator.

    Combines the Skypilot base orchestrator config with the AWS settings.
    """
SkypilotAWSOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot AWS orchestrator.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorFlavor(BaseOrchestratorFlavor):
    """Stack component flavor of the Skypilot AWS orchestrator."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The name of the Skypilot AWS orchestrator flavor.
        """
        flavor_name: str = SKYPILOT_AWS_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        The requirements are used to filter the available service connector
        types down to those compatible with this flavor.

        Returns:
            Requirements asking for the `aws-generic` resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type="aws-generic",
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """URL of the logo that represents the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/aws-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class used by this flavor.

        Returns:
            The `SkypilotAWSOrchestratorConfig` class.
        """
        return SkypilotAWSOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAWSOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The `SkypilotAWSOrchestrator` class.
        """
        # Function-scope import, resolved when the property is read.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotAWSOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotAWSOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotAWSOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotAWSOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot AWS orchestrator.

    Adds no fields on top of the Skypilot base orchestrator settings.
    """
skypilot_orchestrator_azure_vm_flavor
Skypilot orchestrator Azure flavor.
SkypilotAzureOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings)
pydantic-model
Skypilot orchestrator config for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings
):
    """Config of the Skypilot Azure orchestrator.

    Combines the Skypilot base orchestrator config with the Azure settings.
    """
SkypilotAzureOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot orchestrator for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorFlavor(BaseOrchestratorFlavor):
    """Stack component flavor of the Skypilot Azure orchestrator."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The name of the Skypilot Azure orchestrator flavor.
        """
        flavor_name: str = SKYPILOT_AZURE_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        The requirements are used to filter the available service connector
        types down to those compatible with this flavor.

        Returns:
            Requirements asking for the `azure-generic` resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type="azure-generic",
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """URL of the logo that represents the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azure-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class used by this flavor.

        Returns:
            The `SkypilotAzureOrchestratorConfig` class.
        """
        return SkypilotAzureOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAzureOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The `SkypilotAzureOrchestrator` class.
        """
        # Function-scope import, resolved when the property is read.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotAzureOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotAzureOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotAzureOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotAzureOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot Azure orchestrator.

    Adds no fields on top of the Skypilot base orchestrator settings.
    """
skypilot_orchestrator_base_vm_config
Skypilot orchestrator base config and settings.
SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator base config.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
    """Base config for the Skypilot orchestrators."""

    @property
    def is_local(self) -> bool:
        """Report whether this stack component is running locally.

        Returns:
            Always False: this config never describes a local component.
        """
        runs_locally = False
        return runs_locally
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
SkypilotBaseOrchestratorSettings (BaseSettings)
pydantic-model
Skypilot orchestrator base settings.
Attributes:
Name | Type | Description |
---|---|---|
instance_type |
Optional[str] |
the instance type to use. |
cpus |
Union[NoneType, int, float, str] |
the number of CPUs required for the task.
If a str, must be a string of the form `'2'` or `'2+'`, where the `+` indicates that the task requires at least 2 CPUs. |
memory |
Union[NoneType, int, float, str] |
the amount of memory in GiB required. If a
str, must be a string of the form `'16'` or `'16+'`, where the `+` indicates that the task requires at least 16 GiB of memory. |
accelerators |
Union[NoneType, str, Dict[str, int]] |
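the accelerators required. If a str, must be
a string of the form `'V100'` or `'V100:2'`, where the `:2` indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`. |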
accelerator_args |
Optional[Dict[str, str]] |
accelerator-specific arguments. For example,
`{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs. |
use_spot |
Optional[bool] |
whether to use spot instances. If None, defaults to False. |
spot_recovery |
Optional[str] |
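the spot recovery strategy to use for the managed
spot to recover the cluster from preemption. Refer to the
[recovery_strategy module](https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details. |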
region |
Optional[str] |
the region to use. |
zone |
Optional[str] |
the zone to use. |
image_id |
Union[Dict[str, str], str] |
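the image ID to use. If a str, must be a string
of the image id from the cloud, such as AWS:
`'ami-1234567890abcdef0'`, GCP: `'projects/my-project-id/global/images/my-image-name'`; or an image tag provided by SkyPilot, such as AWS: `'skypilot:gpu-ubuntu-2004'`. If a dict, must be a dict mapping from region to image ID, such as
`{'us-west1': 'ami-1234567890abcdef0', 'us-east1': 'ami-1234567890abcdef0'}`. |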
disk_size |
Optional[int] |
the size of the OS disk in GiB. |
disk_tier |
Optional[Literal['high', 'medium', 'low']] |
the disk performance tier to use. If None, defaults to
`'medium'`. |
cluster_name |
Optional[str] |
name of the cluster to create/reuse. If None, auto-generate a name. |
retry_until_up |
bool |
whether to retry launching the cluster until it is up. |
idle_minutes_to_autostop |
Optional[int] |
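automatically stop the cluster after this
many minutes of idleness, i.e., no running or pending jobs in the
cluster's job queue. Idleness gets reset whenever setting-up/
running/pending jobs are found in the job queue. Setting this
flag is equivalent to running `sky.launch(..., detach_run=True, ...)` and then
`sky.autostop(idle_minutes=<minutes>)`. If not set, the cluster will not be autostopped. |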
down |
bool |
Tear down the cluster after all jobs finish (successfully or abnormally). If --idle-minutes-to-autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes. |
stream_logs |
bool |
if True, show the logs in the terminal. |
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Base settings for the Skypilot orchestrators.

    Attributes:
        instance_type: Instance type to use.
        cpus: Number of CPUs required for the task. A str value must have
            the form `'2'` or `'2+'`; the `+` means that the task requires
            at least 2 CPUs.
        memory: Amount of memory required, in GiB. A str value must have
            the form `'16'` or `'16+'`; the `+` means that the task
            requires at least 16 GiB of memory.
        accelerators: Accelerators required. A str value must have the form
            `'V100'` or `'V100:2'`; the `:2` means that the task requires
            2 V100 GPUs. A dict value must have the form `{'V100': 2}` or
            `{'tpu-v2-8': 1}`.
        accelerator_args: Accelerator-specific arguments, for example
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
        use_spot: Whether to use spot instances. If None, defaults to
            False.
        spot_recovery: Spot recovery strategy used by the managed spot to
            recover the cluster from preemption. See the
            `recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__
            for more details.
        region: Region to use.
        zone: Zone to use.
        image_id: Image ID to use. A str value must be either the image id
            from the cloud, such as ``'ami-1234567890abcdef0'`` for AWS or
            ``'projects/my-project-id/global/images/my-image-name'`` for
            GCP, or an image tag provided by SkyPilot, such as
            ``'skypilot:gpu-ubuntu-2004'`` for AWS. A dict value must map
            regions to image IDs, such as:

            .. code-block:: python

                {
                    'us-west1': 'ami-1234567890abcdef0',
                    'us-east1': 'ami-1234567890abcdef0'
                }

        disk_size: Size of the OS disk in GiB.
        disk_tier: Disk performance tier to use. If None, defaults to
            ``'medium'``.
        cluster_name: Name of the cluster to create/reuse. If None, a name
            is auto-generated.
        retry_until_up: Whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: Automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in
            the cluster's job queue. Idleness gets reset whenever
            setting-up/running/pending jobs are found in the job queue.
            Setting this flag is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. If not set, the
            cluster will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If --idle-minutes-to-autostop is also set, the
            cluster will be torn down after the specified idle time. Note
            that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down, for debugging purposes.
        stream_logs: If True, show the logs in the terminal.
    """

    # Resource settings
    instance_type: Optional[str] = None
    cpus: Union[None, int, float, str] = None
    memory: Union[None, int, float, str] = None
    accelerators: Union[None, str, Dict[str, int]] = None
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    spot_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = None
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None

    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True
skypilot_orchestrator_gcp_vm_flavor
Skypilot orchestrator GCP flavor.
SkypilotGCPOrchestratorConfig (SkypilotBaseOrchestratorConfig, GoogleCredentialsConfigMixin, SkypilotGCPOrchestratorSettings)
pydantic-model
Skypilot orchestrator config for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    GoogleCredentialsConfigMixin,
    SkypilotGCPOrchestratorSettings,
):
    """Config of the Skypilot GCP orchestrator.

    Combines the Skypilot base orchestrator config, the Google credentials
    config mixin and the GCP settings.
    """
SkypilotGCPOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot orchestrator for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorFlavor(BaseOrchestratorFlavor):
    """Stack component flavor of the Skypilot GCP orchestrator."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The name of the Skypilot GCP orchestrator flavor.
        """
        flavor_name: str = SKYPILOT_GCP_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        The requirements are used to filter the available service connector
        types down to those compatible with this flavor.

        Returns:
            Requirements asking for the `gcp-generic` resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type="gcp-generic",
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """URL of the logo that represents the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/gcp-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class used by this flavor.

        Returns:
            The `SkypilotGCPOrchestratorConfig` class.
        """
        return SkypilotGCPOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotGCPOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The `SkypilotGCPOrchestrator` class.
        """
        # Function-scope import, resolved when the property is read.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotGCPOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotGCPOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotGCPOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotGCPOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot GCP orchestrator.

    Adds no fields on top of the Skypilot base orchestrator settings.
    """
orchestrators
special
Initialization of the Skypilot ZenML orchestrators.
skypilot_aws_vm_orchestrator
Implementation of a Skypilot-based AWS VM orchestrator.
SkypilotAWSOrchestrator (SkypilotBaseOrchestrator)
Orchestrator responsible for running pipelines remotely in a VM on AWS.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
class SkypilotAWSOrchestrator(SkypilotBaseOrchestrator):
    """Orchestrator that runs pipelines remotely in a VM on AWS.

    Running on a schedule is not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "t3.xlarge"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud used by this orchestrator.

        Returns:
            A new `sky.clouds.AWS` instance.
        """
        aws_cloud = sky.clouds.AWS()
        return aws_cloud

    @property
    def config(self) -> SkypilotAWSOrchestratorConfig:
        """The orchestrator config, typed as `SkypilotAWSOrchestratorConfig`.

        Returns:
            The configuration.
        """
        typed_config = cast(SkypilotAWSOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class of the Skypilot AWS orchestrator.

        Returns:
            The `SkypilotAWSOrchestratorSettings` class.
        """
        return SkypilotAWSOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set or remove the AWS profile environment variable.

        Args:
            set: If True, point the variable at the connector's profile;
                otherwise remove the variable from the environment.

        Raises:
            ValueError: If no service connector is found.
        """
        connector = self.get_connector()
        if connector is None:
            raise ValueError(
                "No service connector found. Please make sure to set up a connector "
                "that is compatible with this orchestrator."
            )

        if not set:
            os.environ.pop(ENV_AWS_PROFILE, None)
            return

        # The AWS connector creates a local configuration profile whose name
        # is computed from the first 8 characters of its UUID.
        profile_suffix = str(connector.id)[:8]
        os.environ[ENV_AWS_PROFILE] = f"zenml-{profile_suffix}"
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotAWSOrchestratorConfig
property
readonly
Returns the `SkypilotAWSOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotAWSOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Exceptions:
Type | Description |
---|---|
ValueError |
If no service connector is found. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set or remove the AWS profile environment variable.

    Args:
        set: If True, point the variable at the connector's profile;
            otherwise remove the variable from the environment.

    Raises:
        ValueError: If no service connector is found.
    """
    connector = self.get_connector()
    if connector is None:
        raise ValueError(
            "No service connector found. Please make sure to set up a connector "
            "that is compatible with this orchestrator."
        )

    if not set:
        os.environ.pop(ENV_AWS_PROFILE, None)
        return

    # The AWS connector creates a local configuration profile whose name
    # is computed from the first 8 characters of its UUID.
    profile_suffix = str(connector.id)[:8]
    os.environ[ENV_AWS_PROFILE] = f"zenml-{profile_suffix}"
skypilot_azure_vm_orchestrator
Implementation of a Skypilot-based Azure VM orchestrator.
SkypilotAzureOrchestrator (SkypilotBaseOrchestrator)
Orchestrator responsible for running pipelines remotely in a VM on Azure.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
class SkypilotAzureOrchestrator(SkypilotBaseOrchestrator):
    """Orchestrator that runs pipelines remotely in a VM on Azure.

    Running on a schedule is not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "Standard_B1ms"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud used by this orchestrator.

        Returns:
            A new `sky.clouds.Azure` instance.
        """
        azure_cloud = sky.clouds.Azure()
        return azure_cloud

    @property
    def config(self) -> SkypilotAzureOrchestratorConfig:
        """The orchestrator config, typed as `SkypilotAzureOrchestratorConfig`.

        Returns:
            The configuration.
        """
        typed_config = cast(SkypilotAzureOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class of the Skypilot Azure orchestrator.

        Returns:
            The `SkypilotAzureOrchestratorSettings` class.
        """
        return SkypilotAzureOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up environment variables required for the orchestrator.

        This implementation does nothing.

        Args:
            set: Whether to set the environment variables or not. Unused
                here.
        """
        return None
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotAzureOrchestratorConfig
property
readonly
Returns the `SkypilotAzureOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotAzureOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up environment variables required for the orchestrator.

    This implementation does nothing.

    Args:
        set: Whether to set the environment variables or not. Unused here.
    """
    return None
skypilot_base_vm_orchestrator
Implementation of the Skypilot base VM orchestrator.
SkypilotBaseOrchestrator (ContainerizedOrchestrator)
Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
"""Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
"""
# The default instance type to use if none is specified in settings
DEFAULT_INSTANCE_TYPE: Optional[str] = None
@property
def validator(self) -> Optional[StackValidator]:
    """Build the validator for stacks that use this orchestrator.

    The validator requires a container registry and an image builder, and
    rejects stacks containing a component whose config is local.

    Returns:
        A `StackValidator` instance.
    """

    def _validate_remote_components(
        stack: "Stack",
    ) -> Tuple[bool, str]:
        # Find the first local component, if there is one.
        local_component = next(
            (
                candidate
                for candidate in stack.components.values()
                if candidate.config.is_local
            ),
            None,
        )
        if local_component is None:
            return True, ""

        return False, (
            f"The Skypilot orchestrator runs pipelines remotely, "
            f"but the '{local_component.name}' {local_component.type.value} is "
            "a local stack component and will not be available in "
            "the Skypilot step.\nPlease ensure that you always "
            "use non-local stack components with the Skypilot "
            "orchestrator."
        )

    required = {
        StackComponentType.CONTAINER_REGISTRY,
        StackComponentType.IMAGE_BUILDER,
    }
    return StackValidator(
        required_components=required,
        custom_validation_function=_validate_remote_components,
    )
def get_orchestrator_run_id(self) -> str:
    """Read the active orchestrator run id from the environment.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    env_var_name = ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
    try:
        run_id = os.environ[env_var_name]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{env_var_name}."
        )
    return run_id
@property
@abstractmethod
def cloud(self) -> sky.clouds.Cloud:
"""The type of sky cloud to use.
Returns:
A `sky.clouds.Cloud` instance.
"""
def setup_credentials(self) -> None:
"""Set up credentials for the orchestrator."""
connector = self.get_connector()
assert connector is not None
connector.configure_local_client()
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
"""Set up Environment variables that are required for the orchestrator.
Args:
set: Whether to set the environment variables or not.
"""
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs all pipeline steps in Skypilot containers.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
Exception: If the pipeline run fails.
"""
if deployment.schedule:
logger.warning(
"Skypilot Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
# Set up some variables for configuration
orchestrator_run_id = str(uuid4())
environment[
ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
] = orchestrator_run_id
settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(deployment),
)
entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
entrypoint_str = " ".join(entrypoint)
arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
deployment_id=deployment.id
)
arguments_str = " ".join(arguments)
# Set up docker run command
image = self.get_image(deployment=deployment)
docker_environment_str = " ".join(
f"-e {k}={v}" for k, v in environment.items()
)
start_time = time.time()
instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE
# Set up credentials
self.setup_credentials()
# Guaranteed by stack validation
assert stack is not None and stack.container_registry is not None
docker_creds = stack.container_registry.credentials
if docker_creds:
docker_username, docker_password = docker_creds
setup = (
f"docker login --username $DOCKER_USERNAME --password "
f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
)
task_envs = {
"DOCKER_USERNAME": docker_username,
"DOCKER_PASSWORD": docker_password,
}
else:
setup = None
task_envs = None
# Run the entire pipeline
# Set the service connector AWS profile ENV variable
self.prepare_environment_variable(set=True)
try:
task = sky.Task(
run=f"docker run --rm {docker_environment_str} {image} {entrypoint_str} {arguments_str}",
setup=setup,
envs=task_envs,
)
task = task.set_resources(
sky.Resources(
cloud=self.cloud,
instance_type=instance_type,
cpus=settings.cpus,
memory=settings.memory,
accelerators=settings.accelerators,
accelerator_args=settings.accelerator_args,
use_spot=settings.use_spot,
spot_recovery=settings.spot_recovery,
region=settings.region,
zone=settings.zone,
image_id=settings.image_id,
disk_size=settings.disk_size,
disk_tier=settings.disk_tier,
)
)
cluster_name = settings.cluster_name
if cluster_name is None:
# Find existing cluster
for i in sky.status(refresh=True):
if isinstance(
i["handle"].launched_resources.cloud, type(self.cloud)
):
cluster_name = i["handle"].cluster_name
logger.info(
f"Found existing cluster {cluster_name}. Reusing..."
)
# Launch the cluster
sky.launch(
task,
cluster_name,
retry_until_up=settings.retry_until_up,
idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
down=settings.down,
stream_logs=settings.stream_logs,
)
except Exception as e:
raise e
finally:
# Unset the service connector AWS profile ENV variable
self.prepare_environment_variable(set=False)
run_duration = time.time() - start_time
run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
orchestrator=self, orchestrator_run_id=orchestrator_run_id
)
run_model = Client().zen_store.get_run(run_id)
logger.info(
"Pipeline run `%s` has finished in `%s`.\n",
run_model.name,
string_utils.get_human_readable_time(run_duration),
)
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A `StackValidator` instance. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError as err:
        # Chain explicitly so the traceback shows the lookup failure as the
        # direct cause.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        ) from err
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set or unset the environment variables the orchestrator requires.

    Abstract hook with no implementation here; concrete orchestrators
    provide the behavior.

    Args:
        set: Whether to set the environment variables or not.
    """
prepare_or_run_pipeline(self, deployment, stack, environment)
Runs all pipeline steps in Skypilot containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
environment |
Dict[str, str] |
Environment variables to set in the orchestration environment. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
If the pipeline run fails. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs all pipeline steps in Skypilot containers.

    The whole pipeline is executed by a single `docker run` command that is
    submitted as a Skypilot task. A fresh orchestrator run id is generated
    and stored in `environment` (the passed dict is modified in place), so
    it is handed to the container as an environment variable.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        Exception: If the pipeline run fails.
    """
    # Imported locally so this method does not rely on a module-level
    # import being present.
    import shlex

    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[
        ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
    ] = orchestrator_run_id

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
    entrypoint_str = " ".join(entrypoint)
    arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
        deployment_id=deployment.id
    )
    arguments_str = " ".join(arguments)

    # Set up docker run command
    image = self.get_image(deployment=deployment)
    # Quote every `KEY=VALUE` pair: the pairs are spliced into a command
    # string, so a value containing spaces or shell metacharacters would
    # otherwise be split or interpreted. Plain values are left unchanged.
    docker_environment_str = " ".join(
        "-e " + shlex.quote(f"{k}={v}") for k, v in environment.items()
    )

    start_time = time.time()

    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

    # Set up credentials
    self.setup_credentials()

    # Guaranteed by stack validation
    assert stack is not None and stack.container_registry is not None

    docker_creds = stack.container_registry.credentials
    if docker_creds:
        docker_username, docker_password = docker_creds
        # The credentials are passed as task environment variables and only
        # referenced by name in the setup command.
        setup = (
            "docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None

    # Run the entire pipeline

    # Set the service connector AWS profile ENV variable
    self.prepare_environment_variable(set=True)

    try:
        task = sky.Task(
            run=f"docker run --rm {docker_environment_str} {image} {entrypoint_str} {arguments_str}",
            setup=setup,
            envs=task_envs,
        )
        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                spot_recovery=settings.spot_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )

        cluster_name = settings.cluster_name
        if cluster_name is None:
            # Find existing cluster. There is no `break`, so every matching
            # cluster is logged and the last match is the one that ends up
            # in `cluster_name`.
            for i in sky.status(refresh=True):
                if isinstance(
                    i["handle"].launched_resources.cloud, type(self.cloud)
                ):
                    cluster_name = i["handle"].cluster_name
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )

        # Launch the cluster
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
        )
    finally:
        # Unset the service connector AWS profile ENV variable. Errors from
        # the block above propagate unchanged after this cleanup.
        self.prepare_environment_variable(set=False)

    # Only reached when the launch did not raise.
    run_duration = time.time() - start_time
    run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
        orchestrator=self, orchestrator_run_id=orchestrator_run_id
    )
    run_model = Client().zen_store.get_run(run_id)
    logger.info(
        "Pipeline run `%s` has finished in `%s`.\n",
        run_model.name,
        string_utils.get_human_readable_time(run_duration),
    )
setup_credentials(self)
Set up credentials for the orchestrator.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Configure the local client through the orchestrator's connector.

    Fetches the connector via `get_connector()`, asserts that one was
    returned, and calls its `configure_local_client()` method.
    """
    service_connector = self.get_connector()
    assert service_connector is not None
    service_connector.configure_local_client()
skypilot_gcp_vm_orchestrator
Implementation of a Skypilot-based GCP VM orchestrator.
SkypilotGCPOrchestrator (SkypilotBaseOrchestrator, GoogleCredentialsMixin)
Orchestrator responsible for running pipelines remotely in a VM on GCP.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
class SkypilotGCPOrchestrator(
    SkypilotBaseOrchestrator, GoogleCredentialsMixin
):
    """Skypilot orchestrator that runs pipelines remotely in a VM on GCP.

    Running on a schedule is not supported by this orchestrator.
    """

    # Instance type declared as the default for this orchestrator.
    DEFAULT_INSTANCE_TYPE: str = "n1-standard-4"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """The sky cloud this orchestrator targets.

        Returns:
            A new `sky.clouds.GCP` instance.
        """
        gcp_cloud = sky.clouds.GCP()
        return gcp_cloud

    @property
    def config(self) -> SkypilotGCPOrchestratorConfig:
        """The orchestrator config, typed as `SkypilotGCPOrchestratorConfig`.

        Returns:
            The configuration.
        """
        typed_config = cast(SkypilotGCPOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """The settings class used by the Skypilot GCP orchestrator.

        Returns:
            The settings class.
        """
        return SkypilotGCPOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up Environment variables that are required for the orchestrator.

        This implementation is a no-op.

        Args:
            set: Whether to set the environment variables or not.
        """
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotGCPOrchestratorConfig
property
readonly
Returns the `SkypilotGCPOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotGCPOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set up Environment variables that are required for the orchestrator.

    This implementation is a no-op: the argument is accepted but nothing
    is set or unset.

    Args:
        set: Whether to set the environment variables or not.
    """
    return None