Skip to content

Skypilot

zenml.integrations.skypilot special

Initialization of the Skypilot integration for ZenML.

The Skypilot integration sub-module provides an alternative to the local orchestrator for remote orchestration of ZenML pipelines on VMs.

SkypilotAWSIntegration (Integration)

Definition of Skypilot AWS Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAWSIntegration(Integration):
    """ZenML integration that registers the Skypilot orchestrator for AWS.

    Installing this integration pulls in the `skypilot[aws]` package.
    """

    NAME = SKYPILOT_AWS
    REQUIREMENTS = ["skypilot[aws]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Return the stack component flavors contributed by this integration.

        The flavors package is imported inside the method rather than at
        module level (presumably to defer optional imports until the flavors
        are actually requested - TODO confirm).

        Returns:
            A one-element list holding the Skypilot AWS orchestrator flavor.
        """
        from zenml.integrations.skypilot import flavors as skypilot_flavors

        return [skypilot_flavors.SkypilotAWSOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Skypilot AWS integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot AWS integration.

    Returns:
        A one-element list holding the Skypilot AWS orchestrator flavor.
    """
    from zenml.integrations.skypilot import flavors as skypilot_flavors

    return [skypilot_flavors.SkypilotAWSOrchestratorFlavor]

SkypilotAzureIntegration (Integration)

Definition of Skypilot Azure Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotAzureIntegration(Integration):
    """Definition of Skypilot Azure Integration for ZenML.

    Installing this integration pulls in the `skypilot[azure]` package.
    """

    NAME = SKYPILOT_AZURE
    REQUIREMENTS = ["skypilot[azure]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot Azure integration.

        The flavor class is imported inside the method rather than at module
        level (presumably to defer optional imports until the flavors are
        actually requested - TODO confirm).

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.skypilot.flavors import (
            SkypilotAzureOrchestratorFlavor,
        )

        return [SkypilotAzureOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Skypilot Azure integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot Azure integration.

    Returns:
        A one-element list holding the Skypilot Azure orchestrator flavor.
    """
    from zenml.integrations.skypilot import flavors as skypilot_flavors

    return [skypilot_flavors.SkypilotAzureOrchestratorFlavor]

SkypilotGCPIntegration (Integration)

Definition of Skypilot GCP Integration for ZenML.

Source code in zenml/integrations/skypilot/__init__.py
class SkypilotGCPIntegration(Integration):
    """Definition of Skypilot GCP Integration for ZenML.

    Installing this integration pulls in the `skypilot[gcp]` package.
    """

    NAME = SKYPILOT_GCP
    REQUIREMENTS = ["skypilot[gcp]"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Skypilot GCP integration.

        The flavor class is imported inside the method rather than at module
        level (presumably to defer optional imports until the flavors are
        actually requested - TODO confirm).

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.skypilot.flavors import (
            SkypilotGCPOrchestratorFlavor,
        )

        return [SkypilotGCPOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Skypilot GCP integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/skypilot/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Skypilot GCP integration.

    Returns:
        A one-element list holding the Skypilot GCP orchestrator flavor.
    """
    from zenml.integrations.skypilot import flavors as skypilot_flavors

    return [skypilot_flavors.SkypilotGCPOrchestratorFlavor]

flavors special

Skypilot integration flavors.

skypilot_orchestrator_aws_vm_flavor

Skypilot orchestrator AWS flavor.

SkypilotAWSOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings) pydantic-model

Skypilot orchestrator config for AWS.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAWSOrchestratorSettings
):
    """Skypilot orchestrator config for AWS.

    Combines the shared Skypilot orchestrator config with the AWS-specific
    settings class; it declares no fields of its own.
    """
SkypilotAWSOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot AWS orchestrator.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on AWS VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this orchestrator flavor is registered.

        Returns:
            The `SKYPILOT_AWS_ORCHESTRATOR_FLAVOR` constant.
        """
        return SKYPILOT_AWS_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements used to filter compatible service connector types.

        Returns:
            Requirements matching connectors that provide the `aws-generic`
            resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="aws-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL computed by the base flavor.
        """
        default_docs = self.generate_default_docs_url()
        return default_docs

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL computed by the base flavor.
        """
        default_sdk_docs = self.generate_default_sdk_docs_url()
        return default_sdk_docs

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            URL of the AWS Skypilot orchestrator logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/aws-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic config model used by this flavor.

        Returns:
            The `SkypilotAWSOrchestratorConfig` class.
        """
        return SkypilotAWSOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAWSOrchestrator"]:
        """Orchestrator class instantiated for this flavor.

        The orchestrators package is imported lazily, inside the property.

        Returns:
            The `SkypilotAWSOrchestrator` class.
        """
        from zenml.integrations.skypilot import (
            orchestrators as skypilot_orchestrators,
        )

        return skypilot_orchestrators.SkypilotAWSOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotAWSOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotAWSOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotAWSOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings for AWS.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_aws_vm_flavor.py
class SkypilotAWSOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Skypilot orchestrator settings for AWS.

    Declares no fields of its own; every setting is inherited from
    `SkypilotBaseOrchestratorSettings`.
    """

skypilot_orchestrator_azure_vm_flavor

Skypilot orchestrator Azure flavor.

SkypilotAzureOrchestratorConfig (SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings) pydantic-model

Skypilot orchestrator config for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig, SkypilotAzureOrchestratorSettings
):
    """Skypilot orchestrator config for Azure.

    Combines the shared Skypilot orchestrator config with the Azure-specific
    settings class; it declares no fields of its own.
    """
SkypilotAzureOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot orchestrator for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on Azure VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this orchestrator flavor is registered.

        Returns:
            The `SKYPILOT_AZURE_ORCHESTRATOR_FLAVOR` constant.
        """
        return SKYPILOT_AZURE_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements used to filter compatible service connector types.

        Returns:
            Requirements matching connectors that provide the
            `azure-generic` resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type="azure-generic"
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL computed by the base flavor.
        """
        default_docs = self.generate_default_docs_url()
        return default_docs

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL computed by the base flavor.
        """
        default_sdk_docs = self.generate_default_sdk_docs_url()
        return default_sdk_docs

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            URL of the Azure Skypilot orchestrator logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azure-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic config model used by this flavor.

        Returns:
            The `SkypilotAzureOrchestratorConfig` class.
        """
        return SkypilotAzureOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotAzureOrchestrator"]:
        """Orchestrator class instantiated for this flavor.

        The orchestrators package is imported lazily, inside the property.

        Returns:
            The `SkypilotAzureOrchestrator` class.
        """
        from zenml.integrations.skypilot import (
            orchestrators as skypilot_orchestrators,
        )

        return skypilot_orchestrators.SkypilotAzureOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotAzureOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotAzureOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotAzureOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings for Azure.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_azure_vm_flavor.py
class SkypilotAzureOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Skypilot orchestrator settings for Azure.

    Declares no fields of its own; every setting is inherited from
    `SkypilotBaseOrchestratorSettings`.
    """

skypilot_orchestrator_base_vm_config

Skypilot orchestrator base config and settings.

SkypilotBaseOrchestratorConfig (BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator base config.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
    """Skypilot orchestrator base config.

    Combines the generic orchestrator config with the Skypilot settings. It
    is the common base of the AWS, Azure and GCP Skypilot orchestrator
    configs.
    """

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.
        Skypilot orchestrators always run pipelines on remote VMs, so this
        is unconditionally False.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

SkypilotBaseOrchestratorSettings (BaseSettings) pydantic-model

Skypilot orchestrator base settings.

Attributes:

Name Type Description
instance_type Optional[str]

the instance type to use.

cpus Union[NoneType, int, float, str]

the number of CPUs required for the task. If a str, must be a string of the form '2' or '2+', where the + indicates that the task requires at least 2 CPUs.

memory Union[NoneType, int, float, str]

the amount of memory in GiB required. If a str, must be a string of the form '16' or '16+', where the + indicates that the task requires at least 16 GiB of memory.

accelerators Union[NoneType, str, Dict[str, int]]

the accelerators required. If a str, must be a string of the form 'V100' or 'V100:2', where the :2 indicates that the task requires 2 V100 GPUs. If a dict, must be a dict of the form {'V100': 2} or {'tpu-v2-8': 1}.

accelerator_args Optional[Dict[str, str]]

accelerator-specific arguments. For example, {'tpu_vm': True, 'runtime_version': 'tpu-vm-base'} for TPUs.

use_spot Optional[bool]

whether to use spot instances. If None, defaults to False.

spot_recovery Optional[str]

the spot recovery strategy to use for the managed spot to recover the cluster from preemption. Refer to the [recovery_strategy module](https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py) for more details.

region Optional[str]

the region to use.

zone Optional[str]

the zone to use.

image_id Union[Dict[str, str], str, NoneType]

the image ID to use. If a str, must be a string of the image id from the cloud, such as AWS: 'ami-1234567890abcdef0', GCP: 'projects/my-project-id/global/images/my-image-name'; or an image tag provided by SkyPilot, such as AWS: 'skypilot:gpu-ubuntu-2004'. If a dict, must be a dict mapping from region to image ID, such as:

```python
{
    'us-west1': 'ami-1234567890abcdef0',
    'us-east1': 'ami-1234567890abcdef0'
}
```
disk_size Optional[int]

the size of the OS disk in GiB.

disk_tier Optional[Literal['high', 'medium', 'low']]

the disk performance tier to use. If None, defaults to 'medium'.

cluster_name Optional[str]

name of the cluster to create/reuse. If None, auto-generate a name.

retry_until_up bool

whether to retry launching the cluster until it is up.

idle_minutes_to_autostop Optional[int]

automatically stop the cluster after this many minutes of idleness, i.e., no running or pending jobs in the cluster's job queue. Idleness gets reset whenever setting-up/running/pending jobs are found in the job queue. Setting this flag is equivalent to running sky.launch(..., detach_run=True, ...) and then sky.autostop(idle_minutes=<minutes>). If not set, the cluster will not be autostopped.

down bool

Tear down the cluster after all jobs finish (successfully or abnormally). If idle_minutes_to_autostop is also set, the cluster will be torn down after the specified idle time. Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes.

stream_logs bool

if True, show the logs in the terminal.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py
class SkypilotBaseOrchestratorSettings(BaseSettings):
    """Skypilot orchestrator base settings.

    Attributes:
        instance_type: the instance type to use.
        cpus: the number of CPUs required for the task.
            If a str, must be a string of the form `'2'` or `'2+'`, where
            the `+` indicates that the task requires at least 2 CPUs.
            Fractional values such as `0.5` are preserved.
        memory: the amount of memory in GiB required. If a
            str, must be a string of the form `'16'` or `'16+'`, where
            the `+` indicates that the task requires at least 16 GiB of
            memory. Fractional values such as `1.5` are preserved.
        accelerators: the accelerators required. If a str, must be
            a string of the form `'V100'` or `'V100:2'`, where the `:2`
            indicates that the task requires 2 V100 GPUs. If a dict, must be a
            dict of the form `{'V100': 2}` or `{'tpu-v2-8': 1}`.
        accelerator_args: accelerator-specific arguments. For example,
            `{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}` for TPUs.
            NOTE(review): values are declared as `str`; depending on the
            pydantic version a bool such as `False` may be coerced to the
            (truthy) string `'False'` or rejected - confirm before relying
            on non-string values.
        use_spot: whether to use spot instances. If None, defaults to
            False.
        spot_recovery: the spot recovery strategy to use for the managed
            spot to recover the cluster from preemption. Refer to
            `recovery_strategy module <https://github.com/skypilot-org/skypilot/blob/master/sky/spot/recovery_strategy.py>`__ # pylint: disable=line-too-long
            for more details.
        region: the region to use.
        zone: the zone to use.
        image_id: the image ID to use. If a str, must be a string
            of the image id from the cloud, such as AWS:
            `'ami-1234567890abcdef0'`, GCP:
            `'projects/my-project-id/global/images/my-image-name'`;
            or an image tag provided by SkyPilot, such as AWS:
            `'skypilot:gpu-ubuntu-2004'`. If a dict, must be a dict mapping
            from region to image ID, such as:

            ```python
            {
                'us-west1': 'ami-1234567890abcdef0',
                'us-east1': 'ami-1234567890abcdef0'
            }
            ```

        disk_size: the size of the OS disk in GiB.
        disk_tier: the disk performance tier to use. If None, defaults to
            `'medium'`.

        cluster_name: name of the cluster to create/reuse. If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up. Defaults to False.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            flag is equivalent to running
            `sky.launch(..., detach_run=True, ...)` and then
            `sky.autostop(idle_minutes=<minutes>)`. Defaults to 30; if set
            to None, the cluster will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If `idle_minutes_to_autostop` is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
            Defaults to True.
        stream_logs: if True, show the logs in the terminal. Defaults to
            True.
    """

    # Resources
    instance_type: Optional[str] = None
    # `float` is deliberately listed before `int`: pydantic v1 validates
    # Union members left to right, and its int validator truncates floats
    # (0.5 -> 0), which would silently under-request fractional CPUs/memory.
    # Strings such as "2+" fail both numeric validators and fall through to
    # `str`.
    cpus: Union[None, float, int, str] = None
    memory: Union[None, float, int, str] = None
    accelerators: Union[None, str, Dict[str, int]] = None
    accelerator_args: Optional[Dict[str, str]] = None
    use_spot: Optional[bool] = None
    spot_recovery: Optional[str] = None
    region: Optional[str] = None
    zone: Optional[str] = None
    image_id: Union[Dict[str, str], str, None] = None
    disk_size: Optional[int] = None
    disk_tier: Optional[Literal["high", "medium", "low"]] = None

    # Run settings
    cluster_name: Optional[str] = None
    retry_until_up: bool = False
    idle_minutes_to_autostop: Optional[int] = 30
    down: bool = True
    stream_logs: bool = True

skypilot_orchestrator_gcp_vm_flavor

Skypilot orchestrator GCP flavor.

SkypilotGCPOrchestratorConfig (SkypilotBaseOrchestratorConfig, GoogleCredentialsConfigMixin, SkypilotGCPOrchestratorSettings) pydantic-model

Skypilot orchestrator config for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    SkypilotBaseOrchestratorConfig,
    GoogleCredentialsConfigMixin,
    SkypilotGCPOrchestratorSettings,
):
    """Skypilot orchestrator config for GCP.

    Combines the shared Skypilot orchestrator config, the
    `GoogleCredentialsConfigMixin` and the GCP-specific settings class; it
    declares no fields of its own.
    """
SkypilotGCPOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Skypilot orchestrator for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorFlavor(BaseOrchestratorFlavor):
    """Orchestrator flavor that runs ZenML pipelines on GCP VMs via Skypilot."""

    @property
    def name(self) -> str:
        """Identifier under which this orchestrator flavor is registered.

        Returns:
            The `SKYPILOT_GCP_ORCHESTRATOR_FLAVOR` constant.
        """
        return SKYPILOT_GCP_ORCHESTRATOR_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements used to filter compatible service connector types.

        Returns:
            Requirements matching connectors that provide the `gcp-generic`
            resource type.
        """
        requirements = ServiceConnectorRequirements(resource_type="gcp-generic")
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation for this flavor.

        Returns:
            The default docs URL computed by the base flavor.
        """
        default_docs = self.generate_default_docs_url()
        return default_docs

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK reference documentation for this flavor.

        Returns:
            The default SDK docs URL computed by the base flavor.
        """
        default_sdk_docs = self.generate_default_sdk_docs_url()
        return default_sdk_docs

    @property
    def logo_url(self) -> str:
        """Image shown for this flavor in the dashboard.

        Returns:
            URL of the GCP Skypilot orchestrator logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/gcp-skypilot.png"
        return logo

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Pydantic config model used by this flavor.

        Returns:
            The `SkypilotGCPOrchestratorConfig` class.
        """
        return SkypilotGCPOrchestratorConfig

    @property
    def implementation_class(self) -> Type["SkypilotGCPOrchestrator"]:
        """Orchestrator class instantiated for this flavor.

        The orchestrators package is imported lazily, inside the property.

        Returns:
            The `SkypilotGCPOrchestrator` class.
        """
        from zenml.integrations.skypilot import (
            orchestrators as skypilot_orchestrators,
        )

        return skypilot_orchestrators.SkypilotGCPOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SkypilotGCPOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SkypilotGCPOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

SkypilotGCPOrchestratorSettings (SkypilotBaseOrchestratorSettings) pydantic-model

Skypilot orchestrator settings for GCP.

Source code in zenml/integrations/skypilot/flavors/skypilot_orchestrator_gcp_vm_flavor.py
class SkypilotGCPOrchestratorSettings(SkypilotBaseOrchestratorSettings):
    """Skypilot orchestrator settings for GCP.

    Declares no fields of its own; every setting is inherited from
    `SkypilotBaseOrchestratorSettings`.
    """

orchestrators special

Initialization of the Skypilot ZenML orchestrators.

skypilot_aws_vm_orchestrator

Implementation of a Skypilot-based AWS VM orchestrator.

SkypilotAWSOrchestrator (SkypilotBaseOrchestrator)

Orchestrator responsible for running pipelines remotely in a VM on AWS.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
class SkypilotAWSOrchestrator(SkypilotBaseOrchestrator):
    """Runs ZenML pipelines remotely on an AWS VM provisioned by Skypilot.

    Running on a schedule is not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "t3.xlarge"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Skypilot cloud on which this orchestrator provisions VMs.

        Returns:
            A new `sky.clouds.AWS` instance.
        """
        aws_cloud = sky.clouds.AWS()
        return aws_cloud

    @property
    def config(self) -> SkypilotAWSOrchestratorConfig:
        """Typed view of this component's configuration.

        Returns:
            `self._config`, cast to `SkypilotAWSOrchestratorConfig`.
        """
        typed_config = cast(SkypilotAWSOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings model accepted by this orchestrator.

        Returns:
            The `SkypilotAWSOrchestratorSettings` class.
        """
        return SkypilotAWSOrchestratorSettings

    def prepare_environment_variable(self, set: bool = True) -> None:
        """Export or clear the AWS profile environment variable.

        The connector is looked up first, so a missing connector raises even
        when `set` is False.

        Args:
            set: When True, point `ENV_AWS_PROFILE` at the connector's
                profile; when False, remove it from the environment.
                (The name shadows the builtin but is kept for keyword
                callers.)

        Raises:
            ValueError: If no service connector is found.
        """
        connector = self.get_connector()
        if connector is None:
            raise ValueError(
                "No service connector found. Please make sure to set up a connector "
                "that is compatible with this orchestrator."
            )
        if not set:
            os.environ.pop(ENV_AWS_PROFILE, None)
            return
        # The AWS connector creates a local configuration profile whose name
        # is built from the first 8 characters of the connector's UUID.
        os.environ[ENV_AWS_PROFILE] = f"zenml-{str(connector.id)[:8]}"
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotAWSOrchestratorConfig property readonly

Returns the SkypilotAWSOrchestratorConfig config.

Returns:

Type Description
SkypilotAWSOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up Environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True

Exceptions:

Type Description
ValueError

If no service connector is found.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_aws_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """Export or clear the AWS profile environment variable.

    The connector is looked up first, so a missing connector raises even when
    `set` is False.

    Args:
        set: When True, point `ENV_AWS_PROFILE` at the connector's profile;
            when False, remove it from the environment. (The name shadows
            the builtin but is kept for keyword callers.)

    Raises:
        ValueError: If no service connector is found.
    """
    connector = self.get_connector()
    if connector is None:
        raise ValueError(
            "No service connector found. Please make sure to set up a connector "
            "that is compatible with this orchestrator."
        )
    if not set:
        os.environ.pop(ENV_AWS_PROFILE, None)
        return
    # The AWS connector creates a local configuration profile whose name is
    # built from the first 8 characters of the connector's UUID.
    os.environ[ENV_AWS_PROFILE] = f"zenml-{str(connector.id)[:8]}"

skypilot_azure_vm_orchestrator

Implementation of a Skypilot-based Azure VM orchestrator.

SkypilotAzureOrchestrator (SkypilotBaseOrchestrator)

Orchestrator responsible for running pipelines remotely in a VM on Azure.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
class SkypilotAzureOrchestrator(SkypilotBaseOrchestrator):
    """Skypilot orchestrator that executes whole pipelines on an Azure VM.

    Scheduled pipeline runs are not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "Standard_B1ms"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud on which the pipeline VM is provisioned.

        Returns:
            A freshly constructed `sky.clouds.Azure` object.
        """
        azure_cloud = sky.clouds.Azure()
        return azure_cloud

    @property
    def config(self) -> SkypilotAzureOrchestratorConfig:
        """Typed view of this component's configuration.

        Returns:
            The stored config, narrowed to `SkypilotAzureOrchestratorConfig`.
        """
        typed_config = cast(SkypilotAzureOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings model used to configure Azure Skypilot runs.

        Returns:
            The `SkypilotAzureOrchestratorSettings` class.
        """
        settings_cls = SkypilotAzureOrchestratorSettings
        return settings_cls

    def prepare_environment_variable(self, set: bool = True) -> None:
        """No-op: Azure runs need no additional environment variables.

        Args:
            set: Ignored; accepted to satisfy the base-class interface.
        """
        return None
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotAzureOrchestratorConfig property readonly

Returns the SkypilotAzureOrchestratorConfig config.

Returns:

Type Description
SkypilotAzureOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_azure_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """No-op: the Azure orchestrator does not export any environment variables.

    Args:
        set: Ignored; kept only so the signature matches the base class.
    """
    return None

skypilot_base_vm_orchestrator

Implementation of a Skypilot base VM orchestrator.

SkypilotBaseOrchestrator (ContainerizedOrchestrator)

Base class for orchestrators responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
class SkypilotBaseOrchestrator(ContainerizedOrchestrator):
    """Base class for orchestrators that run pipelines remotely in a VM.

    This orchestrator does not support running on a schedule.
    """

    # The default instance type to use if none is specified in settings
    DEFAULT_INSTANCE_TYPE: Optional[str] = None

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue

                return False, (
                    f"The Skypilot orchestrator runs pipelines remotely, "
                    f"but the '{component.name}' {component.type.value} is "
                    "a local stack component and will not be available in "
                    "the Skypilot step.\nPlease ensure that you always "
                    "use non-local stack components with the Skypilot "
                    "orchestrator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
        except KeyError:
            # The KeyError adds no information beyond the message below.
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
            ) from None

    @property
    @abstractmethod
    def cloud(self) -> sky.clouds.Cloud:
        """The type of sky cloud to use.

        Returns:
            A `sky.clouds.Cloud` instance.
        """

    def setup_credentials(self) -> None:
        """Set up credentials for the orchestrator.

        Configures the local client of this component's service connector.

        Raises:
            ValueError: If no service connector is found.
        """
        connector = self.get_connector()
        if connector is None:
            # An explicit error (unlike `assert`) survives `python -O`.
            raise ValueError(
                "No service connector found. Please make sure to set up a "
                "connector that is compatible with this orchestrator."
            )
        connector.configure_local_client()

    @abstractmethod
    def prepare_environment_variable(self, set: bool = True) -> None:
        """Set up environment variables that are required for the orchestrator.

        Args:
            set: Whether to set the environment variables or not.
        """

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs all pipeline steps in Skypilot containers.

        The whole pipeline is executed as a single `docker run` of the
        pipeline entrypoint on a Skypilot cluster. An existing cluster on the
        same cloud is reused when no cluster name is configured.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment. The orchestrator run id is added to this dict.

        Raises:
            Exception: If the pipeline run fails.
        """
        import shlex

        if deployment.schedule:
            logger.warning(
                "Skypilot Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        # Set up some variables for configuration
        orchestrator_run_id = str(uuid4())
        environment[
            ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
        ] = orchestrator_run_id

        settings = cast(
            SkypilotBaseOrchestratorSettings,
            self.get_settings(deployment),
        )

        # The run command is executed by a shell on the remote VM, so every
        # token is shell-quoted. Tokens made only of safe characters are
        # emitted unchanged by `shlex.quote`.
        entrypoint_str = shlex.join(
            PipelineEntrypointConfiguration.get_entrypoint_command()
        )
        arguments_str = shlex.join(
            PipelineEntrypointConfiguration.get_entrypoint_arguments(
                deployment_id=deployment.id
            )
        )

        # Set up docker run command
        image = self.get_image(deployment=deployment)
        docker_environment_str = " ".join(
            f"-e {shlex.quote(f'{key}={value}')}"
            for key, value in environment.items()
        )

        start_time = time.time()

        instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

        # Set up credentials
        self.setup_credentials()

        # Guaranteed by stack validation
        assert stack is not None and stack.container_registry is not None

        docker_creds = stack.container_registry.credentials
        if docker_creds:
            docker_username, docker_password = docker_creds
            # Feed the password through stdin and quote the variables. This
            # keeps the secret off the `docker login` command line and
            # supports credentials that contain shell-special characters.
            setup = (
                'echo "$DOCKER_PASSWORD" | docker login --username '
                '"$DOCKER_USERNAME" --password-stdin '
                f"{shlex.quote(stack.container_registry.config.uri)}"
            )
            task_envs = {
                "DOCKER_USERNAME": docker_username,
                "DOCKER_PASSWORD": docker_password,
            }
        else:
            setup = None
            task_envs = None

        # Run the entire pipeline

        # Set orchestrator-specific environment variables (e.g. the service
        # connector AWS profile) only for the duration of the launch.
        self.prepare_environment_variable(set=True)

        try:
            task = sky.Task(
                run=(
                    f"docker run --rm {docker_environment_str} "
                    f"{shlex.quote(image)} {entrypoint_str} {arguments_str}"
                ),
                setup=setup,
                envs=task_envs,
            )
            task = task.set_resources(
                sky.Resources(
                    cloud=self.cloud,
                    instance_type=instance_type,
                    cpus=settings.cpus,
                    memory=settings.memory,
                    accelerators=settings.accelerators,
                    accelerator_args=settings.accelerator_args,
                    use_spot=settings.use_spot,
                    spot_recovery=settings.spot_recovery,
                    region=settings.region,
                    zone=settings.zone,
                    image_id=settings.image_id,
                    disk_size=settings.disk_size,
                    disk_tier=settings.disk_tier,
                )
            )

            cluster_name = settings.cluster_name
            if cluster_name is None:
                # Reuse the first existing cluster running on this cloud.
                cloud_type = type(self.cloud)
                for cluster in sky.status(refresh=True):
                    handle = cluster["handle"]
                    if isinstance(handle.launched_resources.cloud, cloud_type):
                        cluster_name = handle.cluster_name
                        logger.info(
                            f"Found existing cluster {cluster_name}. Reusing..."
                        )
                        break

            # Launch the cluster
            sky.launch(
                task,
                cluster_name,
                retry_until_up=settings.retry_until_up,
                idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
                down=settings.down,
                stream_logs=settings.stream_logs,
            )
        finally:
            # Undo the environment changes even if the launch failed.
            self.prepare_environment_variable(set=False)

        run_duration = time.time() - start_time
        run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
            orchestrator=self, orchestrator_run_id=orchestrator_run_id
        )
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in `%s`.\n",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the ``ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID``
    environment variable.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID]
    except KeyError:
        # Suppress the KeyError context; the message already names the variable.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
        ) from None
prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
@abstractmethod
def prepare_environment_variable(self, set: bool = True) -> None:
    """Hook for subclasses to export or clear their cloud-specific env variables.

    Args:
        set: True to export the variables, False to remove them again.
    """
prepare_or_run_pipeline(self, deployment, stack, environment)

Runs all pipeline steps in Skypilot containers.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponseModel

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
Exception

If the pipeline run fails.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponseModel",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs all pipeline steps in Skypilot containers.

    The whole pipeline is executed as a single `docker run` of the pipeline
    entrypoint on a Skypilot cluster. An existing cluster on the same cloud
    is reused when no cluster name is configured.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. The orchestrator run id is added to this dict.

    Raises:
        Exception: If the pipeline run fails.
    """
    import shlex

    if deployment.schedule:
        logger.warning(
            "Skypilot Orchestrator currently does not support the "
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )

    # Set up some variables for configuration
    orchestrator_run_id = str(uuid4())
    environment[
        ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
    ] = orchestrator_run_id

    settings = cast(
        SkypilotBaseOrchestratorSettings,
        self.get_settings(deployment),
    )

    # The run command is executed by a shell on the remote VM, so every token
    # is shell-quoted. Tokens made only of safe characters are emitted
    # unchanged by `shlex.quote`.
    entrypoint_str = shlex.join(
        PipelineEntrypointConfiguration.get_entrypoint_command()
    )
    arguments_str = shlex.join(
        PipelineEntrypointConfiguration.get_entrypoint_arguments(
            deployment_id=deployment.id
        )
    )

    # Set up docker run command
    image = self.get_image(deployment=deployment)
    docker_environment_str = " ".join(
        f"-e {shlex.quote(f'{key}={value}')}"
        for key, value in environment.items()
    )

    start_time = time.time()

    instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

    # Set up credentials
    self.setup_credentials()

    # Guaranteed by stack validation
    assert stack is not None and stack.container_registry is not None

    docker_creds = stack.container_registry.credentials
    if docker_creds:
        docker_username, docker_password = docker_creds
        # Feed the password through stdin and quote the variables. This keeps
        # the secret off the `docker login` command line and supports
        # credentials that contain shell-special characters.
        setup = (
            'echo "$DOCKER_PASSWORD" | docker login --username '
            '"$DOCKER_USERNAME" --password-stdin '
            f"{shlex.quote(stack.container_registry.config.uri)}"
        )
        task_envs = {
            "DOCKER_USERNAME": docker_username,
            "DOCKER_PASSWORD": docker_password,
        }
    else:
        setup = None
        task_envs = None

    # Run the entire pipeline

    # Set orchestrator-specific environment variables (e.g. the service
    # connector AWS profile) only for the duration of the launch.
    self.prepare_environment_variable(set=True)

    try:
        task = sky.Task(
            run=(
                f"docker run --rm {docker_environment_str} "
                f"{shlex.quote(image)} {entrypoint_str} {arguments_str}"
            ),
            setup=setup,
            envs=task_envs,
        )
        task = task.set_resources(
            sky.Resources(
                cloud=self.cloud,
                instance_type=instance_type,
                cpus=settings.cpus,
                memory=settings.memory,
                accelerators=settings.accelerators,
                accelerator_args=settings.accelerator_args,
                use_spot=settings.use_spot,
                spot_recovery=settings.spot_recovery,
                region=settings.region,
                zone=settings.zone,
                image_id=settings.image_id,
                disk_size=settings.disk_size,
                disk_tier=settings.disk_tier,
            )
        )

        cluster_name = settings.cluster_name
        if cluster_name is None:
            # Reuse the first existing cluster running on this cloud.
            cloud_type = type(self.cloud)
            for cluster in sky.status(refresh=True):
                handle = cluster["handle"]
                if isinstance(handle.launched_resources.cloud, cloud_type):
                    cluster_name = handle.cluster_name
                    logger.info(
                        f"Found existing cluster {cluster_name}. Reusing..."
                    )
                    break

        # Launch the cluster
        sky.launch(
            task,
            cluster_name,
            retry_until_up=settings.retry_until_up,
            idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
            down=settings.down,
            stream_logs=settings.stream_logs,
        )
    finally:
        # Undo the environment changes even if the launch failed.
        self.prepare_environment_variable(set=False)

    run_duration = time.time() - start_time
    run_id = orchestrator_utils.get_run_id_for_orchestrator_run_id(
        orchestrator=self, orchestrator_run_id=orchestrator_run_id
    )
    run_model = Client().zen_store.get_run(run_id)
    logger.info(
        "Pipeline run `%s` has finished in `%s`.\n",
        run_model.name,
        string_utils.get_human_readable_time(run_duration),
    )
setup_credentials(self)

Set up credentials for the orchestrator.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator.

    Configures the local client of this component's service connector.

    Raises:
        ValueError: If no service connector is found.
    """
    connector = self.get_connector()
    if connector is None:
        # An explicit error (unlike `assert`) survives `python -O` and matches
        # the message raised by the AWS orchestrator.
        raise ValueError(
            "No service connector found. Please make sure to set up a "
            "connector that is compatible with this orchestrator."
        )
    connector.configure_local_client()

