Skip to content

Skypilot

zenml.integrations.skypilot special

Initialization of the Skypilot integration for ZenML.

The Skypilot integration sub-module provides an alternative to the local orchestrator for remote orchestration of ZenML pipelines on VMs.

SkypilotAWSIntegration (Integration)

Definition of Skypilot AWS Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAWSIntegration(Integration):
    """ZenML integration that runs pipelines on AWS through SkyPilot.

    Requires the ``skypilot[aws]`` extra and exposes the SkyPilot AWS
    orchestrator flavor.
    """

    NAME = SKYPILOT_AWS
    REQUIREMENTS = ["skypilot[aws]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Return the stack component flavors contributed by this integration.

        Returns:
            A single-element list holding the SkyPilot AWS orchestrator flavor.
        """
        # Deferred import: the flavor module is only loaded when the
        # integration's flavors are actually requested.
        from zenml.integrations.skypilot.flavors import (
            SkypilotAWSOrchestratorFlavor as aws_flavor,
        )

        flavor_classes: List[Type[Flavor]] = [aws_flavor]
        return flavor_classes

flavors() classmethod

Declare the stack component flavors for the Skypilot AWS integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors contributed by this integration.

    Returns:
        A single-element list holding the SkyPilot AWS orchestrator flavor.
    """
    # Deferred import: the flavor module is only loaded when the
    # integration's flavors are actually requested.
    from zenml.integrations.skypilot.flavors import (
        SkypilotAWSOrchestratorFlavor as aws_flavor,
    )

    flavor_classes: List[Type[Flavor]] = [aws_flavor]
    return flavor_classes

SkypilotAzureIntegration (Integration)

Definition of Skypilot Azure Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAzureIntegration(Integration):
    """Definition of Skypilot Azure Integration for ZenML.

    Requires the ``skypilot[azure]`` extra and exposes the SkyPilot Azure
    orchestrator flavor.
    """

    NAME = SKYPILOT_AZURE
    REQUIREMENTS = ["skypilot[azure]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot Azure integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Deferred import: the flavor module is only loaded when the
        # integration's flavors are actually requested.
        from zenml.integrations.skypilot.flavors import (
            SkypilotAzureOrchestratorFlavor,
        )

        return [SkypilotAzureOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Skypilot Azure integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors contributed by this integration.

    Returns:
        A single-element list holding the SkyPilot Azure orchestrator flavor.
    """
    # Deferred import: the flavor module is only loaded when the
    # integration's flavors are actually requested.
    from zenml.integrations.skypilot.flavors import (
        SkypilotAzureOrchestratorFlavor as azure_flavor,
    )

    flavor_classes: List[Type[Flavor]] = [azure_flavor]
    return flavor_classes

SkypilotGCPIntegration (Integration)

Definition of Skypilot GCP Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotGCPIntegration(Integration):
    """Definition of Skypilot GCP Integration for ZenML.

    Requires the ``skypilot[gcp]`` extra and exposes the SkyPilot GCP
    orchestrator flavor.
    """

    NAME = SKYPILOT_GCP
    REQUIREMENTS = ["skypilot[gcp]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot GCP integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Deferred import: the flavor module is only loaded when the
        # integration's flavors are actually requested.
        from zenml.integrations.skypilot.flavors import (
            SkypilotGCPOrchestratorFlavor,
        )

        return [SkypilotGCPOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Skypilot GCP integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors contributed by this integration.

    Returns:
        A single-element list holding the SkyPilot GCP orchestrator flavor.
    """
    # Deferred import: the flavor module is only loaded when the
    # integration's flavors are actually requested.
    from zenml.integrations.skypilot.flavors import (
        SkypilotGCPOrchestratorFlavor as gcp_flavor,
    )

    flavor_classes: List[Type[Flavor]] = [gcp_flavor]
    return flavor_classes

flavors special

Skypilot integration flavors.

skypilot_orchestrator_aws_vm_flavor

Skypilot orchestrator AWS flavor.

SkypilotAWSOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings) pydantic-model

Skypilot orchestrator config.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings
):
    """Configuration model for the SkyPilot AWS orchestrator.

    Combines the shared SkyPilot orchestrator config with the AWS-specific
    settings; it declares no fields of its own.
    """
SkypilotAWSOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot AWS orchestrator.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on AWS VMs via SkyPilot."""

    @property
    def name(self) -> str:
        """The name of this orchestrator flavor.

        Returns:
            The SkyPilot AWS orchestrator flavor name.
        """
        flavor_name = SKYPILOT_AWS_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Describe which service connectors are compatible with this flavor.

        The requirements are used to narrow the available service connector
        types down to those that can serve this flavor.

        Returns:
            Requirements asking for a connector that provides the
            ``aws-generic`` resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="aws-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image used to represent this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/aws-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Configuration model used by this flavor.

        Returns:
            The SkyPilot AWS orchestrator config class.
        """
        return SkypilotAWSOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAWSOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The SkyPilot AWS orchestrator class.
        """
        # Imported on demand rather than at module level.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotAWSOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotAWSOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotAWSOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotAWSOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings for the SkyPilot AWS orchestrator.

    Adds nothing to the shared SkyPilot base settings; it exists as a
    distinct settings type for the AWS orchestrator.
    """

skypilot_orchestrator_azure_vm_flavor

Skypilot orchestrator Azure flavor.

SkypilotAzureOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings) pydantic-model

