Skip to content

Modal

zenml.integrations.modal special

Modal integration for cloud-native step execution.

The Modal integration sub-module provides a step operator flavor that allows executing steps on Modal's cloud infrastructure.

ModalIntegration (Integration)

Definition of Modal integration for ZenML.

Source code in zenml/integrations/modal/__init__.py
class ModalIntegration(Integration):
    """ZenML integration that exposes Modal stack component flavors."""

    NAME = MODAL
    REQUIREMENTS = ["modal>=0.64.49,<1"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped with this integration.

        Returns:
            A one-element list containing the Modal step operator flavor.
        """
        # The flavor is imported inside the method rather than at module
        # import time (presumably so the flavor module is only loaded when
        # the flavors are actually requested -- confirm).
        from zenml.integrations.modal.flavors import ModalStepOperatorFlavor

        provided_flavors: List[Type[Flavor]] = [ModalStepOperatorFlavor]
        return provided_flavors

flavors() classmethod

Declare the stack component flavors for the Modal integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new stack component flavors.

Source code in zenml/integrations/modal/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped with the Modal integration.

    Returns:
        A one-element list containing the Modal step operator flavor.
    """
    # The flavor is imported inside the method rather than at module import
    # time (presumably so the flavor module is only loaded when the flavors
    # are actually requested -- confirm).
    from zenml.integrations.modal.flavors import ModalStepOperatorFlavor

    provided_flavors: List[Type[Flavor]] = [ModalStepOperatorFlavor]
    return provided_flavors

flavors special

Modal integration flavors.

modal_step_operator_flavor

Modal step operator flavor.

ModalStepOperatorConfig (BaseStepOperatorConfig, ModalStepOperatorSettings)

Configuration for the Modal step operator.

Source code in zenml/integrations/modal/flavors/modal_step_operator_flavor.py
class ModalStepOperatorConfig(
    BaseStepOperatorConfig, ModalStepOperatorSettings
):
    """Stack component configuration of the Modal step operator.

    Combines the base step operator configuration with the fields declared
    on `ModalStepOperatorSettings`.
    """

    @property
    def is_remote(self) -> bool:
        """Report whether this stack component runs remotely.

        The flag decides whether the component may be combined with a local
        ZenML database or whether a remote ZenML server is required.

        Returns:
            Always True: this config describes a remote component.
        """
        runs_remotely = True
        return runs_remotely
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

ModalStepOperatorFlavor (BaseStepOperatorFlavor)

Modal step operator flavor.

Source code in zenml/integrations/modal/flavors/modal_step_operator_flavor.py
class ModalStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor descriptor for the Modal step operator."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The value of `MODAL_STEP_OPERATOR_FLAVOR`.
        """
        flavor_name: str = MODAL_STEP_OPERATOR_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The URL produced by `generate_default_docs_url()`.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The URL produced by `generate_default_sdk_docs_url()`.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """Link to the image that represents this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/modal.png"
        return logo

    @property
    def config_class(self) -> Type[ModalStepOperatorConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `ModalStepOperatorConfig` class itself (not an instance).
        """
        flavor_config_class: Type[ModalStepOperatorConfig] = (
            ModalStepOperatorConfig
        )
        return flavor_config_class

    @property
    def implementation_class(self) -> Type["ModalStepOperator"]:
        """Class that implements the step operator for this flavor.

        Returns:
            The `ModalStepOperator` class itself (not an instance).
        """
        # Imported on access instead of at module import time.
        from zenml.integrations.modal.step_operators import ModalStepOperator

        operator_class = ModalStepOperator
        return operator_class
config_class: Type[zenml.integrations.modal.flavors.modal_step_operator_flavor.ModalStepOperatorConfig] property readonly

Returns ModalStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.modal.flavors.modal_step_operator_flavor.ModalStepOperatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[ModalStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[ModalStepOperator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

ModalStepOperatorSettings (BaseSettings)

Settings for the Modal step operator.

Specifying the region and cloud provider is only available for Enterprise and Team plan customers.

Certain combinations of settings are not available. It is suggested to err on the side of looser settings rather than more restrictive ones to avoid pipeline execution failures. In the case of failures, however, Modal provides detailed error messages that can help identify what is incompatible. See more in the Modal docs at https://modal.com/docs/guide/region-selection.

Attributes:

Name Type Description
gpu Optional[str]

The type of GPU to use for the step execution.

region Optional[str]

The region to use for the step execution.

cloud Optional[str]

The cloud provider to use for the step execution.

Source code in zenml/integrations/modal/flavors/modal_step_operator_flavor.py
class ModalStepOperatorSettings(BaseSettings):
    """Settings for the Modal step operator.

    All three settings are optional strings and default to `None`.

    Specifying the region and cloud provider is only available for Enterprise
    and Team plan customers.

    Certain combinations of settings are not available. It is suggested to err
    on the side of looser settings rather than more restrictive ones to avoid
    pipeline execution failures. In the case of failures, however, Modal
    provides detailed error messages that can help identify what is
    incompatible. See more in the Modal docs at https://modal.com/docs/guide/region-selection.

    Attributes:
        gpu: The type of GPU to use for the step execution.
        region: The region to use for the step execution.
        cloud: The cloud provider to use for the step execution.
    """

    # Field order is kept as declared; each field is unset (`None`) unless
    # the user provides a value.
    gpu: Optional[str] = None
    region: Optional[str] = None
    cloud: Optional[str] = None

step_operators special

Modal step operator.

modal_step_operator

Modal step operator implementation.

ModalStepOperator (BaseStepOperator)

Step operator to run a step on Modal.

This class defines code that can set up a Modal environment and run functions in it.

Source code in zenml/integrations/modal/step_operators/modal_step_operator.py
class ModalStepOperator(BaseStepOperator):
    """Step operator to run a step on Modal.

    This class defines code that can set up a Modal environment and run
    functions in it.
    """

    @property
    def config(self) -> ModalStepOperatorConfig:
        """Get the Modal step operator configuration.

        Returns:
            The Modal step operator configuration.
        """
        return cast(ModalStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Get the settings class for the Modal step operator.

        Returns:
            The Modal step operator settings class.
        """
        return ModalStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Get the stack validator for the Modal step operator.

        The validator requires a container registry and an image builder in
        the stack, and additionally rejects stacks whose artifact store or
        container registry is local.

        Returns:
            The stack validator.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            """Check that artifact store and container registry are remote.

            Args:
                stack: The stack to validate.

            Returns:
                A `(is_valid, error_message)` tuple; the message is empty
                when the stack is valid.
            """
            if stack.artifact_store.config.is_local:
                return False, (
                    "The Modal step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Modal "
                    "step operator."
                )

            container_registry = stack.container_registry
            # The container registry is listed in `required_components`
            # below; the assert only narrows the Optional type.
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The Modal step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Modal step operator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Get the Docker build configurations for the Modal step operator.

        One build configuration is produced for every step of the deployment
        whose configured step operator is this component.

        Args:
            deployment: The pipeline deployment.

        Returns:
            A list of Docker build configurations.
        """
        return [
            BuildConfiguration(
                key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            for step_name, step in deployment.step_configurations.items()
            if step.config.step_operator == self.name
        ]

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launch a step run on Modal.

        The step's Docker image is pulled from the stack's container registry
        using the registry credentials, and the entrypoint command is run
        through `bash -c` inside a Modal sandbox. The call blocks until the
        sandbox has finished.

        Args:
            info: The step run information.
            entrypoint_command: The entrypoint command for the step.
            environment: The environment variables for the step.

        Raises:
            RuntimeError: If no Docker credentials are found for the container registry.
            ValueError: If no container registry is found in the stack.
        """
        import shlex

        settings = cast(ModalStepOperatorSettings, self.get_settings(info))
        image_name = info.get_image(key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY)
        zc = Client()
        stack = zc.active_stack

        if not stack.container_registry:
            raise ValueError(
                "No Container registry found in the stack. "
                "Please add a container registry and ensure "
                "it is correctly configured."
            )

        if docker_creds := stack.container_registry.credentials:
            docker_username, docker_password = docker_creds
        else:
            raise RuntimeError(
                "No Docker credentials found for the container registry."
            )

        # NOTE(review): `_Secret`, `_from_args` and `_ImageRegistryConfig`
        # are underscore-prefixed Modal names; confirm they are stable for
        # the pinned Modal version range.
        my_secret = modal.secret._Secret.from_dict(
            {
                "REGISTRY_USERNAME": docker_username,
                "REGISTRY_PASSWORD": docker_password,
            }
        )

        # A single-line Dockerfile that reuses the already built step image.
        spec = modal.image.DockerfileSpec(
            commands=[f"FROM {image_name}"], context_files={}
        )

        zenml_image = modal.Image._from_args(
            dockerfile_function=lambda *_, **__: spec,
            force_build=False,
            image_registry_config=modal.image._ImageRegistryConfig(
                api_pb2.REGISTRY_AUTH_TYPE_STATIC_CREDS, my_secret
            ),
        ).env(environment)

        resource_settings = info.config.resource_settings
        gpu_values = get_gpu_values(settings, resource_settings)

        app = modal.App(
            f"zenml-{info.run_name}-{info.step_run_id}-{info.pipeline_step_name}"
        )

        # The command is handed to `bash -c` as one string, so every argument
        # is shell-quoted; a plain " ".join would let bash re-split arguments
        # that contain whitespace or shell metacharacters.
        shell_command = shlex.join(entrypoint_command)

        async def run_sandbox() -> None:
            """Create the sandbox inside the app and wait for it to exit."""
            with modal.enable_output():
                async with app.run():
                    memory_mb = resource_settings.get_memory(ByteUnit.MB)
                    memory_int = (
                        int(memory_mb) if memory_mb is not None else None
                    )
                    sb = await modal.Sandbox.create.aio(
                        "bash",
                        "-c",
                        shell_command,
                        image=zenml_image,
                        gpu=gpu_values,
                        cpu=resource_settings.cpu_count,
                        memory=memory_int,
                        cloud=settings.cloud,
                        region=settings.region,
                        app=app,
                        timeout=86400,  # 24h, the max Modal allows
                    )

                    # NOTE(review): the sandbox's exit status is not
                    # inspected here -- confirm failures surface elsewhere.
                    await sb.wait.aio()

        asyncio.run(run_sandbox())
config: ModalStepOperatorConfig property readonly

Get the Modal step operator configuration.

Returns:

Type Description
ModalStepOperatorConfig

The Modal step operator configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Get the settings class for the Modal step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The Modal step operator settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Get the stack validator for the Modal step operator.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

The stack validator.

get_docker_builds(self, deployment)

Get the Docker build configurations for the Modal step operator.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment.

required

Returns:

Type Description
List[BuildConfiguration]

A list of Docker build configurations.

Source code in zenml/integrations/modal/step_operators/modal_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Collect the Docker builds needed by steps that use this operator.

    Args:
        deployment: The pipeline deployment.

    Returns:
        One build configuration per step whose configured step operator
        equals this component's name; an empty list if there is none.
    """
    return [
        BuildConfiguration(
            key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY,
            settings=step_config.config.docker_settings,
            step_name=name,
        )
        for name, step_config in deployment.step_configurations.items()
        if step_config.config.step_operator == self.name
    ]
launch(self, info, entrypoint_command, environment)

Launch a step run on Modal.

Parameters:

Name Type Description Default
info StepRunInfo

The step run information.

required
entrypoint_command List[str]

The entrypoint command for the step.

required
environment Dict[str, str]

The environment variables for the step.

required

Exceptions:

Type Description
RuntimeError

If no Docker credentials are found for the container registry.

ValueError

If no container registry is found in the stack.

Source code in zenml/integrations/modal/step_operators/modal_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launch a step run on Modal.

    The step's Docker image is pulled from the stack's container registry
    using the registry credentials, and the entrypoint command is run through
    `bash -c` inside a Modal sandbox. The call blocks until the sandbox has
    finished.

    Args:
        info: The step run information.
        entrypoint_command: The entrypoint command for the step.
        environment: The environment variables for the step.

    Raises:
        RuntimeError: If no Docker credentials are found for the container registry.
        ValueError: If no container registry is found in the stack.
    """
    import shlex

    settings = cast(ModalStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY)
    zc = Client()
    stack = zc.active_stack

    if not stack.container_registry:
        raise ValueError(
            "No Container registry found in the stack. "
            "Please add a container registry and ensure "
            "it is correctly configured."
        )

    if docker_creds := stack.container_registry.credentials:
        docker_username, docker_password = docker_creds
    else:
        raise RuntimeError(
            "No Docker credentials found for the container registry."
        )

    # NOTE(review): `_Secret`, `_from_args` and `_ImageRegistryConfig` are
    # underscore-prefixed Modal names; confirm they are stable for the pinned
    # Modal version range.
    my_secret = modal.secret._Secret.from_dict(
        {
            "REGISTRY_USERNAME": docker_username,
            "REGISTRY_PASSWORD": docker_password,
        }
    )

    # A single-line Dockerfile that reuses the already built step image.
    spec = modal.image.DockerfileSpec(
        commands=[f"FROM {image_name}"], context_files={}
    )

    zenml_image = modal.Image._from_args(
        dockerfile_function=lambda *_, **__: spec,
        force_build=False,
        image_registry_config=modal.image._ImageRegistryConfig(
            api_pb2.REGISTRY_AUTH_TYPE_STATIC_CREDS, my_secret
        ),
    ).env(environment)

    resource_settings = info.config.resource_settings
    gpu_values = get_gpu_values(settings, resource_settings)

    app = modal.App(
        f"zenml-{info.run_name}-{info.step_run_id}-{info.pipeline_step_name}"
    )

    # The command is handed to `bash -c` as one string, so every argument is
    # shell-quoted; a plain " ".join would let bash re-split arguments that
    # contain whitespace or shell metacharacters.
    shell_command = shlex.join(entrypoint_command)

    async def run_sandbox() -> None:
        """Create the sandbox inside the app and wait for it to exit."""
        with modal.enable_output():
            async with app.run():
                memory_mb = resource_settings.get_memory(ByteUnit.MB)
                memory_int = (
                    int(memory_mb) if memory_mb is not None else None
                )
                sb = await modal.Sandbox.create.aio(
                    "bash",
                    "-c",
                    shell_command,
                    image=zenml_image,
                    gpu=gpu_values,
                    cpu=resource_settings.cpu_count,
                    memory=memory_int,
                    cloud=settings.cloud,
                    region=settings.region,
                    app=app,
                    timeout=86400,  # 24h, the max Modal allows
                )

                # NOTE(review): the sandbox's exit status is not inspected
                # here -- confirm failures surface elsewhere.
                await sb.wait.aio()

    asyncio.run(run_sandbox())
get_gpu_values(settings, resource_settings)

Get the GPU values for the Modal step operator.

Parameters:

Name Type Description Default
settings ModalStepOperatorSettings

The Modal step operator settings.

required
resource_settings ResourceSettings

The resource settings.

required

Returns:

Type Description
Optional[str]

None if no GPU type is configured; otherwise the GPU type, suffixed with ":<count>" when a GPU count is specified.

Source code in zenml/integrations/modal/step_operators/modal_step_operator.py
def get_gpu_values(
    settings: ModalStepOperatorSettings, resource_settings: ResourceSettings
) -> Optional[str]:
    """Build the GPU specification string for the Modal step operator.

    Args:
        settings: The Modal step operator settings.
        resource_settings: The resource settings.

    Returns:
        None when no GPU type is set in the settings. Otherwise the GPU
        type, followed by ":<count>" when the resource settings carry a
        non-zero GPU count.
    """
    gpu_type = settings.gpu
    if gpu_type:
        # The count is only consulted once a GPU type is known.
        count = resource_settings.gpu_count
        if count:
            return f"{gpu_type}:{count}"
        return gpu_type
    return None