skypilot_gcp_vm_orchestrator

Implementation of a Skypilot-based GCP VM orchestrator.

SkypilotGCPOrchestrator (SkypilotBaseOrchestrator, GoogleCredentialsMixin)

Orchestrator responsible for running pipelines remotely in a VM on GCP.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
class SkypilotGCPOrchestrator(SkypilotBaseOrchestrator, GoogleCredentialsMixin):
    """Skypilot orchestrator that executes whole pipelines on a GCP VM.

    Scheduled pipeline runs are not supported by this orchestrator.
    """

    DEFAULT_INSTANCE_TYPE: str = "n1-standard-4"

    @property
    def cloud(self) -> sky.clouds.Cloud:
        """Sky cloud on which the pipeline VM is provisioned.

        Returns:
            A freshly constructed `sky.clouds.GCP` object.
        """
        gcp_cloud = sky.clouds.GCP()
        return gcp_cloud

    @property
    def config(self) -> SkypilotGCPOrchestratorConfig:
        """Typed view of this component's configuration.

        Returns:
            The stored config, narrowed to `SkypilotGCPOrchestratorConfig`.
        """
        typed_config = cast(SkypilotGCPOrchestratorConfig, self._config)
        return typed_config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings model used to configure GCP Skypilot runs.

        Returns:
            The `SkypilotGCPOrchestratorSettings` class.
        """
        settings_cls = SkypilotGCPOrchestratorSettings
        return settings_cls

    def prepare_environment_variable(self, set: bool = True) -> None:
        """No-op: GCP runs need no additional environment variables.

        Args:
            set: Ignored; accepted to satisfy the base-class interface.
        """
        return None
cloud: Cloud property readonly

The type of sky cloud to use.

Returns:

Type Description
Cloud

A sky.clouds.Cloud instance.

config: SkypilotGCPOrchestratorConfig property readonly

Returns the SkypilotGCPOrchestratorConfig config.

Returns:

Type Description
SkypilotGCPOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Skypilot orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

prepare_environment_variable(self, set=True)

Set up environment variables that are required for the orchestrator.

Parameters:

Name Type Description Default
set bool

Whether to set the environment variables or not.

True
Source code in zenml/integrations/skypilot/orchestrators/skypilot_gcp_vm_orchestrator.py
def prepare_environment_variable(self, set: bool = True) -> None:
    """No-op: the GCP orchestrator does not export any environment variables.

    Args:
        set: Ignored; kept only so the signature matches the base class.
    """
    return None