Skypilot orchestrator config for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings
):
    """Configuration model for the SkyPilot Azure orchestrator.

    Combines the shared SkyPilot orchestrator config with the Azure-specific
    settings; it declares no fields of its own.
    """
SkypilotAzureOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot orchestrator for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on Azure VMs via SkyPilot."""

    @property
    def name(self) -> str:
        """The name of this orchestrator flavor.

        Returns:
            The SkyPilot Azure orchestrator flavor name.
        """
        flavor_name = SKYPILOT_AZURE_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Describe which service connectors are compatible with this flavor.

        The requirements are used to narrow the available service connector
        types down to those that can serve this flavor.

        Returns:
            Requirements asking for a connector that provides the
            ``azure-generic`` resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="azure-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image used to represent this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azure-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Configuration model used by this flavor.

        Returns:
            The SkyPilot Azure orchestrator config class.
        """
        return SkypilotAzureOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAzureOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The SkyPilot Azure orchestrator class.
        """
        # Imported on demand rather than at module level.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotAzureOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotAzureOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotAzureOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotAzureOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings for the SkyPilot Azure orchestrator.

    Adds nothing to the shared SkyPilot base settings; it exists as a
    distinct settings type for the Azure orchestrator.
    """

skypilot_orchestrator_base_vm_config

Skypilot orchestrator base config and settings.

SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator base config.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
    """Configuration shared by all SkyPilot VM orchestrators."""

    @property
    def is_local(self) -> bool:
        """Report whether the component runs on the local machine.

        Returns:
            Always False: SkyPilot orchestrators run pipelines on remote VMs.
        """
        runs_locally = False
        return runs_locally
is_local: bool property readonly

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

SkypilotBaseOrchestratorSettings (BaseSettings) pydantic-model

Skypilot orchestrator base settings.

Attributes:

Name Type Description
instance_type Optional[str]

the instance type to use.

cpus Union[NoneType, int, float, str]

the number of CPUs required for the task. If a str, must be a string of the form '2' or '2+', where the + indicates that the task requires at least 2 CPUs.

memory Union[NoneType, int, float, str]

the amount of memory in GiB required. If a str, must be a string of the form '16' or '16+', where the + indicates that the task requires at least 16 GiB of memory.

accelerators Union[NoneType, str, Dict[str, int]]

the accelerators required. If a str, must be a string of the form 'V100' or 'V100:2', where the :2 indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form {'V100': 2} or {'tpu-v2-8': 1}.

accelerator_args Optional[Dict[str, str]]

accelerator-specific arguments. For example, {'tpu_vm': True, 'runtime_version': 'tpu-vm-base'} for TPUs.

use_spot Optional[bool]

whether to use spot instances. If None, defaults to False.

spot_recovery Optional[str]

