Skypilot
zenml.integrations.skypilot
special
Initialization of the Skypilot integration for ZenML.
The Skypilot integration sub-module powers an alternative to the local orchestrator for remote orchestration of ZenML pipelines on VMs.
SkypilotAWSIntegration (Integration)
Definition of Skypilot AWS Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAWSIntegration(Integration):
    """ZenML integration that runs Skypilot-orchestrated pipelines on AWS."""

    NAME = SKYPILOT_AWS
    REQUIREMENTS = ["skypilot[aws]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Return the stack component flavors of the Skypilot AWS integration.

        Returns:
            A single-element list with the Skypilot AWS orchestrator flavor.
        """
        # Deferred import (presumably so the flavor module is only loaded
        # when the flavors are actually requested).
        from zenml.integrations.skypilot.flavors import SkypilotAWSOrchestratorFlavor

        flavor_classes = [SkypilotAWSOrchestratorFlavor]
        return flavor_classes
flavors()
classmethod
Declare the stack component flavors for the Skypilot AWS integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot AWS integration.

    Returns:
        A single-element list with the Skypilot AWS orchestrator flavor.
    """
    from zenml.integrations.skypilot.flavors import SkypilotAWSOrchestratorFlavor

    flavor_classes = [SkypilotAWSOrchestratorFlavor]
    return flavor_classes
SkypilotAzureIntegration (Integration)
Definition of Skypilot Azure Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAzureIntegration(Integration):
    """Definition of Skypilot Azure Integration for ZenML."""

    NAME = SKYPILOT_AZURE
    REQUIREMENTS = ["skypilot[azure]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot Azure integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Deferred import, matching the other Skypilot integrations.
        from zenml.integrations.skypilot.flavors import (
            SkypilotAzureOrchestratorFlavor,
        )

        return [SkypilotAzureOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Skypilot Azure integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot Azure integration.

    Returns:
        A single-element list with the Skypilot Azure orchestrator flavor.
    """
    from zenml.integrations.skypilot.flavors import SkypilotAzureOrchestratorFlavor

    flavor_classes = [SkypilotAzureOrchestratorFlavor]
    return flavor_classes
SkypilotGCPIntegration (Integration)
Definition of Skypilot GCP Integration for ZenML.
Source code in zenml/integrations/skypilot/__init__.py
class SkypilotGCPIntegration(Integration):
    """Definition of Skypilot GCP Integration for ZenML."""

    NAME = SKYPILOT_GCP
    REQUIREMENTS = ["skypilot[gcp]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot GCP integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Deferred import, matching the other Skypilot integrations.
        from zenml.integrations.skypilot.flavors import (
            SkypilotGCPOrchestratorFlavor,
        )

        return [SkypilotGCPOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Skypilot GCP integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot GCP integration.

    Returns:
        A single-element list with the Skypilot GCP orchestrator flavor.
    """
    from zenml.integrations.skypilot.flavors import SkypilotGCPOrchestratorFlavor

    flavor_classes = [SkypilotGCPOrchestratorFlavor]
    return flavor_classes
flavors
special
Skypilot integration flavors.
skypilot_orchestrator_aws_vm_flavor
Skypilot orchestrator AWS flavor.
SkypilotAWSOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings)
pydantic-model
Skypilot orchestrator config.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    SkypilotAWSOrchestratorSettings,
):
    """Configuration of the Skypilot orchestrator for AWS.

    Inherits the shared Skypilot orchestrator config and the AWS settings,
    so all settings fields are also available on the component config.
    """
SkypilotAWSOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot AWS orchestrator.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on AWS VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this flavor is registered.

        Returns:
            The Skypilot AWS orchestrator flavor name.
        """
        return SKYPILOT_AWS_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        Used to narrow down which service connector types can be linked to
        an orchestrator of this flavor.

        Returns:
            Requirements for a connector providing the `aws-generic`
            resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="aws-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The default docs url generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default SDK docs url generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            The url of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/aws-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic class holding the configuration of this flavor.

        Returns:
            The AWS Skypilot orchestrator config class.
        """
        return SkypilotAWSOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAWSOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The AWS Skypilot orchestrator class.
        """
        from zenml.integrations.skypilot.orchestrators import SkypilotAWSOrchestrator

        return SkypilotAWSOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotAWSOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotAWSOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotAWSOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot orchestrator for AWS.

    Adds no fields beyond the shared Skypilot base settings.
    """
skypilot_orchestrator_azure_vm_flavor
Skypilot orchestrator Azure flavor.
SkypilotAzureOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings)
pydantic-model
Skypilot orchestrator config for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    SkypilotAzureOrchestratorSettings,
):
    """Configuration of the Skypilot orchestrator for Azure.

    Inherits the shared Skypilot orchestrator config and the Azure settings,
    so all settings fields are also available on the component config.
    """
SkypilotAzureOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot orchestrator for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on Azure VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this flavor is registered.

        Returns:
            The Skypilot Azure orchestrator flavor name.
        """
        return SKYPILOT_AZURE_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        Used to narrow down which service connector types can be linked to
        an orchestrator of this flavor.

        Returns:
            Requirements for a connector providing the `azure-generic`
            resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="azure-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The default docs url generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default SDK docs url generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            The url of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azure-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic class holding the configuration of this flavor.

        Returns:
            The Azure Skypilot orchestrator config class.
        """
        return SkypilotAzureOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAzureOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The Azure Skypilot orchestrator class.
        """
        from zenml.integrations.skypilot.orchestrators import SkypilotAzureOrchestrator

        return SkypilotAzureOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotAzureOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotAzureOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotAzureOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings for Azure.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot orchestrator for Azure.

    Adds no fields beyond the shared Skypilot base settings.
    """
skypilot_orchestrator_base_vm_config
Skypilot orchestrator base config and settings.
SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator base config.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig,
    SkypilotBaseOrchestratorSettings,
):
    """Configuration shared by all Skypilot VM orchestrator flavors."""

    @property
    def is_local(self) -> bool:
        """Report whether this component is only usable on the local host.

        The result decides whether the component may be shared with other
        users. Skypilot orchestrators run pipelines on remote VMs, so this
        is never a local component.

        Returns:
            Always False.
        """
        local = False
        return local
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
SkypilotBaseOrchestratorSettings (BaseSettings)
pydantic-model
Skypilot orchestrator base settings.
Attributes:
Name | Type | Description |
---|---|---|
instance_type |
Optional[str] |
the instance type to use. |
cpus |
Union[NoneType, int, float, str] |
the number of CPUs required for the task.
If a str, must be a string of the form `'2'` or `'2+'`, where the `+` indicates that the task requires at least 2 CPUs. |
memory |
Union[NoneType, int, float, str] |
the amount of memory in GiB required. If a
str, must be a string of the form `'16'` or `'16+'`, where the `+` indicates that the task requires at least 16 GiB of memory. |
accelerators |
Union[NoneType, str, Dict[str, int]] |
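the accelerators required. If a str, must be
a string of the form `'V100'` or `'V100:2'`, where the `:2` indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`. |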
accelerator_args |
Optional[Dict[str, str]] |
accelerator-specific arguments. For example,
`{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs. |
use_spot |
Optional[bool] |
whether to use spot instances. If None, defaults to False. |
spot_recovery |
Optional[str] |
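the spot recovery strategy to use for the managed
spot to recover the cluster from preemption. Refer to
the [recovery_strategy module](https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details. |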
region |
Optional[str] |
the region to use. |
zone |
Optional[str] |
the zone to use. |
image_id |
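Union[Dict[str, str], str, NoneType] |
the image ID to use. If a str, must be a string
of the image id from the cloud, such as AWS:
`'ami-1234567890abcdef0'`, GCP: `'projects/my-project-id/global/images/my-image-name'`; or an image tag provided by SkyPilot, such as AWS: `'skypilot:gpu-ubuntu-2004'`. If a dict, must be a dict mapping from region to image ID, such as `{'us-west1': 'ami-1234567890abcdef0', 'us-east1': 'ami-1234567890abcdef0'}`. |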
disk_size |
Optional[int] |
the size of the OS disk in GiB. |
disk_tier |
Optional[Literal['high', 'medium', 'low']] |
the disk performance tier to use. If None, defaults to
`'medium'`. |
cluster_name |
Optional[str] |
name of the cluster to create/reuse. If None, auto-generate a name. |
retry_until_up |
bool |
whether to retry launching the cluster until it is up. |
idle_minutes_to_autostop |
Optional[int] |
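automatically stop the cluster after this
many minutes of idleness, i.e., no running or pending jobs in the
cluster's job queue. Idleness gets reset whenever setting-up/
running/pending jobs are found in the job queue. Setting this
flag is equivalent to running `sky.launch(..., detach_run=True, ...)` and then `sky.autostop(idle_minutes=<minutes>)`. Defaults to 30; if set to None, the cluster will not be autostopped. |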
down |
bool |
Tear down the cluster after all jobs finish (successfully or abnormally). If `idle_minutes_to_autostop` is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes. |
stream_logs |
bool |
if True, show the logs in the terminal. |
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Skypilot orchestrator base settings.

    Attributes:
        instance_type: the instance type to use.
        cpus: the number of CPUs required for the task.
            If a str, must be a string of the form `'2'` or `'2+'`, where
            the `+` indicates that the task requires at least 2 CPUs.
        memory: the amount of memory in GiB required. If a
            str, must be a string of the form `'16'` or `'16+'`, where
            the `+` indicates that the task requires at least 16 GiB of
            memory.
        accelerators: the accelerators required. If a str, must be
            a string of the form `'V100'` or `'V100:2'`, where the `:2`
            indicates that the task requires 2 V100 GPUs. If a dict, must be a
            dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
        accelerator_args: accelerator-specific arguments. For example,
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
        use_spot: whether to use spot instances. If None, defaults to
            False.
        spot_recovery: the spot recovery strategy to use for the managed
            spot to recover the cluster from preemption. Refer to the
            `recovery_strategy` module for more details:
            https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py
        region: the region to use.
        zone: the zone to use.
        image_id: the image ID to use. If a str, must be a string
            of the image id from the cloud, such as AWS:
            ``'ami-1234567890abcdef0'``, GCP:
            ``'projects/my-project-id/global/images/my-image-name'``;
            or an image tag provided by SkyPilot, such as AWS:
            ``'skypilot:gpu-ubuntu-2004'``. If a dict, must be a dict mapping
            from region to image ID, such as:

            .. code-block:: python

                {
                    'us-west1': 'ami-1234567890abcdef0',
                    'us-east1': 'ami-1234567890abcdef0'
                }

        disk_size: the size of the OS disk in GiB.
        disk_tier: the disk performance tier to use. If None, defaults to
            ``'medium'``.
        cluster_name: name of the cluster to create/reuse. If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. Defaults to 30; if set
            to None, the cluster will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If `idle_minutes_to_autostop` is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        stream_logs: if True, show the logs in the terminal.
    """

    # Resources
    instance_type: Optional[str] = None
    cpus: Union[None, int, float, str] = None
    memory: Union[None, int, float, str] = None
    accelerators: Union[None, str, Dict[str, int]] = None
    # NOTE(review): the documented TPU example passes a bool (`tpu_vm: True`)
    # but the values are typed as `str`; depending on the pydantic version
    # the bool is either coerced to "True" or rejected. Consider widening
    # the value type once confirmed against SkyPilot's `accelerator_args`.
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    spot_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = None
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None

    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True
skypilot_orchestrator_gcp_vm_flavor
Skypilot orchestrator GCP flavor.
SkypilotGCPOrchestratorConfig (SkypilotBaseOrchestratorConfig, GoogleCredentialsConfigMixin, SkypilotGCPOrchestratorSettings)
pydantic-model
Skypilot orchestrator config for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    GoogleCredentialsConfigMixin,
    SkypilotGCPOrchestratorSettings,
):
    """Configuration of the Skypilot orchestrator for GCP.

    Inherits the shared Skypilot orchestrator config, the Google credentials
    mixin and the GCP settings, so all of their fields are available on the
    component config.
    """
SkypilotGCPOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Skypilot orchestrator for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on GCP VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this flavor is registered.

        Returns:
            The Skypilot GCP orchestrator flavor name.
        """
        return SKYPILOT_GCP_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements that compatible service connectors must meet.

        Used to narrow down which service connector types can be linked to
        an orchestrator of this flavor.

        Returns:
            Requirements for a connector providing the `gcp-generic`
            resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="gcp-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The default docs url generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default SDK docs url generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            The url of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/gcp-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic class holding the configuration of this flavor.

        Returns:
            The GCP Skypilot orchestrator config class.
        """
        return SkypilotGCPOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotGCPOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The GCP Skypilot orchestrator class.
        """
        from zenml.integrations.skypilot.orchestrators import SkypilotGCPOrchestrator

        return SkypilotGCPOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]
property
readonly
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SkypilotGCPOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SkypilotGCPOrchestrator] |
Implementation class for this flavor. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
SkypilotGCPOrchestratorSettings (SkypilotBaseOrchestratorSettings)
pydantic-model
Skypilot orchestrator settings for GCP.
Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings of the Skypilot orchestrator for GCP.

    Adds no fields beyond the shared Skypilot base settings.
    """
orchestrators
special
Initialization of the Skypilot ZenML orchestrators.
skypilot_aws_vm_orchestrator
Implementation of a Skypilot-based AWS VM orchestrator.
SkypilotAWSOrchestrator (SkypilotBaseOrchestrator)
Orchestrator responsible for running pipelines remotely in a VM on AWS.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
class SkypilotAWSOrchestrator(SkypilotBaseOrchestrator):
    """Orchestrator responsible for running pipelines remotely in a VM on AWS.

    This orchestrator does not support running on a schedule.
    """

    # Instance type used when the Skypilot settings do not specify one.
    DEFAULT_INSTANCE_TYPE: str = "t3.xlarge"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.Cloud` instance.
        """
        return sky.clouds.AWS()

    def get_setup(self, stack: Optional["Stack"]) -> Optional[str]:
        """Build the setup command that logs Docker into the stack's ECR registry.

        Args:
            stack: The stack to use. Its container registry must expose a
                `_get_region()` method and a `config.uri`.

        Returns:
            A `setup` shell command string.
        """
        import shlex

        assert stack is not None and stack.container_registry is not None
        registry = stack.container_registry
        assert hasattr(registry, "_get_region")
        # The region and URI come from component configuration and are
        # interpolated into the sky job's setup shell command, so quote them.
        # Regular region names / registry URIs contain only shell-safe
        # characters and are therefore emitted unchanged.
        region = shlex.quote(str(registry._get_region()))
        uri = shlex.quote(str(registry.config.uri))
        return (
            f"aws ecr get-login-password --region {region} | "
            f"docker login --username AWS --password-stdin {uri}"
        )

    @property
    def config(self) -> SkypilotAWSOrchestratorConfig:
        """Returns the `SkypilotAWSOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(SkypilotAWSOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Skypilot orchestrator.

        Returns:
            The settings class.
        """
        return SkypilotAWSOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up Environment variables that are required for the orchestrator.

        Args:
            set: Whether to set the environment variables or not.

        Raises:
            ValueError: If no service connector is found.
        """
        connector = self.get_connector()
        if connector is None:
            raise ValueError(
                "No service connector found. Please make sure to set up a connector "
                "that is compatible with this orchestrator."
            )
        if set:
            # The AWS connector creates a local configuration profile whose
            # name is computed from the first 8 characters of its UUID.
            aws_profile = f"zenml-{str(connector.id)[:8]}"
            os.environ[ENV_AWS_PROFILE] = aws_profile
        else:
            os.environ.pop(ENV_AWS_PROFILE, None)
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotAWSOrchestratorConfig
property
readonly
Returns the `SkypilotAWSOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotAWSOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
get_setup(self, stack)
Run to set up the sky job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
Optional[Stack] |
The stack to use. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
A `setup` string. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
def get_setup(self, stack: Optional["Stack"]) -> Optional[str]:
    """Build the setup command that logs Docker into the stack's ECR registry.

    Args:
        stack: The stack to use. Its container registry must expose a
            `_get_region()` method and a `config.uri`.

    Returns:
        A `setup` shell command string.
    """
    import shlex

    assert stack is not None and stack.container_registry is not None
    registry = stack.container_registry
    assert hasattr(registry, "_get_region")
    # The region and URI come from component configuration and are
    # interpolated into the sky job's setup shell command, so quote them.
    # Regular region names / registry URIs contain only shell-safe
    # characters and are therefore emitted unchanged.
    region = shlex.quote(str(registry._get_region()))
    uri = shlex.quote(str(registry.config.uri))
    return (
        f"aws ecr get-login-password --region {region} | "
        f"docker login --username AWS --password-stdin {uri}"
    )
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Exceptions:
Type | Description |
---|---|
ValueError |
If no service connector is found. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Export or clear the AWS profile environment variable for Skypilot.

    Args:
        set: If truthy, point `ENV_AWS_PROFILE` at the local AWS profile
            created by the linked service connector; otherwise remove it.

    Raises:
        ValueError: If no service connector is found.
    """
    linked_connector = self.get_connector()
    if linked_connector is None:
        raise ValueError(
            "No service connector found. Please make sure to set up a connector "
            "that is compatible with this orchestrator."
        )
    if not set:
        os.environ.pop(ENV_AWS_PROFILE, None)
        return
    # The AWS connector names its local configuration profile after the
    # first 8 characters of the connector UUID.
    os.environ[ENV_AWS_PROFILE] = "zenml-" + str(linked_connector.id)[:8]
skypilot_azure_vm_orchestrator
Implementation of a Skypilot-based Azure VM orchestrator.
SkypilotAzureOrchestrator (SkypilotBaseOrchestrator)
Orchestrator responsible for running pipelines remotely in a VM on Azure.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
class SkypilotAzureOrchestrator(SkypilotBaseOrchestrator):
    """Skypilot orchestrator that runs pipelines remotely on an Azure VM.

    Scheduled pipeline runs are not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "Standard_B1ms"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud that VMs are launched on.

        Returns:
            A `sky.clouds.Cloud` instance.
        """
        azure_cloud = sky.clouds.Azure()
        return azure_cloud

    @property
    def config(self) -> SkypilotAzureOrchestratorConfig:
        """Component configuration, typed as the Azure config class.

        Returns:
            The configuration.
        """
        typed_config = cast(SkypilotAzureOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Pydantic class used for the settings of this orchestrator.

        Returns:
            The settings class.
        """
        return SkypilotAzureOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Does nothing for Azure.

        Args:
            set: Ignored; accepted to match the signature used by the AWS
                Skypilot orchestrator.
        """
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotAzureOrchestratorConfig
property
readonly
Returns the `SkypilotAzureOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotAzureOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Does nothing for Azure.

    Args:
        set: Ignored; accepted to match the signature used by the AWS
            Skypilot orchestrator.
    """
skypilot_base_vm_orchestrator
Implementation of a Skypilot base VM orchestrator.
SkypilotBaseOrchestrator (ContainerizedOrchestrator)
Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
    """Base class for Orchestrator responsible for running pipelines remotely in a VM.

    This orchestrator does not support running on a schedule.
    """

    # The default instance type to use if none is specified in settings
    DEFAULT_INSTANCE_TYPE: Optional[str] = None

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            # Fail on the first local component: the pipeline runs on a
            # remote VM where local components are not available.
            for component in stack.components.values():
                if not component.config.is_local:
                    continue
                return False, (
                    f"The Skypilot orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the Skypilot step.\nPlease ensure that you always "
                    "use non-local stack components with the Skypilot "
                    "orchestrator."
                )
            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
        except KeyError:
            # `from None`: the KeyError context adds nothing beyond this message.
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
            ) from None

    @property
    @abstractmethod
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.Cloud` instance.
        """

    def setup_credentials(self) -> None:
        """Set up credentials for the orchestrator.

        Configures the local client of the linked service connector.

        Raises:
            RuntimeError: If no service connector is linked to the
                orchestrator.
        """
        connector = self.get_connector()
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if connector is None:
            raise RuntimeError(
                "The Skypilot orchestrator needs a linked service connector "
                "to set up cloud credentials, but none is configured."
            )
        connector.configure_local_client()

    def get_setup(self, stack: Optional["Stack"]) -> Optional[str]:
        """Run to set up the sky job.

        Args:
            stack: The stack to use.

        Returns:
            A `setup` string.
        """
        return None

    @abstractmethod
    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up Environment variables that are required for the orchestrator.

        Args:
            set: Whether to set the environment variables or not.
        """

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs all pipeline steps in Skypilot containers.

        The whole pipeline is executed by a single `docker run` command on a
        Skypilot cluster, which is either named in the settings or reused
        from an existing cluster on the same cloud.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment. The orchestrator run id is added to this dict
                in place.

        Raises:
            Exception: If the pipeline run fails.
        """
        import shlex

        if deployment.schedule:
            logger.warning(
                "Skypilot Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        # Set up some variables for configuration
        orchestrator_run_id = str(uuid4())
        environment[
            ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
        ] = orchestrator_run_id

        settings = cast(
            SkypilotBaseOrchestratorSettings,
            self.get_settings(deployment),
        )

        entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
        entrypoint_str = " ".join(entrypoint)
        arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )
        arguments_str = " ".join(arguments)

        # Set up docker run command. The `run` string is executed by a shell
        # on the VM (SkyPilot runs it as a bash script), so each KEY=VALUE
        # pair is quoted. This keeps values with spaces or shell
        # metacharacters intact; values made only of shell-safe characters
        # come out unchanged.
        image = self.get_image(deployment=deployment)
        docker_environment_str = " ".join(
            "-e " + shlex.quote(f"{k}={v}") for k, v in environment.items()
        )
        docker_command = (
            f"docker run --rm {docker_environment_str} {image} "
            f"{entrypoint_str} {arguments_str}"
        )

        start_time = time.time()
        instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

        # Set up credentials
        self.setup_credentials()

        # Run the entire pipeline
        try:
            task = sky.Task(
                run=docker_command,
                setup=self.get_setup(stack),
            )
            task = task.set_resources(
                sky.Resources(
                    cloud=self.cloud,
                    instance_type=instance_type,
                    cpus=settings.cpus,
                    memory=settings.memory,
                    accelerators=settings.accelerators,
                    accelerator_args=settings.accelerator_args,
                    use_spot=settings.use_spot,
                    spot_recovery=settings.spot_recovery,
                    region=settings.region,
                    zone=settings.zone,
                    image_id=settings.image_id,
                    disk_size=settings.disk_size,
                    disk_tier=settings.disk_tier,
                )
            )

            cluster_name = settings.cluster_name
            if cluster_name is None:
                # Reuse an existing cluster on the same cloud, if any. When
                # several match, the last one reported by `sky.status` is
                # used, and only that one is logged.
                cloud_type = type(self.cloud)
                matching_clusters = [
                    record["handle"].cluster_name
                    for record in sky.status(refresh=True)
                    if isinstance(
                        record["handle"].launched_resources.cloud, cloud_type
                    )
                ]
                if matching_clusters:
                    cluster_name = matching_clusters[-1]
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )

            # Set the cloud-specific environment variables (e.g. the service
            # connector AWS profile) that the subclass needs during launch.
            self.prepare_environment_variable(set=True)

            # Launch the cluster
            sky.launch(
                task,
                cluster_name,
                retry_until_up=settings.retry_until_up,
                idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
                down=settings.down,
                stream_logs=settings.stream_logs,
            )
        finally:
            # Unset the cloud-specific environment variables whether or not
            # the launch succeeded; any exception propagates unchanged.
            self.prepare_environment_variable(set=False)

        run_duration = time.time() - start_time
        run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
            orchestrator=self, orchestrator_run_id=orchestrator_run_id
        )
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in `%s`.\n",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A `StackValidator` instance. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the `ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID`
    environment variable.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        # `from None`: the KeyError context adds nothing beyond this message.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        ) from None
get_setup(self, stack)
Run to set up the sky job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
Optional[Stack] |
The stack to use. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
A `setup` string. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_setup(self, stack: Optional["Stack"]) -> Optional[str]:
    """Build the optional `setup` command for the sky task.

    The base implementation provides no setup command.

    Args:
        stack: The stack to use (unused here).

    Returns:
        The `setup` string; always `None` in this implementation.
    """
    setup_command: Optional[str] = None
    return setup_command
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Set or unset the environment variables this orchestrator relies on.

    Concrete orchestrators must override this hook.

    Args:
        set: `True` to set the variables, `False` to unset them.
    """
    ...
prepare_or_run_pipeline(self, deployment, stack, environment)
Runs all pipeline steps in Skypilot containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponseModel |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
environment |
Dict[str, str] |
Environment variables to set in the orchestration environment. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
If the pipeline run fails. |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponseModel",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs all pipeline steps in Skypilot containers.

    The whole pipeline is executed by a single `docker run` command on a
    Skypilot cluster, which is either named in the settings or reused from
    an existing cluster on the same cloud.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. The orchestrator run id is added to this dict
            in place.

    Raises:
        Exception: If the pipeline run fails.
    """
    import shlex

    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[
        ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
    ] = orchestrator_run_id

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
    entrypoint_str = " ".join(entrypoint)
    arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
        deployment_id=deployment.id
    )
    arguments_str = " ".join(arguments)

    # Set up docker run command. The `run` string is executed by a shell on
    # the VM (SkyPilot runs it as a bash script), so each KEY=VALUE pair is
    # quoted. This keeps values with spaces or shell metacharacters intact;
    # values made only of shell-safe characters come out unchanged.
    image = self.get_image(deployment=deployment)
    docker_environment_str = " ".join(
        "-e " + shlex.quote(f"{k}={v}") for k, v in environment.items()
    )
    docker_command = (
        f"docker run --rm {docker_environment_str} {image} "
        f"{entrypoint_str} {arguments_str}"
    )

    start_time = time.time()
    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

    # Set up credentials
    self.setup_credentials()

    # Run the entire pipeline
    try:
        task = sky.Task(
            run=docker_command,
            setup=self.get_setup(stack),
        )
        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                spot_recovery=settings.spot_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )

        cluster_name = settings.cluster_name
        if cluster_name is None:
            # Reuse an existing cluster on the same cloud, if any. When
            # several match, the last one reported by `sky.status` is used,
            # and only that one is logged.
            cloud_type = type(self.cloud)
            matching_clusters = [
                record["handle"].cluster_name
                for record in sky.status(refresh=True)
                if isinstance(
                    record["handle"].launched_resources.cloud, cloud_type
                )
            ]
            if matching_clusters:
                cluster_name = matching_clusters[-1]
                logger.info(
                    f"Found existing cluster {cluster_name}. Reusing..."
                )

        # Set the cloud-specific environment variables (e.g. the service
        # connector AWS profile) that the subclass needs during launch.
        self.prepare_environment_variable(set=True)

        # Launch the cluster
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
        )
    finally:
        # Unset the cloud-specific environment variables whether or not the
        # launch succeeded; any exception propagates unchanged.
        self.prepare_environment_variable(set=False)

    run_duration = time.time() - start_time
    run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
        orchestrator=self, orchestrator_run_id=orchestrator_run_id
    )
    run_model = Client().zen_store.get_run(run_id)
    logger.info(
        "Pipeline run `%s` has finished in `%s`.\n",
        run_model.name,
        string_utils.get_human_readable_time(run_duration),
    )
setup_credentials(self)
Set up credentials for the orchestrator.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator.

    Configures the local client of the linked service connector.

    Raises:
        RuntimeError: If no service connector is linked to the orchestrator.
    """
    connector = self.get_connector()
    # Explicit check instead of `assert`, which is stripped under `-O`.
    if connector is None:
        raise RuntimeError(
            "The Skypilot orchestrator needs a linked service connector "
            "to set up cloud credentials, but none is configured."
        )
    connector.configure_local_client()
skypilot_gcp_vm_orchestrator
Implementation of a Skypilot-based GCP VM orchestrator.
SkypilotGCPOrchestrator (SkypilotBaseOrchestrator, GoogleCredentialsMixin)
Orchestrator responsible for running pipelines remotely in a VM on GCP.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
class SkypilotGCPOrchestrator(SkypilotBaseOrchestrator, GoogleCredentialsMixin):
    """Skypilot orchestrator that runs whole pipelines on a remote GCP VM.

    Scheduled runs are not supported by this orchestrator.
    """

    # Machine type used when the settings do not name an instance type.
    DEFAULT_INSTANCE_TYPE: str = "n1-standard-4"

    @property
    def config(self) -> SkypilotGCPOrchestratorConfig:
        """Expose the orchestrator configuration with its GCP-specific type.

        Returns:
            The stored `_config`, typed as `SkypilotGCPOrchestratorConfig`.
        """
        gcp_config = cast(SkypilotGCPOrchestratorConfig, self._config)
        return gcp_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Name the settings class used by the GCP Skypilot orchestrator.

        Returns:
            The `SkypilotGCPOrchestratorSettings` class.
        """
        gcp_settings_cls = SkypilotGCPOrchestratorSettings
        return gcp_settings_cls

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Build the sky cloud object this orchestrator targets.

        Returns:
            A fresh `sky.clouds.GCP` instance.
        """
        gcp_cloud = sky.clouds.GCP()
        return gcp_cloud

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Prepare orchestrator-specific environment variables.

        This implementation does not touch the environment at all.

        Args:
            set: `True` to set the variables, `False` to unset them; ignored
                here because there is nothing to set or unset.
        """
        return None
cloud: Cloud
property
readonly
The type of sky cloud to use.
Returns:
Type | Description |
---|---|
Cloud |
A `sky.clouds.Cloud` instance. |
config: SkypilotGCPOrchestratorConfig
property
readonly
Returns the `SkypilotGCPOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
SkypilotGCPOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Skypilot orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
prepare_environment_variable(self, set=True)
Set up Environment variables that are required for the orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
set |
bool |
Whether to set the environment variables or not. |
True |
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Prepare orchestrator-specific environment variables.

    This implementation does not touch the environment at all.

    Args:
        set: `True` to set the variables, `False` to unset them; ignored
            here because there is nothing to set or unset.
    """
    return None