the spot recovery strategy to use for the managed spot to recover the cluster from preemption. Refer to the recovery_strategy module (https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details.

region Optional[str]

the region to use.

zone Optional[str]

the zone to use.

image_id Union[Dict[str, str], str, NoneType]

the image ID to use. If a str, must be a string of the image id from the cloud, such as AWS: 'ami-1234567890abcdef0', GCP: 'projects/my-project-id/global/images/my-image-name'; or, an image tag provided by SkyPilot, such as AWS: 'skypilot:gpu-ubuntu-2004'. If a dict, must be a dict mapping from region to image ID, such as:

```python
{
    'us-west1': 'ami-1234567890abcdef0',
    'us-east1': 'ami-1234567890abcdef0'
}
```
disk_size Optional[int]

the size of the OS disk in GiB.

disk_tier Optional[Literal['high', 'medium', 'low']]

the disk performance tier to use. If None, defaults to 'medium'.

cluster_name Optional[str]

name of the cluster to create/reuse. If None, auto-generate a name.

retry_until_up bool

whether to retry launching the cluster until it is up.

idle_minutes_to_autostop Optional[int]

automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this is equivalent to running sky.launch(..., detach_run=True, ...) and then sky.autostop(idle_minutes=<minutes>). Defaults to 30; if set to None, the cluster will not be autostopped.

down bool

Tear down the cluster after all jobs finish (successfully or abnormally). If idle_minutes_to_autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes.

stream_logs bool

if True, show the logs in the terminal.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Skypilot orchestrator base settings.

    Attributes:
        instance_type: the instance type to use.
        cpus: the number of CPUs required for the task.
            If a str, must be a string of the form `'2'` or `'2+'`, where
            the `+` indicates that the task requires at least 2 CPUs.
        memory: the amount of memory in GiB required. If a
            str, must be a string of the form `'16'` or `'16+'`, where
            the `+` indicates that the task requires at least 16 GiB of
            memory.
        accelerators: the accelerators required. If a str, must be
            a string of the form `'V100'` or `'V100:2'`, where the `:2`
            indicates that the task requires 2 V100 GPUs. If a dict, must be a
            dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
        accelerator_args: accelerator-specific arguments. For example,
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
        use_spot: whether to use spot instances. If None, defaults to
            False.
        spot_recovery: the spot recovery strategy to use for the managed
            spot to recover the cluster from preemption. Refer to
            `recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__ # pylint: disable=line-too-long
            for more details.
        region: the region to use.
        zone: the zone to use.
        image_id: the image ID to use. If a str, must be a string
            of the image id from the cloud, such as AWS:
            ``'ami-1234567890abcdef0'``, GCP:
            ``'projects/my-project-id/global/images/my-image-name'``;
            Or, an image tag provided by SkyPilot, such as AWS:
            ``'skypilot:gpu-ubuntu-2004'``. If a dict, must be a dict mapping
            from region to image ID, such as:

            .. code-block:: python

                {
                'us-west1': 'ami-1234567890abcdef0',
                'us-east1': 'ami-1234567890abcdef0'
                }

        disk_size: the size of the OS disk in GiB.
        disk_tier: the disk performance tier to use. If None, defaults to
            ``'medium'``.

        cluster_name: name of the cluster to create/reuse. If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. Defaults to 30; if set
            to None, the cluster will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If ``idle_minutes_to_autostop`` is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        stream_logs: if True, show the logs in the terminal.
    """

    # Resources
    instance_type: Optional[str] = None
    cpus: Union[None, int, float, str] = None
    memory: Union[None, int, float, str] = None
    accelerators: Union[None, str, Dict[str, int]] = None
    # NOTE(review): annotated as Dict[str, str], yet the documented example
    # passes a bool (`'tpu_vm': True`). Depending on the pydantic version
    # such a value is coerced to a string or rejected; confirm the intended
    # value type against SkyPilot's `Resources` API.
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    spot_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = None
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None

    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    # Autostop is on by default (30 idle minutes); None disables it.
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True

skypilot_orchestrator_gcp_vm_flavor

Skypilot orchestrator GCP flavor.

SkypilotGCPOrchestratorConfig (SkypilotBaseOrchestratorConfig, GoogleCredentialsConfigMixin, SkypilotGCPOrchestratorSettings) pydantic-model

Skypilot orchestrator config for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    GoogleCredentialsConfigMixin,
    SkypilotGCPOrchestratorSettings,
):
    """Configuration model for the SkyPilot GCP orchestrator.

    Combines the shared SkyPilot orchestrator config, the Google credentials
    mixin and the GCP-specific settings; it declares no fields of its own.
    """
SkypilotGCPOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot orchestrator for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on GCP VMs via SkyPilot."""

    @property
    def name(self) -> str:
        """The name of this orchestrator flavor.

        Returns:
            The SkyPilot GCP orchestrator flavor name.
        """
        flavor_name = SKYPILOT_GCP_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Describe which service connectors are compatible with this flavor.

        The requirements are used to narrow the available service connector
        types down to those that can serve this flavor.

        Returns:
            Requirements asking for a connector that provides the
            ``gcp-generic`` resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="gcp-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Image used to represent this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/gcp-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Configuration model used by this flavor.

        Returns:
            The SkyPilot GCP orchestrator config class.
        """
        return SkypilotGCPOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotGCPOrchestrator"]:
        """Orchestrator class that implements this flavor.

        Returns:
            The SkyPilot GCP orchestrator class.
        """
        # Imported on demand rather than at module level.
        from zenml.integrations.skypilot.orchestrators import (
            SkypilotGCPOrchestrator as orchestrator_class,
        )

        return orchestrator_class
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotGCPOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotGCPOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotGCPOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Settings for the SkyPilot GCP orchestrator.

    Adds nothing to the shared SkyPilot base settings; it exists as a
    distinct settings type for the GCP orchestrator.
    """

orchestrators special

Initialization of the Skypilot ZenML orchestrators.

skypilot_aws_vm_orchestrator

Implementation of a Skypilot-based AWS VM orchestrator.

SkypilotAWSOrchestrator (SkypilotBaseOrchestrator)

Orchestrator responsible for running pipelines remotely in a VM on AWS.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
class SkypilotAWSOrchestrator(SkypilotBaseOrchestrator):
    """Orchestrator responsible for running pipelines remotely in a VM on AWS.

    This orchestrator does not support running on a schedule.
    """

    DEFAULT_INSTANCE_TYPE: str = "t3.xlarge"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.AWS` instance.
        """
        return sky.clouds.AWS()

    @property
    def config(self) -> SkypilotAWSOrchestratorConfig:
        """Returns the `SkypilotAWSOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(SkypilotAWSOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Skypilot orchestrator.

        Returns:
            The settings class.
        """
        return SkypilotAWSOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Export or clear the AWS profile environment variable.

        When ``set`` is True, the profile name derived from this
        orchestrator's service connector is written to ``ENV_AWS_PROFILE``.
        When ``set`` is False, that variable is removed from the environment
        (a missing variable is not an error).

        Args:
            set: Whether to set (True) or clear (False) the environment
                variable.

        Raises:
            ValueError: If ``set`` is True and no service connector is found.
        """
        if not set:
            # Clearing the variable does not depend on the connector, so do
            # it without a lookup; cleanup then cannot fail just because the
            # connector can no longer be resolved.
            os.environ.pop(ENV_AWS_PROFILE, None)
            return

        connector = self.get_connector()
        if connector is None:
            raise ValueError(
                "No service connector found. Please make sure to set up a connector "
                "that is compatible with this orchestrator."
            )
        # The AWS connector creates a local configuration profile whose name
        # is computed from the first 8 characters of its UUID.
        os.environ[ENV_AWS_PROFILE] = f"zenml-{str(connector.id)[:8]}"
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotAWSOrchestratorConfig property readonly

Returns the SkypilotAWSOrchestratorConfig config.

Returns:

Type Description
SkypilotAWSOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up Environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True

Exceptions:

Type Description
ValueError

If no service connector is found.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Export or clear the AWS profile environment variable.

    When ``set`` is True, the profile name derived from this orchestrator's
    service connector is written to ``ENV_AWS_PROFILE``. When ``set`` is
    False, that variable is removed from the environment (a missing variable
    is not an error).

    Args:
        set: Whether to set (True) or clear (False) the environment variable.

    Raises:
        ValueError: If ``set`` is True and no service connector is found.
    """
    if not set:
        # Clearing the variable does not depend on the connector, so do it
        # without a lookup; cleanup then cannot fail just because the
        # connector can no longer be resolved.
        os.environ.pop(ENV_AWS_PROFILE, None)
        return

    connector = self.get_connector()
    if connector is None:
        raise ValueError(
            "No service connector found. Please make sure to set up a connector "
            "that is compatible with this orchestrator."
        )
    # The AWS connector creates a local configuration profile whose name is
    # computed from the first 8 characters of its UUID.
    os.environ[ENV_AWS_PROFILE] = f"zenml-{str(connector.id)[:8]}"

skypilot_azure_vm_orchestrator

Implementation of a Skypilot-based Azure VM orchestrator.

SkypilotAzureOrchestrator (SkypilotBaseOrchestrator)

Orchestrator responsible for running pipelines remotely in a VM on Azure.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
class SkypilotAzureOrchestrator(SkypilotBaseOrchestrator):
    """Skypilot orchestrator that runs pipelines remotely on an Azure VM.

    Scheduled runs are not supported by this orchestrator.
    """

    # Instance type used when the settings do not name one explicitly.
    DEFAULT_INSTANCE_TYPE: str = "Standard_B1ms"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud targeted by this orchestrator.

        Returns:
            A freshly created `sky.clouds.Azure` instance.
        """
        azure_cloud = sky.clouds.Azure()
        return azure_cloud

    @property
    def config(self) -> SkypilotAzureOrchestratorConfig:
        """Typed view of this orchestrator's configuration.

        Returns:
            The configuration, cast to `SkypilotAzureOrchestratorConfig`.
        """
        azure_config = cast(SkypilotAzureOrchestratorConfig, self._config)
        return azure_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class accepted by the Azure Skypilot orchestrator.

        Returns:
            The `SkypilotAzureOrchestratorSettings` class.
        """
        settings_cls = SkypilotAzureOrchestratorSettings
        return settings_cls

    def prepare_environment_variable(self, set: bool = True) -> None:
        """No-op hook: no extra environment variables are exported for Azure.

        Args:
            set: Whether to set (True) or unset (False) the environment
                variables; ignored by this implementation.
        """
        return None
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotAzureOrchestratorConfig property readonly

Returns the SkypilotAzureOrchestratorConfig config.

Returns:

Type Description
SkypilotAzureOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """No-op hook: no extra environment variables are exported for Azure.

    Args:
        set: Whether to set (True) or unset (False) the environment
            variables; ignored by this implementation.
    """
    return None

skypilot_base_vm_orchestrator

Implementation of the Skypilot base VM orchestrator.

SkypilotBaseOrchestrator (ContainerizedOrchestrator)

Base class for orchestrators responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
    """Base class for orchestrators running pipelines remotely in a VM via Skypilot.

    Subclasses choose the target cloud (`cloud`) and export any
    cloud-specific environment variables (`prepare_environment_variable`).

    This orchestrator does not support running on a schedule.
    """

    # The default instance type to use if none is specified in settings
    DEFAULT_INSTANCE_TYPE: Optional[str] = None

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue

                return False, (
                    f"The Skypilot orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the Skypilot step.\nPlease ensure that you always "
                    "use non-local stack components with the Skypilot "
                    "orchestrator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
            )

    @property
    @abstractmethod
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.Cloud` instance.
        """

    def setup_credentials(self) -> None:
        """Configure the local client of the orchestrator's service connector.

        Raises:
            ValueError: If no service connector is linked to the orchestrator.
        """
        connector = self.get_connector()
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if connector is None:
            raise ValueError(
                "No service connector found. Please make sure to set up a "
                "connector that is compatible with this orchestrator."
            )
        connector.configure_local_client()

    @abstractmethod
    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up environment variables that are required for the orchestrator.

        Args:
            set: Whether to set the environment variables or not.
        """

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs all pipeline steps in Skypilot containers.

        The whole pipeline is executed as a single `docker run` of the
        pipeline entrypoint inside one Skypilot task.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment. The orchestrator run id is added to this dict.

        Raises:
            Exception: If the pipeline run fails.
        """
        import shlex

        if deployment.schedule:
            logger.warning(
                "Skypilot Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        # Tag this run with a fresh id that is forwarded to the container.
        orchestrator_run_id = str(uuid4())
        environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id

        settings = cast(
            SkypilotBaseOrchestratorSettings,
            self.get_settings(deployment),
        )

        entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
        entrypoint_str = " ".join(entrypoint)
        arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )
        arguments_str = " ".join(arguments)

        # Set up docker run command. Values are shell-quoted because the
        # `run` string is executed as a shell command on the VM; values with
        # spaces or shell metacharacters would otherwise break it.
        image = self.get_image(deployment=deployment)
        docker_environment_str = " ".join(
            f"-e {k}={shlex.quote(str(v))}" for k, v in environment.items()
        )

        start_time = time.time()

        instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

        # Set up credentials
        self.setup_credentials()

        # Guaranteed by stack validation
        assert stack is not None and stack.container_registry is not None

        docker_creds = stack.container_registry.credentials
        if docker_creds:
            docker_username, docker_password = docker_creds
            # Credentials are passed via task env vars rather than inlined
            # into the setup command string.
            setup = (
                f"docker login --username $DOCKER_USERNAME --password "
                f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
            )
            task_envs = {
                "DOCKER_USERNAME": docker_username,
                "DOCKER_PASSWORD": docker_password,
            }
        else:
            setup = None
            task_envs = None

        # Export cloud-specific environment variables (e.g. the service
        # connector's AWS profile) only for the duration of the launch.
        self.prepare_environment_variable(set=True)

        try:
            task = sky.Task(
                run=f"docker run --rm {docker_environment_str} {image} {entrypoint_str} {arguments_str}",
                setup=setup,
                envs=task_envs,
            )
            task = task.set_resources(
                sky.Resources(
                    cloud=self.cloud,
                    instance_type=instance_type,
                    cpus=settings.cpus,
                    memory=settings.memory,
                    accelerators=settings.accelerators,
                    accelerator_args=settings.accelerator_args,
                    use_spot=settings.use_spot,
                    spot_recovery=settings.spot_recovery,
                    region=settings.region,
                    zone=settings.zone,
                    image_id=settings.image_id,
                    disk_size=settings.disk_size,
                    disk_tier=settings.disk_tier,
                )
            )

            cluster_name = settings.cluster_name
            if cluster_name is None:
                # Reuse the first existing cluster launched on the same cloud.
                cloud_type = type(self.cloud)
                for record in sky.status(refresh=True):
                    handle = record["handle"]
                    if isinstance(handle.launched_resources.cloud, cloud_type):
                        cluster_name = handle.cluster_name
                        logger.info(
                            f"Found existing cluster {cluster_name}. Reusing..."
                        )
                        break

            # Launch the cluster (or reuse it) and run the task.
            sky.launch(
                task,
                cluster_name,
                retry_until_up=settings.retry_until_up,
                idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
                down=settings.down,
                stream_logs=settings.stream_logs,
            )
        finally:
            # Always remove the cloud-specific environment variables again.
            self.prepare_environment_variable(set=False)

        run_duration = time.time() - start_time
        run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
            orchestrator=self, orchestrator_run_id=orchestrator_run_id
        )
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in `%s`.\n",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Read the active orchestrator run id from the environment.

    Raises:
        RuntimeError: If the run id environment variable is not set.

    Returns:
        The orchestrator run id.
    """
    env_key = ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
    try:
        run_id = os.environ[env_key]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{env_key}."
        )
    return run_id
prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Export or remove the environment variables this orchestrator needs.

    Concrete orchestrators implement this hook. It is called with
    ``set=True`` before launching the Skypilot task and with ``set=False``
    afterwards.

    Args:
        set: Whether to set (True) or unset (False) the environment variables.
    """
    ...
prepare_or_run_pipeline(self, deployment, stack, environment)

Runs all pipeline steps in Skypilot containers.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
Exception

If the pipeline run fails.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs all pipeline steps in Skypilot containers.

    The whole pipeline is executed as a single `docker run` of the
    pipeline entrypoint inside one Skypilot task.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. The orchestrator run id is added to this dict.

    Raises:
        Exception: If the pipeline run fails.
    """
    import shlex

    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Tag this run with a fresh id that is forwarded to the container.
    orchestrator_run_id = str(uuid4())
    environment[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
    entrypoint_str = " ".join(entrypoint)
    arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
        deployment_id=deployment.id
    )
    arguments_str = " ".join(arguments)

    # Set up docker run command. Values are shell-quoted because the `run`
    # string is executed as a shell command on the VM; values with spaces
    # or shell metacharacters would otherwise break it.
    image = self.get_image(deployment=deployment)
    docker_environment_str = " ".join(
        f"-e {k}={shlex.quote(str(v))}" for k, v in environment.items()
    )

    start_time = time.time()

    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

    # Set up credentials
    self.setup_credentials()

    # Guaranteed by stack validation
    assert stack is not None and stack.container_registry is not None

    docker_creds = stack.container_registry.credentials
    if docker_creds:
        docker_username, docker_password = docker_creds
        # Credentials are passed via task env vars rather than inlined into
        # the setup command string.
        setup = (
            f"docker login --username $DOCKER_USERNAME --password "
            f"$DOCKER_PASSWORD {stack.container_registry.config.uri}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None

    # Export cloud-specific environment variables (e.g. the service
    # connector's AWS profile) only for the duration of the launch.
    self.prepare_environment_variable(set=True)

    try:
        task = sky.Task(
            run=f"docker run --rm {docker_environment_str} {image} {entrypoint_str} {arguments_str}",
            setup=setup,
            envs=task_envs,
        )
        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                spot_recovery=settings.spot_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )

        cluster_name = settings.cluster_name
        if cluster_name is None:
            # Reuse the first existing cluster launched on the same cloud.
            cloud_type = type(self.cloud)
            for record in sky.status(refresh=True):
                handle = record["handle"]
                if isinstance(handle.launched_resources.cloud, cloud_type):
                    cluster_name = handle.cluster_name
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )
                    break

        # Launch the cluster (or reuse it) and run the task.
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
        )
    finally:
        # Always remove the cloud-specific environment variables again.
        self.prepare_environment_variable(set=False)

    run_duration = time.time() - start_time
    run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
        orchestrator=self, orchestrator_run_id=orchestrator_run_id
    )
    run_model = Client().zen_store.get_run(run_id)
    logger.info(
        "Pipeline run `%s` has finished in `%s`.\n",
        run_model.name,
        string_utils.get_human_readable_time(run_duration),
    )
setup_credentials(self)

Set up credentials for the orchestrator.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Configure the local client of the orchestrator's service connector.

    Raises:
        ValueError: If no service connector is linked to the orchestrator.
    """
    connector = self.get_connector()
    # Explicit check instead of `assert`, which is stripped under `-O`.
    if connector is None:
        raise ValueError(
            "No service connector found. Please make sure to set up a "
            "connector that is compatible with this orchestrator."
        )
    connector.configure_local_client()

skypilot_gcp_vm_orchestrator

Implementation of a Skypilot-based GCP VM orchestrator.

SkypilotGCPOrchestrator (SkypilotBaseOrchestrator, GoogleCredentialsMixin)

Orchestrator responsible for running pipelines remotely in a VM on GCP.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
class SkypilotGCPOrchestrator(SkypilotBaseOrchestrator, GoogleCredentialsMixin):
    """Skypilot orchestrator that runs pipelines remotely on a GCP VM.

    Scheduled runs are not supported by this orchestrator.
    """

    # Instance type used when the settings do not name one explicitly.
    DEFAULT_INSTANCE_TYPE: str = "n1-standard-4"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud targeted by this orchestrator.

        Returns:
            A freshly created `sky.clouds.GCP` instance.
        """
        gcp_cloud = sky.clouds.GCP()
        return gcp_cloud

    @property
    def config(self) -> SkypilotGCPOrchestratorConfig:
        """Typed view of this orchestrator's configuration.

        Returns:
            The configuration, cast to `SkypilotGCPOrchestratorConfig`.
        """
        gcp_config = cast(SkypilotGCPOrchestratorConfig, self._config)
        return gcp_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class accepted by the GCP Skypilot orchestrator.

        Returns:
            The `SkypilotGCPOrchestratorSettings` class.
        """
        settings_cls = SkypilotGCPOrchestratorSettings
        return settings_cls

    def prepare_environment_variable(self, set: bool = True) -> None:
        """No-op hook: no extra environment variables are exported for GCP.

        Args:
            set: Whether to set (True) or unset (False) the environment
                variables; ignored by this implementation.
        """
        return None
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotGCPOrchestratorConfig property readonly

Returns the SkypilotGCPOrchestratorConfig config.

Returns:

Type Description
SkypilotGCPOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """No-op hook: no extra environment variables are exported for GCP.

    Args:
        set: Whether to set (True) or unset (False) the environment
            variables; ignored by this implementation.
    """
    return None