
Modal

zenml.integrations.modal

Modal integration for cloud-native pipeline and step execution.

The Modal integration sub-module provides stack component flavors that allow executing pipelines and individual steps on Modal's cloud infrastructure.

Attributes

MODAL = 'modal' module-attribute

MODAL_ORCHESTRATOR_FLAVOR = 'modal' module-attribute

MODAL_SANDBOX_FLAVOR = 'modal' module-attribute

MODAL_STEP_OPERATOR_FLAVOR = 'modal' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

display_name: Optional[str] property

The display name of the flavor.

By default, converts the technical name to a human-readable format. For example, "vm_kubernetes" becomes "VM Kubernetes". Flavors can override this to provide custom display names.

Returns:

Type Description
Optional[str]

The display name of the flavor.

docs_url: Optional[str] property

A URL pointing to the documentation that explains this flavor.

Returns:

Type Description
Optional[str]

The flavor docs URL.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A URL of the logo that represents the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A URL pointing to the SDK documentation that explains this flavor.

Returns:

Type Description
Optional[str]

The flavor SDK docs URL.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Resource requirements for service connectors that are compatible with this flavor.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Methods:
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
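
The error message for custom flavors points at the file where the flavor module is expected to live. The following is a minimal standalone sketch of that path computation, not the ZenML implementation itself: the function name `expected_custom_flavor_path`, the source string `my_pkg.flavors.MyFlavor`, and the root `/repo` are hypothetical, and the source root is passed in explicitly instead of being read from `source_utils.get_source_root()`.

```python
import os


def expected_custom_flavor_path(source: str, source_root: str) -> str:
    """Return the file path where a custom flavor's module is expected."""
    # Drop the class name: "my_pkg.flavors.MyFlavor" -> "my_pkg.flavors".
    module, _ = source.rsplit(".", maxsplit=1)
    # Turn the dotted module path into a file path under the source root.
    return os.path.join(source_root, module.replace(".", os.path.sep)) + ".py"


# Hypothetical custom flavor source and source root.
print(expected_custom_flavor_path("my_pkg.flavors.MyFlavor", "/repo"))
```

Like the original code, the sketch assumes the source string contains at least one dot separating the module path from the class name.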
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not useful for custom flavors, which have no pages in the main ZenML docs.

Returns:

Type Description
str

The complete URL to the ZenML documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    base = "https://docs.zenml.io"
    return f"{base}/stack-components/{component_type}/{name}"
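
The URL construction above can be reproduced without ZenML installed. This is a sketch under stated assumptions: the standalone function `default_docs_url` is hypothetical, and `"step_operators"` is assumed to be the plural form that `self.type.plural` would yield for a step operator.

```python
def default_docs_url(component_type_plural: str, flavor_name: str) -> str:
    """Build a docs URL the same way the method above does."""
    # Underscores in both path segments are replaced with hyphens.
    component_type = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")

    base = "https://docs.zenml.io"
    return f"{base}/stack-components/{component_type}/{name}"


# "step_operators" is an assumed plural value; "modal" is the flavor name.
print(default_docs_url("step_operators", "modal"))
# https://docs.zenml.io/stack-components/step-operators/modal
```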
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete URL to the ZenML SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        # Get the config class name to point to the specific class
        config_class_name = self.config_class.__name__

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
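
To make the two branches concrete, here is a standalone sketch of the same string handling. The function `default_sdk_docs_url` and its explicit parameters are hypothetical stand-ins for `self.__module__`, `self.config_class.__name__`, `self.type.plural`, and `zenml.__version__`; the module paths and the version `0.0.0` in the usage lines are illustrative values only.

```python
def default_sdk_docs_url(
    module: str,
    config_class_name: str,
    component_type_plural: str,
    version: str,
) -> str:
    """Build an SDK docs URL from a flavor's module path."""
    base = f"https://sdkdocs.zenml.io/{version}"

    if "zenml.integrations" in module:
        # "zenml.integrations.<integration>.<...>" -> "<integration>"
        integration = module.split("zenml.integrations.", maxsplit=1)[
            1
        ].split(".")[0]
        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )

    # Core flavors link to the module anchor on the component-type page.
    return f"{base}/core_code_docs/core-{component_type_plural}/#{module}"


# Illustrative inputs, not values read from a real installation.
print(
    default_sdk_docs_url(
        "zenml.integrations.modal.flavors.modal_step_operator_flavor",
        "ModalStepOperatorConfig",
        "step_operators",
        "0.0.0",
    )
)
```

Integration flavors are anchored on their config class, while core flavors are anchored on the module, which is why only the first branch needs the config class name.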
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        display_name=self.display_name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integrations in ZenML.

Methods:
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for requirement in cls.get_requirements():
        parsed_requirement = Requirement(requirement)

        if not requirement_installed(parsed_requirement):
            logger.debug(
                "Requirement '%s' for integration '%s' is not installed "
                "or installed with the wrong version.",
                requirement,
                cls.NAME,
            )
            return False

        dependencies = get_dependencies(parsed_requirement)

        for dependency in dependencies:
            if not requirement_installed(dependency):
                logger.debug(
                    "Requirement '%s' for integration '%s' is not "
                    "installed or installed with the wrong version.",
                    dependency,
                    cls.NAME,
                )
                return False

    logger.debug(
        f"Integration '{cls.NAME}' is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
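
The fail-fast shape of this check can be sketched with the standard library alone. This is a simplification, not ZenML's logic: the helpers `distribution_installed` and `all_installed` are hypothetical, only the distribution name is checked (version specifiers are ignored), and transitive dependencies are not inspected, unlike the `requirement_installed` and `get_dependencies` calls above.

```python
import re
from importlib import metadata
from typing import Callable, Iterable


def distribution_installed(requirement: str) -> bool:
    """Report whether the distribution named in a requirement is installed.

    Simplification: the version specifier (e.g. ">=1.0") is ignored.
    """
    match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", requirement.strip())
    if match is None:
        return False
    try:
        metadata.version(match.group(0))
    except metadata.PackageNotFoundError:
        return False
    return True


def all_installed(
    requirements: Iterable[str],
    is_installed: Callable[[str], bool] = distribution_installed,
) -> bool:
    """Return False at the first missing requirement, True otherwise."""
    for requirement in requirements:
        if not is_installed(requirement):
            return False
    return True


# An empty requirement list counts as installed.
print(all_installed([]))
```

Passing the predicate as a parameter keeps the loop testable without touching the local environment.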
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
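
The loop above is a prefix filter over the requirement strings. A compact sketch of the same behavior follows; the function name `uninstall_requirements` and the requirement strings in the usage line are hypothetical and do not reflect the Modal integration's actual requirements.

```python
from typing import List, Sequence


def uninstall_requirements(
    requirements: Sequence[str], ignored_prefixes: Sequence[str]
) -> List[str]:
    """Keep requirements that do not start with any ignored prefix."""
    # str.startswith accepts a tuple and matches if any prefix matches.
    prefixes = tuple(ignored_prefixes)
    return [req for req in requirements if not req.startswith(prefixes)]


# Hypothetical requirements: "pandas" is kept installed on uninstall.
print(uninstall_requirements(["modal>=1.0", "pandas>=2.0"], ["pandas"]))
# ['modal>=1.0']
```

Because the match is a plain string prefix, an ignored entry such as `pandas` also excludes a requirement like `pandas-stubs`.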

ModalIntegration

Bases: Integration

Definition of Modal integration for ZenML.

Methods:
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Modal integration.

Returns:

Type Description
List[Type[Flavor]]

List of new stack component flavors.

Source code in src/zenml/integrations/modal/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Modal integration.

    Returns:
        List of new stack component flavors.
    """
    from zenml.integrations.modal.flavors import (
        ModalOrchestratorFlavor,
        ModalSandboxFlavor,
        ModalStepOperatorFlavor,
    )

    return [
        ModalStepOperatorFlavor,
        ModalOrchestratorFlavor,
        ModalSandboxFlavor,
    ]

Modules

flavors

Modal integration flavors.

Classes
ModalOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, ModalOrchestratorSettings

Configuration for the Modal orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
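
The docstring describes secret references of the form `{{secret_name.key}}`. The sketch below shows one way to detect that shape; the regular expression and the standalone `is_secret_reference` function are assumptions for illustration and are not taken from `secret_utils`, whose actual pattern may differ.

```python
import re

# Assumed pattern: "{{", a secret name, ".", a key, "}}", with optional
# whitespace inside the braces.
_SECRET_REFERENCE = re.compile(r"\{\{\s*([^\s.{}]+)\.([^\s{}]+)\s*\}\}")


def is_secret_reference(value: object) -> bool:
    """Return True if the value looks like `{{secret_name.key}}`."""
    return (
        isinstance(value, str)
        and _SECRET_REFERENCE.fullmatch(value) is not None
    )


print(is_secret_reference("{{modal_secret.token_id}}"))  # True
print(is_secret_reference("ak-12345"))  # False
```

A field validator that expects a real value, for example one that checks a prefix, would run against the literal `{{...}}` string and reject or alter it, which is why the constructor refuses secret references on validated fields.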
Attributes
handles_step_retries: bool property

Whether the orchestrator handles step retries internally.

is_local: bool property

Checks if this stack component is running locally.

is_remote: bool property

Checks if this stack component is running remotely.

is_schedulable: bool property

Whether the orchestrator supports schedules.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client-side caching.

Methods:
validate_modal_token_pair() -> ModalOrchestratorConfig

Validate that Modal token fields are configured together.

Source code in src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py
@model_validator(mode="after")
def validate_modal_token_pair(self) -> "ModalOrchestratorConfig":
    """Validate that Modal token fields are configured together."""
    token_id = self.token_id.strip() if self.token_id else None
    token_secret = self.token_secret.strip() if self.token_secret else None

    if bool(token_id) != bool(token_secret):
        raise ValueError(
            "Modal token_id and token_secret must be configured together."
        )

    return self
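
The pairing rule can be exercised outside of pydantic. This sketch mirrors the validator's logic as a plain function; the name `validate_token_pair` as a free function and the token values are illustrative only.

```python
from typing import Optional


def validate_token_pair(
    token_id: Optional[str], token_secret: Optional[str]
) -> None:
    """Raise ValueError unless both tokens are set or both are unset."""
    # Whitespace-only strings are treated the same as missing values.
    token_id = token_id.strip() if token_id else None
    token_secret = token_secret.strip() if token_secret else None

    if bool(token_id) != bool(token_secret):
        raise ValueError(
            "Modal token_id and token_secret must be configured together."
        )


validate_token_pair(None, None)  # OK: neither token is set
validate_token_pair("ak-example", "as-example")  # OK: both are set

try:
    validate_token_pair("ak-example", "   ")  # secret is only whitespace
except ValueError as err:
    print(err)
```

Comparing the two truth values with `!=` acts as an exclusive-or, so the check fails exactly when one token is present and the other is not.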
ModalOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Modal orchestrator flavor.

Attributes
config_class: Type[ModalOrchestratorConfig] property

Returns the Modal orchestrator config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

implementation_class: Type[ModalOrchestrator] property

Implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

name: str property

Name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

ModalOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for Modal sandboxes created by the orchestrator.

These settings can apply to the sandbox that controls the run and to child sandboxes that execute individual steps.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
ModalSandboxConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSandboxConfig, ModalSandboxSettings, ModalStepOperatorConfig

Configuration for the Modal sandbox component.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
ModalSandboxFlavor

Bases: BaseSandboxFlavor

Modal sandbox flavor.

Attributes
config_class: Type[ModalSandboxConfig] property

Config class.

Returns:

Type Description
Type[ModalSandboxConfig]

ModalSandboxConfig.

docs_url: Optional[str] property

URL to user-facing docs for this flavor.

Returns:

Type Description
Optional[str]

The flavor docs URL.

implementation_class: Type[ModalSandbox] property

Implementation class.

Returns:

Type Description
Type[ModalSandbox]

ModalSandbox.

logo_url: str property

Dashboard logo URL.

Returns:

Type Description
str

The flavor logo URL.

name: str property

Flavor name.

Returns:

Type Description
str

"modal".

sdk_docs_url: Optional[str] property

URL to SDK docs for this flavor.

Returns:

Type Description
Optional[str]

The flavor SDK docs URL.

ModalSandboxSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSandboxSettings, ModalStepOperatorSettings

Per-step settings for the Modal sandbox.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
ModalStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, ModalStepOperatorSettings

Configuration for the Modal step operator.

Attributes:

Name Type Description
token_id Optional[str]

Modal API token ID (ak-xxxxx format) for authentication.

token_secret Optional[str]

Modal API token secret (as-xxxxx format) for authentication.

Note: If token_id and token_secret are provided, both fields must be set. If they are not provided, Modal falls back to its default authentication (~/.modal.toml or environment variables). All other configuration options (modal_environment, gpu, region, etc.) are inherited from ModalStepOperatorSettings.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

Methods:
validate_modal_token_pair() -> ModalStepOperatorConfig

Validate that Modal token fields are configured together.

Source code in src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py
@model_validator(mode="after")
def validate_modal_token_pair(self) -> "ModalStepOperatorConfig":
    """Validate that Modal token fields are configured together."""
    token_id = self.token_id.strip() if self.token_id else None
    token_secret = self.token_secret.strip() if self.token_secret else None

    if bool(token_id) != bool(token_secret):
        raise ValueError(
            "Modal token_id and token_secret must be configured together."
        )

    return self
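
The pairing rule above can be tried out without ZenML or pydantic. The following is a minimal sketch only: `TokenPairSketch` and `is_valid_pair` are hypothetical names introduced for illustration, and a plain dataclass stands in for the pydantic model. Only the check itself mirrors the validator shown above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenPairSketch:
    """Hypothetical stand-in for a config with paired Modal token fields."""

    token_id: Optional[str] = None
    token_secret: Optional[str] = None

    def __post_init__(self) -> None:
        # Whitespace-only strings are treated like missing values.
        token_id = self.token_id.strip() if self.token_id else None
        token_secret = self.token_secret.strip() if self.token_secret else None

        # Comparing truthiness rejects the case where exactly one is set.
        if bool(token_id) != bool(token_secret):
            raise ValueError(
                "Modal token_id and token_secret must be configured together."
            )


def is_valid_pair(token_id: Optional[str], token_secret: Optional[str]) -> bool:
    """Return True if the pair passes the check, False if it is rejected."""
    try:
        TokenPairSketch(token_id=token_id, token_secret=token_secret)
    except ValueError:
        return False
    return True
```

Under this sketch, leaving both fields unset is accepted (Modal's default authentication would then apply), while setting only one of them, or setting one to a whitespace-only string, is rejected.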
ModalStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Modal step operator flavor.

Attributes
config_class: Type[ModalStepOperatorConfig] property

Returns ModalStepOperatorConfig config class.

Returns:

Type Description
Type[ModalStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A URL pointing to the docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor docs URL.

implementation_class: Type[ModalStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[ModalStepOperator]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs URL.

ModalStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Modal step operator.

Specifying the region and cloud provider is only available for Enterprise and Team plan customers.

Certain combinations of settings are not available. Prefer looser settings over more restrictive ones to avoid pipeline execution failures. If a failure does occur, Modal provides detailed error messages that help identify the incompatible combination. See the Modal docs at https://modal.com/docs/guide/region-selection.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
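
The constructor above rejects secret references on fields that run custom validation. The following is a minimal, standalone sketch of that idea. The regular expression is an assumption about the `{{secret_name.key}}` form described in the docstrings, not ZenML's actual `secret_utils.is_secret_reference` implementation, and both function names are hypothetical.

```python
import re
from typing import Any, Dict, List, Set

# Assumed shape of a secret reference: `{{secret_name.key}}`.
# ZenML's real check lives in `secret_utils` and may differ.
_SECRET_REF = re.compile(r"\{\{\s*[^.{}\s]+\.[^.{}\s]+\s*\}\}")


def looks_like_secret_reference(value: Any) -> bool:
    """Return True if `value` is a string of the assumed reference form."""
    return isinstance(value, str) and _SECRET_REF.fullmatch(value) is not None


def find_disallowed_references(
    values: Dict[str, Any], validated_fields: Set[str]
) -> List[str]:
    """List fields that carry a secret reference but need validation.

    `validated_fields` is a hypothetical stand-in for the result of
    checking each field for custom pydantic validators.
    """
    return sorted(
        key
        for key, value in values.items()
        if value is not None
        and key in validated_fields
        and looks_like_secret_reference(value)
    )
```

The point of the check is ordering: a validator would otherwise run on the literal reference string rather than on the resolved secret, so such fields are refused before pydantic validation starts.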
Modules
modal_orchestrator_flavor

Modal orchestrator flavor.

Classes
ModalOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, ModalOrchestratorSettings

Configuration for the Modal orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
handles_step_retries: bool property

Whether the orchestrator handles step retries internally.

is_local: bool property

Checks if this stack component is running locally.

is_remote: bool property

Checks if this stack component is running remotely.

is_schedulable: bool property

Whether the orchestrator supports schedules.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client-side caching.

Methods:
validate_modal_token_pair() -> ModalOrchestratorConfig

Validate that Modal token fields are configured together.

Source code in src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py
@model_validator(mode="after")
def validate_modal_token_pair(self) -> "ModalOrchestratorConfig":
    """Validate that Modal token fields are configured together."""
    token_id = self.token_id.strip() if self.token_id else None
    token_secret = self.token_secret.strip() if self.token_secret else None

    if bool(token_id) != bool(token_secret):
        raise ValueError(
            "Modal token_id and token_secret must be configured together."
        )

    return self
ModalOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Modal orchestrator flavor.

Attributes
config_class: Type[ModalOrchestratorConfig] property

Returns the Modal orchestrator config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

implementation_class: Type[ModalOrchestrator] property

Implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

name: str property

Name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

ModalOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for Modal sandboxes created by the orchestrator.

These settings can apply to the sandbox that controls the run and to child sandboxes that execute individual steps.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
modal_sandbox_flavor

Modal sandbox flavor.

Classes
ModalSandboxConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSandboxConfig, ModalSandboxSettings, ModalStepOperatorConfig

Configuration for the Modal sandbox component.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
ModalSandboxFlavor

Bases: BaseSandboxFlavor

Modal sandbox flavor.

Attributes
config_class: Type[ModalSandboxConfig] property

Config class.

Returns:

Type Description
Type[ModalSandboxConfig]

ModalSandboxConfig.

docs_url: Optional[str] property

URL to user-facing docs for this flavor.

Returns:

Type Description
Optional[str]

The flavor docs URL.

implementation_class: Type[ModalSandbox] property

Implementation class.

Returns:

Type Description
Type[ModalSandbox]

ModalSandbox.

logo_url: str property

Dashboard logo URL.

Returns:

Type Description
str

The flavor logo URL.

name: str property

Flavor name.

Returns:

Type Description
str

"modal".

sdk_docs_url: Optional[str] property

URL to SDK docs for this flavor.

Returns:

Type Description
Optional[str]

The flavor SDK docs URL.

ModalSandboxSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSandboxSettings, ModalStepOperatorSettings

Per-step settings for the Modal sandbox.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
modal_step_operator_flavor

Modal step operator flavor.

Classes
ModalStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, ModalStepOperatorSettings

Configuration for the Modal step operator.

Attributes:

Name Type Description
token_id Optional[str]

Modal API token ID (ak-xxxxx format) for authentication.

token_secret Optional[str]

Modal API token secret (as-xxxxx format) for authentication.

Note: If either token_id or token_secret is provided, both must be set. If neither is provided, Modal falls back to its default authentication (~/.modal.toml or environment variables). All other configuration options (modal_environment, gpu, region, etc.) are inherited from ModalStepOperatorSettings.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

Methods:
validate_modal_token_pair() -> ModalStepOperatorConfig

Validate that Modal token fields are configured together.

Source code in src/zenml/integrations/modal/flavors/modal_step_operator_flavor.py
@model_validator(mode="after")
def validate_modal_token_pair(self) -> "ModalStepOperatorConfig":
    """Validate that Modal token fields are configured together."""
    token_id = self.token_id.strip() if self.token_id else None
    token_secret = self.token_secret.strip() if self.token_secret else None

    if bool(token_id) != bool(token_secret):
        raise ValueError(
            "Modal token_id and token_secret must be configured together."
        )

    return self
ModalStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Modal step operator flavor.

Attributes
config_class: Type[ModalStepOperatorConfig] property

Returns ModalStepOperatorConfig config class.

Returns:

Type Description
Type[ModalStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A URL pointing to the docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor docs URL.

implementation_class: Type[ModalStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[ModalStepOperator]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs URL.

ModalStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Modal step operator.

Specifying the region and cloud provider is only available for Enterprise and Team plan customers.

Certain combinations of settings are not available. Prefer looser settings over more restrictive ones to avoid pipeline execution failures. If a failure does occur, Modal provides detailed error messages that help identify the incompatible combination. See the Modal docs at https://modal.com/docs/guide/region-selection.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)

orchestrators

Modal orchestrator implementation package.

Classes
ModalOrchestrator(*args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator that runs ZenML pipelines in Modal Sandboxes.

Initialize the Modal orchestrator.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal orchestrator."""
    super().__init__(*args, **kwargs)
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
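
The constructor above only declares an empty client slot and a lock; how the orchestrator fills the slot is not shown in this section. One common pattern such a pair supports is lazy, thread-safe initialization, sketched below. `LazyClientHolder` and its `factory` argument are hypothetical names, and this is an illustration of the pattern rather than the orchestrator's actual code.

```python
from threading import Lock
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyClientHolder(Generic[T]):
    """Hypothetical holder that creates a client once, on first use."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._client: Optional[T] = None
        self._lock = Lock()

    def get(self) -> T:
        # Fast path: the client already exists, no locking needed.
        client = self._client
        if client is not None:
            return client

        # Slow path: take the lock and re-check, so that concurrent
        # callers do not each create their own client.
        with self._lock:
            if self._client is None:
                self._client = self._factory()
            return self._client
```

With this shape, repeated calls to `get()` return the same object and the factory runs at most once, even when several threads request the client at the same time.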
Attributes
config: ModalOrchestratorConfig property

Get the Modal orchestrator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the Modal orchestrator.

supported_execution_modes: List[ExecutionMode] property

Get the execution modes supported by the Modal orchestrator.

validator: Optional[StackValidator] property

Validate that the active stack can run in remote Modal Sandboxes.

Methods:
create_static_step_sandbox(*, snapshot: PipelineSnapshotResponse, step_name: str, environment: Dict[str, str]) -> modal.Sandbox

Create a child sandbox for a static pipeline step.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def create_static_step_sandbox(
    self,
    *,
    snapshot: "PipelineSnapshotResponse",
    step_name: str,
    environment: Dict[str, str],
) -> "modal.Sandbox":
    """Create a child sandbox for a static pipeline step."""
    settings = cast(
        ModalOrchestratorSettings,
        self.get_settings(snapshot.step_configurations[step_name]),
    )
    command = StepEntrypointConfiguration.get_entrypoint_command()
    args = StepEntrypointConfiguration.get_entrypoint_arguments(
        step_name=step_name,
        snapshot_id=snapshot.id,
    )
    image_name = self.get_image(snapshot=snapshot, step_name=step_name)
    return self._create_step_sandbox(
        app_name=os.environ.get(
            ENV_ZENML_MODAL_APP_NAME,
            get_modal_app_name(self.get_orchestrator_run_id()),
        ),
        image_name=image_name,
        settings=settings,
        resource_settings=snapshot.step_configurations[
            step_name
        ].config.resource_settings,
        environment=environment,
        entrypoint_command=command + args,
    )
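One detail of the `app_name` lookup above is easy to miss: Python evaluates the default argument of `os.environ.get` before the lookup happens, so `get_orchestrator_run_id()` runs (and can raise) even when the app-name variable is set. The following is a minimal sketch of that behavior, not the orchestrator's code; the variable names `EXAMPLE_MODAL_APP_NAME` and `EXAMPLE_MODAL_RUN_ID` are placeholders, since the real values of `ENV_ZENML_MODAL_APP_NAME` and `ENV_ZENML_MODAL_RUN_ID` are not shown in this reference.

```python
from typing import Mapping

# Placeholder names; the real constants' values are not shown in this reference.
APP_NAME_VAR = "EXAMPLE_MODAL_APP_NAME"
RUN_ID_VAR = "EXAMPLE_MODAL_RUN_ID"


def run_id_from(env: Mapping[str, str]) -> str:
    """Mimic get_orchestrator_run_id(): fail loudly if the variable is missing."""
    try:
        return env[RUN_ID_VAR]
    except KeyError as e:
        raise RuntimeError(f"{RUN_ID_VAR} is not set.") from e


def resolve_app_name(env: Mapping[str, str]) -> str:
    """Prefer the explicit app name, else derive one from the run ID."""
    # The second argument is evaluated eagerly, before .get() looks up the key.
    return env.get(APP_NAME_VAR, f"zenml-{run_id_from(env)}"[:64])


print(resolve_app_name({APP_NAME_VAR: "my-app", RUN_ID_VAR: "abc"}))  # my-app
print(resolve_app_name({RUN_ID_VAR: "abc"}))  # zenml-abc

try:
    # The app name is present, but the eager default still needs the run ID.
    resolve_app_name({APP_NAME_VAR: "my-app"})
except RuntimeError as err:
    print(f"raised: {err}")
```

In practice this only matters when the method is called outside a sandbox that carries the run ID variable.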
fetch_status(run: PipelineRunResponse, include_steps: bool = False) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

Fetch Modal sandbox status for a pipeline run and its steps.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def fetch_status(
    self, run: "PipelineRunResponse", include_steps: bool = False
) -> Tuple[
    Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]
]:
    """Fetch Modal sandbox status for a pipeline run and its steps."""
    pipeline_status = None
    sandbox_id = _metadata_value(
        run.run_metadata, MODAL_ORCHESTRATION_SANDBOX_ID_METADATA_KEY
    )
    if sandbox_id and not run.status.is_finished:
        try:
            status = sandbox_utils.get_sandbox_status(
                sandbox_id, modal_client=self.get_modal_client()
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch Modal orchestration sandbox `%s`: %s",
                sandbox_id,
                e,
            )
        else:
            pipeline_status = status
    elif not sandbox_id:
        logger.warning(
            "No Modal orchestration sandbox metadata found for run `%s`.",
            run.id,
        )

    step_statuses = None
    if include_steps:
        step_statuses = {}
        run_with_steps = self._get_fresh_pipeline_run(run)
        try:
            steps = run_with_steps.steps
        except Exception:
            logger.debug(
                "Pipeline run `%s` is not hydrated with step metadata.",
                run.id,
                exc_info=True,
            )
            steps = {}

        fallback_sandbox_ids = _metadata_values_with_prefix(
            run_with_steps.run_metadata,
            MODAL_STATIC_STEP_SANDBOX_ID_METADATA_KEY_PREFIX,
        )

        for step_name, step_run in steps.items():
            if step_run.status.is_finished:
                continue

            step_sandbox_id = _metadata_value(
                step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
            )
            if not step_sandbox_id:
                step_sandbox_id = fallback_sandbox_ids.get(step_name)
            if not step_sandbox_id:
                continue

            try:
                status = sandbox_utils.get_sandbox_status(
                    step_sandbox_id,
                    modal_client=self.get_modal_client(),
                )
                step_statuses[step_name] = status
            except Exception as e:
                logger.warning(
                    "Failed to fetch Modal sandbox `%s` for step `%s`: %s",
                    step_sandbox_id,
                    step_name,
                    e,
                )

    return pipeline_status, step_statuses
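The per-step lookup order in the loop above (step-level metadata first, then the run-level fallback, otherwise skip the step) can be sketched with plain dictionaries. This is an illustration only: `SANDBOX_ID_KEY` is a placeholder for `MODAL_SANDBOX_ID_METADATA_KEY`, whose real value is not shown here, and `resolve_step_sandbox_id` is a hypothetical helper, not part of ZenML.

```python
from typing import Dict, Optional

# Placeholder key; the real MODAL_SANDBOX_ID_METADATA_KEY value is not shown here.
SANDBOX_ID_KEY = "example_sandbox_id"


def resolve_step_sandbox_id(
    step_name: str,
    step_metadata: Dict[str, str],
    fallback_sandbox_ids: Dict[str, str],
) -> Optional[str]:
    """Return the step's sandbox ID, preferring step-level metadata."""
    return step_metadata.get(SANDBOX_ID_KEY) or fallback_sandbox_ids.get(step_name)


fallback = {"train": "sb-run-level-train"}

print(resolve_step_sandbox_id("train", {SANDBOX_ID_KEY: "sb-step"}, fallback))
# sb-step
print(resolve_step_sandbox_id("train", {}, fallback))
# sb-run-level-train
print(resolve_step_sandbox_id("evaluate", {}, fallback))
# None
```

A `None` result corresponds to the `continue` branch: the step is left out of the returned status dictionary rather than reported as failed.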
get_isolated_step_status(step_run: StepRunResponse) -> ExecutionStatus

Get the status of a dynamic isolated step sandbox.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_isolated_step_status(
    self, step_run: "StepRunResponse"
) -> ExecutionStatus:
    """Get the status of a dynamic isolated step sandbox."""
    sandbox_id = _metadata_value(
        step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
    )
    if not sandbox_id:
        logger.warning(
            "No Modal sandbox metadata found for step run `%s`.",
            step_run.id,
        )
        return step_run.status

    try:
        return sandbox_utils.get_sandbox_status(
            sandbox_id, modal_client=self.get_modal_client()
        )
    except Exception as e:
        logger.warning(
            "Failed to fetch Modal sandbox `%s` for step run `%s`: %s",
            sandbox_id,
            step_run.id,
            e,
        )
        return ExecutionStatus.FAILED
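The three branches above behave differently from `fetch_status`: a lookup error here is reported as a failed step instead of being skipped. A minimal sketch of that branching follows; `Status` is a stand-in for ZenML's `ExecutionStatus`, and `isolated_step_status` and `broken_fetch` are hypothetical names used only for illustration.

```python
from enum import Enum
from typing import Callable, Optional


class Status(Enum):
    """Stand-in for ZenML's ExecutionStatus with only the values used here."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def isolated_step_status(
    sandbox_id: Optional[str],
    recorded_status: Status,
    fetch: Callable[[str], Status],
) -> Status:
    """Mirror the three branches of get_isolated_step_status()."""
    if not sandbox_id:
        return recorded_status  # nothing to query; keep the recorded status
    try:
        return fetch(sandbox_id)
    except Exception:
        return Status.FAILED  # lookup errors are reported as a failed step


def broken_fetch(sandbox_id: str) -> Status:
    raise ConnectionError("Modal API unreachable")


print(isolated_step_status(None, Status.RUNNING, broken_fetch))
# Status.RUNNING
print(isolated_step_status("sb-1", Status.RUNNING, lambda _: Status.COMPLETED))
# Status.COMPLETED
print(isolated_step_status("sb-1", Status.RUNNING, broken_fetch))
# Status.FAILED
```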
get_modal_client() -> Optional[modal.Client]

Get the Modal client used by orchestrator entrypoints.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_modal_client(self) -> Optional["modal.Client"]:
    """Get the Modal client used by orchestrator entrypoints."""
    if (
        self._modal_client is not None
        and not self._modal_client.is_closed()
    ):
        return self._modal_client

    with self._modal_client_lock:
        if (
            self._modal_client is not None
            and not self._modal_client.is_closed()
        ):
            return self._modal_client

        self._modal_client = (
            sandbox_utils.create_modal_client_from_credentials(
                token_id=self.config.token_id,
                token_secret=self.config.token_secret,
            )
        )
        return self._modal_client
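The method above follows a double-checked locking pattern: a lock-free fast path, then a second check under the lock so that only one thread builds a new client. The sketch below reproduces the pattern in isolation. `FakeClient` and `ClientCache` are hypothetical stand-ins; no real Modal client or credentials are involved.

```python
import threading
from typing import Optional


class FakeClient:
    """Stand-in for modal.Client; only the is_closed() check is modelled."""

    def __init__(self) -> None:
        self._closed = False

    def is_closed(self) -> bool:
        return self._closed

    def close(self) -> None:
        self._closed = True


class ClientCache:
    """Cache one client and recreate it lazily once it has been closed."""

    def __init__(self) -> None:
        self._client: Optional[FakeClient] = None
        self._lock = threading.Lock()
        self.created = 0  # counts how many clients were built

    def _usable(self) -> bool:
        return self._client is not None and not self._client.is_closed()

    def get(self) -> FakeClient:
        if self._usable():  # fast path without taking the lock
            return self._client
        with self._lock:
            if not self._usable():  # re-check: another thread may have won
                self._client = FakeClient()
                self.created += 1
            return self._client


cache = ClientCache()
threads = [threading.Thread(target=cache.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache.created)  # 1

cache.get().close()  # a closed client is replaced on the next call
cache.get()
print(cache.created)  # 2
```

The second check inside the lock is what keeps concurrent callers from each creating their own client.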
get_orchestrator_run_id() -> str

Return the stable Modal run ID from the sandbox environment.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Return the stable Modal run ID from the sandbox environment."""
    try:
        return os.environ[ENV_ZENML_MODAL_RUN_ID]
    except KeyError as e:
        raise RuntimeError(
            f"Unable to get Modal orchestrator run ID from the "
            f"{ENV_ZENML_MODAL_RUN_ID} environment variable."
        ) from e
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get run metadata from the Modal sandbox environment.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, MetadataType]:
    """Get run metadata from the Modal sandbox environment."""
    metadata: Dict[str, MetadataType] = {
        METADATA_ORCHESTRATOR_RUN_ID: self.get_orchestrator_run_id()
    }
    if app_name := os.environ.get(ENV_ZENML_MODAL_APP_NAME):
        metadata[MODAL_APP_NAME_METADATA_KEY] = app_name
    return metadata
get_step_sandbox_metadata(settings: ModalOrchestratorSettings, sandbox_id: str) -> Dict[str, MetadataType] staticmethod

Build step sandbox metadata for orchestrator entrypoints.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
@staticmethod
def get_step_sandbox_metadata(
    settings: ModalOrchestratorSettings, sandbox_id: str
) -> Dict[str, MetadataType]:
    """Build step sandbox metadata for orchestrator entrypoints."""
    metadata: Dict[str, MetadataType] = {
        MODAL_SANDBOX_ID_METADATA_KEY: sandbox_id
    }
    modal_environment = sandbox_utils.normalize_optional_config_value(
        settings.modal_environment
    )
    if modal_environment:
        metadata[MODAL_ENVIRONMENT_METADATA_KEY] = modal_environment
    return metadata
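A short sketch of the resulting metadata shape, under stated assumptions: the behavior of `sandbox_utils.normalize_optional_config_value` is not shown in this reference, so `normalize_optional` below is a hypothetical stand-in that treats blank strings as unset, and both key names are placeholders for the real metadata key constants.

```python
from typing import Dict, Optional

# Placeholder keys; the real metadata key constants are not shown here.
SANDBOX_ID_KEY = "example_sandbox_id"
ENVIRONMENT_KEY = "example_environment"


def normalize_optional(value: Optional[str]) -> Optional[str]:
    """Hypothetical normalizer: blank or whitespace-only values become None."""
    if value is None:
        return None
    stripped = value.strip()
    return stripped or None


def step_sandbox_metadata(
    sandbox_id: str, modal_environment: Optional[str]
) -> Dict[str, str]:
    """Always record the sandbox ID; add the environment only when set."""
    metadata = {SANDBOX_ID_KEY: sandbox_id}
    environment = normalize_optional(modal_environment)
    if environment:
        metadata[ENVIRONMENT_KEY] = environment
    return metadata


print(step_sandbox_metadata("sb-123", "  "))
# {'example_sandbox_id': 'sb-123'}
print(step_sandbox_metadata("sb-123", "staging"))
# {'example_sandbox_id': 'sb-123', 'example_environment': 'staging'}
```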
stop_isolated_step(step_run: StepRunResponse) -> None

Stop a dynamic isolated step sandbox if it is still running.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def stop_isolated_step(self, step_run: "StepRunResponse") -> None:
    """Stop a dynamic isolated step sandbox if it is still running."""
    sandbox_id = _metadata_value(
        step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
    )
    if not sandbox_id:
        logger.warning(
            "No Modal sandbox metadata found for step run `%s`.",
            step_run.id,
        )
        return

    self.terminate_sandbox_if_running(
        sandbox_id=sandbox_id,
        description=f"step run `{step_run.id}`",
    )
submit_dynamic_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a dynamic pipeline orchestration sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_dynamic_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a dynamic pipeline orchestration sandbox to Modal."""
    if snapshot.schedule:
        raise RuntimeError(
            "Scheduling dynamic pipelines is not supported for the Modal "
            "orchestrator yet."
        )

    command = (
        DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
    )
    args = DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
        snapshot_id=snapshot.id,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    return self._submit_orchestration_sandbox(
        snapshot=snapshot,
        stack=stack,
        entrypoint_command=command + args,
        environment=environment,
        placeholder_run=placeholder_run,
    )
submit_isolated_step(step_run_info: StepRunInfo, environment: Dict[str, str]) -> None

Submit a dynamic isolated step sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_isolated_step(
    self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
    """Submit a dynamic isolated step sandbox to Modal."""
    command, args = orchestrator_utils.get_step_entrypoint_command(
        invocation_id=step_run_info.pipeline_step_name,
        config=step_run_info.config,
        entrypoint_config_class=StepOperatorEntrypointConfiguration,
        snapshot_id=step_run_info.snapshot.id,
        step_run_id=str(step_run_info.step_run_id),
    )

    image_name = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY)
    settings = cast(
        ModalOrchestratorSettings, self.get_settings(step_run_info)
    )
    modal_run_id = os.environ.get(
        ENV_ZENML_MODAL_RUN_ID, str(step_run_info.run_id)
    )
    app_name = os.environ.get(
        ENV_ZENML_MODAL_APP_NAME, get_modal_app_name(modal_run_id)
    )
    sandbox_environment = environment.copy()
    sandbox_environment[ENV_ZENML_MODAL_RUN_ID] = modal_run_id
    sandbox_environment[ENV_ZENML_MODAL_APP_NAME] = app_name

    sandbox = self._create_step_sandbox(
        app_name=app_name,
        image_name=image_name,
        settings=settings,
        resource_settings=step_run_info.config.resource_settings,
        environment=sandbox_environment,
        entrypoint_command=command + args,
    )

    metadata = self.get_step_sandbox_metadata(settings, sandbox.object_id)
    try:
        publish_step_run_metadata(
            step_run_info.step_run_id,
            {self.id: metadata},
        )
        step_run_info.step_run.run_metadata.update(metadata)
    except Exception:
        self._terminate_created_sandbox_after_metadata_error(sandbox)
        raise
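The final `try`/`except` above is a create-then-register pattern: the sandbox is started first, and if recording its ID fails, the sandbox is cleaned up before the error propagates. Below is a minimal sketch of that idea. `FakeSandbox`, `submit_with_cleanup`, and the metadata key are hypothetical, and it is an assumption that `_terminate_created_sandbox_after_metadata_error` terminates the sandbox, as its name suggests.

```python
from typing import Callable, Dict, List


class FakeSandbox:
    """Stand-in for a created sandbox; records whether it was terminated."""

    def __init__(self, object_id: str) -> None:
        self.object_id = object_id
        self.terminated = False

    def terminate(self) -> None:
        self.terminated = True


def submit_with_cleanup(
    create: Callable[[], FakeSandbox],
    publish: Callable[[Dict[str, str]], None],
) -> FakeSandbox:
    """Create a sandbox, then publish its ID; undo the creation on failure."""
    sandbox = create()
    try:
        publish({"sandbox_id": sandbox.object_id})
    except Exception:
        sandbox.terminate()  # avoid leaving an untracked sandbox running
        raise
    return sandbox


created: List[FakeSandbox] = []


def create() -> FakeSandbox:
    created.append(FakeSandbox("sb-1"))
    return created[-1]


def failing_publish(metadata: Dict[str, str]) -> None:
    raise RuntimeError("metadata store unavailable")


try:
    submit_with_cleanup(create, failing_publish)
except RuntimeError as err:
    print(f"submission failed: {err}")
print(created[0].terminated)  # True
```

Without the cleanup step, a sandbox whose ID was never recorded could keep running with no way for later status or stop calls to find it.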
submit_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, base_environment: Dict[str, str], step_environments: Dict[str, Dict[str, str]], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a static pipeline controller sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a static pipeline controller sandbox to Modal."""
    if snapshot.schedule:
        raise RuntimeError(
            "Scheduling static pipelines is not supported for the Modal "
            "orchestrator yet."
        )

    from zenml.integrations.modal.orchestrators.modal_orchestrator_entrypoint import (
        ModalOrchestratorEntrypointConfiguration,
    )

    # The static Modal controller recomputes per-step environments after
    # it loads the active stack and run-scoped secrets inside Modal.
    del step_environments

    command = (
        ModalOrchestratorEntrypointConfiguration.get_entrypoint_command()
    )
    args = (
        ModalOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            snapshot_id=snapshot.id,
            run_id=placeholder_run.id if placeholder_run else None,
        )
    )

    return self._submit_orchestration_sandbox(
        snapshot=snapshot,
        stack=stack,
        entrypoint_command=command + args,
        environment=base_environment,
        placeholder_run=placeholder_run,
    )
terminate_sandbox_if_running(*, sandbox_id: str, description: str) -> None

Terminate a running Modal sandbox for an orchestrator entrypoint.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def terminate_sandbox_if_running(
    self, *, sandbox_id: str, description: str
) -> None:
    """Terminate a running Modal sandbox for an orchestrator entrypoint."""
    sandbox = sandbox_utils.get_sandbox_by_id(
        sandbox_id, modal_client=self.get_modal_client()
    )
    if sandbox.poll() is not None:
        logger.debug(
            "Modal sandbox for %s is already finished.", description
        )
        return

    sandbox.terminate()
    logger.info("Terminated Modal sandbox for %s.", description)
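The check above relies on `poll()` returning `None` while a sandbox is still running and an exit code once it has finished. The sketch below shows why the comparison is `is not None` instead of a truthiness test: an exit code of `0` is falsy but still means "finished". `FakeSandbox` and `terminate_if_running` are hypothetical stand-ins, not Modal or ZenML objects.

```python
from typing import Optional


class FakeSandbox:
    """Stand-in for modal.Sandbox with just poll() and terminate()."""

    def __init__(self, exit_code: Optional[int]) -> None:
        self._exit_code = exit_code
        self.terminate_calls = 0

    def poll(self) -> Optional[int]:
        # None means "still running"; an int is the exit code.
        return self._exit_code

    def terminate(self) -> None:
        self.terminate_calls += 1
        self._exit_code = -1  # arbitrary marker for this sketch


def terminate_if_running(sandbox: FakeSandbox) -> bool:
    """Terminate only a running sandbox; return True if a call was made."""
    if sandbox.poll() is not None:
        return False
    sandbox.terminate()
    return True


running, finished = FakeSandbox(None), FakeSandbox(0)
print(terminate_if_running(running))   # True
print(terminate_if_running(finished))  # False
print(terminate_if_running(running))   # False
```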
Modules
modal_orchestrator

Modal orchestrator implementation.

Classes
ModalOrchestrator(*args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator that runs ZenML pipelines in Modal Sandboxes.

Initialize the Modal orchestrator.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal orchestrator."""
    super().__init__(*args, **kwargs)
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
Attributes
config: ModalOrchestratorConfig property

Get the Modal orchestrator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the Modal orchestrator.

supported_execution_modes: List[ExecutionMode] property

Get the execution modes supported by the Modal orchestrator.

validator: Optional[StackValidator] property

Validate that the active stack can run in remote Modal Sandboxes.

Methods:
create_static_step_sandbox(*, snapshot: PipelineSnapshotResponse, step_name: str, environment: Dict[str, str]) -> modal.Sandbox

Create a child sandbox for a static pipeline step.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def create_static_step_sandbox(
    self,
    *,
    snapshot: "PipelineSnapshotResponse",
    step_name: str,
    environment: Dict[str, str],
) -> "modal.Sandbox":
    """Create a child sandbox for a static pipeline step."""
    settings = cast(
        ModalOrchestratorSettings,
        self.get_settings(snapshot.step_configurations[step_name]),
    )
    command = StepEntrypointConfiguration.get_entrypoint_command()
    args = StepEntrypointConfiguration.get_entrypoint_arguments(
        step_name=step_name,
        snapshot_id=snapshot.id,
    )
    image_name = self.get_image(snapshot=snapshot, step_name=step_name)
    return self._create_step_sandbox(
        app_name=os.environ.get(
            ENV_ZENML_MODAL_APP_NAME,
            get_modal_app_name(self.get_orchestrator_run_id()),
        ),
        image_name=image_name,
        settings=settings,
        resource_settings=snapshot.step_configurations[
            step_name
        ].config.resource_settings,
        environment=environment,
        entrypoint_command=command + args,
    )
fetch_status(run: PipelineRunResponse, include_steps: bool = False) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

Fetch Modal sandbox status for a pipeline run and its steps.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def fetch_status(
    self, run: "PipelineRunResponse", include_steps: bool = False
) -> Tuple[
    Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]
]:
    """Fetch Modal sandbox status for a pipeline run and its steps."""
    pipeline_status = None
    sandbox_id = _metadata_value(
        run.run_metadata, MODAL_ORCHESTRATION_SANDBOX_ID_METADATA_KEY
    )
    if sandbox_id and not run.status.is_finished:
        try:
            status = sandbox_utils.get_sandbox_status(
                sandbox_id, modal_client=self.get_modal_client()
            )
        except Exception as e:
            logger.warning(
                "Failed to fetch Modal orchestration sandbox `%s`: %s",
                sandbox_id,
                e,
            )
        else:
            pipeline_status = status
    elif not sandbox_id:
        logger.warning(
            "No Modal orchestration sandbox metadata found for run `%s`.",
            run.id,
        )

    step_statuses = None
    if include_steps:
        step_statuses = {}
        run_with_steps = self._get_fresh_pipeline_run(run)
        try:
            steps = run_with_steps.steps
        except Exception:
            logger.debug(
                "Pipeline run `%s` is not hydrated with step metadata.",
                run.id,
                exc_info=True,
            )
            steps = {}

        fallback_sandbox_ids = _metadata_values_with_prefix(
            run_with_steps.run_metadata,
            MODAL_STATIC_STEP_SANDBOX_ID_METADATA_KEY_PREFIX,
        )

        for step_name, step_run in steps.items():
            if step_run.status.is_finished:
                continue

            step_sandbox_id = _metadata_value(
                step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
            )
            if not step_sandbox_id:
                step_sandbox_id = fallback_sandbox_ids.get(step_name)
            if not step_sandbox_id:
                continue

            try:
                status = sandbox_utils.get_sandbox_status(
                    step_sandbox_id,
                    modal_client=self.get_modal_client(),
                )
                step_statuses[step_name] = status
            except Exception as e:
                logger.warning(
                    "Failed to fetch Modal sandbox `%s` for step `%s`: %s",
                    step_sandbox_id,
                    step_name,
                    e,
                )

    return pipeline_status, step_statuses
get_isolated_step_status(step_run: StepRunResponse) -> ExecutionStatus

Get the status of a dynamic isolated step sandbox.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_isolated_step_status(
    self, step_run: "StepRunResponse"
) -> ExecutionStatus:
    """Get the status of a dynamic isolated step sandbox."""
    sandbox_id = _metadata_value(
        step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
    )
    if not sandbox_id:
        logger.warning(
            "No Modal sandbox metadata found for step run `%s`.",
            step_run.id,
        )
        return step_run.status

    try:
        return sandbox_utils.get_sandbox_status(
            sandbox_id, modal_client=self.get_modal_client()
        )
    except Exception as e:
        logger.warning(
            "Failed to fetch Modal sandbox `%s` for step run `%s`: %s",
            sandbox_id,
            step_run.id,
            e,
        )
        return ExecutionStatus.FAILED
get_modal_client() -> Optional[modal.Client]

Get the Modal client used by orchestrator entrypoints.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_modal_client(self) -> Optional["modal.Client"]:
    """Get the Modal client used by orchestrator entrypoints."""
    if (
        self._modal_client is not None
        and not self._modal_client.is_closed()
    ):
        return self._modal_client

    with self._modal_client_lock:
        if (
            self._modal_client is not None
            and not self._modal_client.is_closed()
        ):
            return self._modal_client

        self._modal_client = (
            sandbox_utils.create_modal_client_from_credentials(
                token_id=self.config.token_id,
                token_secret=self.config.token_secret,
            )
        )
        return self._modal_client
get_orchestrator_run_id() -> str

Return the stable Modal run ID from the sandbox environment.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Return the stable Modal run ID from the sandbox environment."""
    try:
        return os.environ[ENV_ZENML_MODAL_RUN_ID]
    except KeyError as e:
        raise RuntimeError(
            f"Unable to get Modal orchestrator run ID from the "
            f"{ENV_ZENML_MODAL_RUN_ID} environment variable."
        ) from e
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get run metadata from the Modal sandbox environment.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, MetadataType]:
    """Get run metadata from the Modal sandbox environment."""
    metadata: Dict[str, MetadataType] = {
        METADATA_ORCHESTRATOR_RUN_ID: self.get_orchestrator_run_id()
    }
    if app_name := os.environ.get(ENV_ZENML_MODAL_APP_NAME):
        metadata[MODAL_APP_NAME_METADATA_KEY] = app_name
    return metadata
get_step_sandbox_metadata(settings: ModalOrchestratorSettings, sandbox_id: str) -> Dict[str, MetadataType] staticmethod

Build step sandbox metadata for orchestrator entrypoints.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
@staticmethod
def get_step_sandbox_metadata(
    settings: ModalOrchestratorSettings, sandbox_id: str
) -> Dict[str, MetadataType]:
    """Build step sandbox metadata for orchestrator entrypoints."""
    metadata: Dict[str, MetadataType] = {
        MODAL_SANDBOX_ID_METADATA_KEY: sandbox_id
    }
    modal_environment = sandbox_utils.normalize_optional_config_value(
        settings.modal_environment
    )
    if modal_environment:
        metadata[MODAL_ENVIRONMENT_METADATA_KEY] = modal_environment
    return metadata
stop_isolated_step(step_run: StepRunResponse) -> None

Stop a dynamic isolated step sandbox if it is still running.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def stop_isolated_step(self, step_run: "StepRunResponse") -> None:
    """Stop a dynamic isolated step sandbox if it is still running."""
    sandbox_id = _metadata_value(
        step_run.run_metadata, MODAL_SANDBOX_ID_METADATA_KEY
    )
    if not sandbox_id:
        logger.warning(
            "No Modal sandbox metadata found for step run `%s`.",
            step_run.id,
        )
        return

    self.terminate_sandbox_if_running(
        sandbox_id=sandbox_id,
        description=f"step run `{step_run.id}`",
    )
submit_dynamic_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a dynamic pipeline orchestration sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_dynamic_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a dynamic pipeline orchestration sandbox to Modal."""
    if snapshot.schedule:
        raise RuntimeError(
            "Scheduling dynamic pipelines is not supported for the Modal "
            "orchestrator yet."
        )

    command = (
        DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
    )
    args = DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
        snapshot_id=snapshot.id,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    return self._submit_orchestration_sandbox(
        snapshot=snapshot,
        stack=stack,
        entrypoint_command=command + args,
        environment=environment,
        placeholder_run=placeholder_run,
    )
submit_isolated_step(step_run_info: StepRunInfo, environment: Dict[str, str]) -> None

Submit a dynamic isolated step sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_isolated_step(
    self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
    """Submit a dynamic isolated step sandbox to Modal."""
    command, args = orchestrator_utils.get_step_entrypoint_command(
        invocation_id=step_run_info.pipeline_step_name,
        config=step_run_info.config,
        entrypoint_config_class=StepOperatorEntrypointConfiguration,
        snapshot_id=step_run_info.snapshot.id,
        step_run_id=str(step_run_info.step_run_id),
    )

    image_name = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY)
    settings = cast(
        ModalOrchestratorSettings, self.get_settings(step_run_info)
    )
    modal_run_id = os.environ.get(
        ENV_ZENML_MODAL_RUN_ID, str(step_run_info.run_id)
    )
    app_name = os.environ.get(
        ENV_ZENML_MODAL_APP_NAME, get_modal_app_name(modal_run_id)
    )
    sandbox_environment = environment.copy()
    sandbox_environment[ENV_ZENML_MODAL_RUN_ID] = modal_run_id
    sandbox_environment[ENV_ZENML_MODAL_APP_NAME] = app_name

    sandbox = self._create_step_sandbox(
        app_name=app_name,
        image_name=image_name,
        settings=settings,
        resource_settings=step_run_info.config.resource_settings,
        environment=sandbox_environment,
        entrypoint_command=command + args,
    )

    metadata = self.get_step_sandbox_metadata(settings, sandbox.object_id)
    try:
        publish_step_run_metadata(
            step_run_info.step_run_id,
            {self.id: metadata},
        )
        step_run_info.step_run.run_metadata.update(metadata)
    except Exception:
        self._terminate_created_sandbox_after_metadata_error(sandbox)
        raise
submit_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, base_environment: Dict[str, str], step_environments: Dict[str, Dict[str, str]], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a static pipeline controller sandbox to Modal.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def submit_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a static pipeline controller sandbox to Modal."""
    if snapshot.schedule:
        raise RuntimeError(
            "Scheduling static pipelines is not supported for the Modal "
            "orchestrator yet."
        )

    from zenml.integrations.modal.orchestrators.modal_orchestrator_entrypoint import (
        ModalOrchestratorEntrypointConfiguration,
    )

    # The static Modal controller recomputes per-step environments after
    # it loads the active stack and run-scoped secrets inside Modal.
    del step_environments

    command = (
        ModalOrchestratorEntrypointConfiguration.get_entrypoint_command()
    )
    args = (
        ModalOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            snapshot_id=snapshot.id,
            run_id=placeholder_run.id if placeholder_run else None,
        )
    )

    return self._submit_orchestration_sandbox(
        snapshot=snapshot,
        stack=stack,
        entrypoint_command=command + args,
        environment=base_environment,
        placeholder_run=placeholder_run,
    )
terminate_sandbox_if_running(*, sandbox_id: str, description: str) -> None

Terminate a running Modal sandbox for an orchestrator entrypoint.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def terminate_sandbox_if_running(
    self, *, sandbox_id: str, description: str
) -> None:
    """Terminate a running Modal sandbox for an orchestrator entrypoint."""
    sandbox = sandbox_utils.get_sandbox_by_id(
        sandbox_id, modal_client=self.get_modal_client()
    )
    if sandbox.poll() is not None:
        logger.debug(
            "Modal sandbox for %s is already finished.", description
        )
        return

    sandbox.terminate()
    logger.info("Terminated Modal sandbox for %s.", description)
Functions:
get_modal_app_name(modal_run_id: str) -> str

Build the deterministic Modal app name for a ZenML run.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_modal_app_name(modal_run_id: str) -> str:
    """Build the deterministic Modal app name for a ZenML run."""
    return f"zenml-{modal_run_id}"[:64]
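A quick usage sketch of the function above, with the body copied so the snippet runs on its own. It shows that a UUID-sized run ID fits comfortably under the 64-character cap, and that two longer IDs differing only in their tail map to the same app name once truncated.

```python
import uuid


def get_modal_app_name(modal_run_id: str) -> str:
    """Same expression as above: prefix the run ID and cap at 64 characters."""
    return f"zenml-{modal_run_id}"[:64]


run_id = str(uuid.UUID(int=0))
name = get_modal_app_name(run_id)
print(name)       # zenml-00000000-0000-0000-0000-000000000000
print(len(name))  # 42

long_a = get_modal_app_name("x" * 70 + "-a")
long_b = get_modal_app_name("x" * 70 + "-b")
print(len(long_a))       # 64
print(long_a == long_b)  # True
```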
get_static_step_sandbox_metadata_key(step_name: str) -> str

Build an append-safe run metadata key for a static child sandbox.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
def get_static_step_sandbox_metadata_key(step_name: str) -> str:
    """Build an append-safe run metadata key for a static child sandbox."""
    return f"{MODAL_STATIC_STEP_SANDBOX_ID_METADATA_KEY_PREFIX}{step_name}"
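One way to read "append-safe" is that each step writes its own key, so recording one step's sandbox ID never overwrites another's. The sketch below shows the key being built and then read back by prefix. `PREFIX` is a placeholder for `MODAL_STATIC_STEP_SANDBOX_ID_METADATA_KEY_PREFIX`, whose value is not shown here, and `values_with_prefix` is a hypothetical reader, not the private `_metadata_values_with_prefix` helper.

```python
from typing import Dict

# Placeholder prefix; the real constant's value is not shown in this reference.
PREFIX = "example_static_step_sandbox_id:"


def metadata_key(step_name: str) -> str:
    """Build the per-step run metadata key."""
    return f"{PREFIX}{step_name}"


def values_with_prefix(run_metadata: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical reader: map step names back to their sandbox IDs."""
    return {
        key[len(PREFIX):]: value
        for key, value in run_metadata.items()
        if key.startswith(PREFIX)
    }


run_metadata = {
    metadata_key("load"): "sb-1",
    metadata_key("train"): "sb-2",
    "orchestrator_run_id": "run-42",
}
print(values_with_prefix(run_metadata))
# {'load': 'sb-1', 'train': 'sb-2'}
```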
Modules
modal_orchestrator_entrypoint

Static Modal orchestration entrypoint.

Classes
ModalOrchestratorEntrypointConfiguration(arguments: List[str])

Bases: BaseEntrypointConfiguration

Entrypoint configuration for the static Modal controller sandbox.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Initializes the entrypoint configuration.

    Args:
        arguments: Command line arguments to configure this object.
    """
    self.entrypoint_args = self._parse_arguments(arguments)
    self._snapshot: Optional["PipelineSnapshotResponse"] = None
Methods:
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Get command arguments for the static Modal controller.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
    """Get command arguments for the static Modal controller."""
    args = super().get_entrypoint_arguments(**kwargs)
    if run_id := kwargs.get(RUN_ID_OPTION):
        args.extend([f"--{RUN_ID_OPTION}", str(run_id)])
    return args
get_entrypoint_options() -> Dict[str, bool] classmethod

Get options required by the static Modal controller.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py
@classmethod
def get_entrypoint_options(cls) -> Dict[str, bool]:
    """Get options required by the static Modal controller."""
    return super().get_entrypoint_options() | {RUN_ID_OPTION: False}
run() -> None

Start child Modal sandboxes for all steps in a static pipeline.

Source code in src/zenml/integrations/modal/orchestrators/modal_orchestrator_entrypoint.py
def run(self) -> None:
    """Start child Modal sandboxes for all steps in a static pipeline."""
    snapshot = self.snapshot

    client = Client()
    active_stack = client.active_stack
    orchestrator = active_stack.orchestrator
    if not isinstance(orchestrator, ModalOrchestrator):
        raise RuntimeError(
            "The active stack orchestrator is not a ModalOrchestrator."
        )

    modal_run_id = orchestrator.get_orchestrator_run_id()
    pipeline_run = self._get_or_create_pipeline_run(
        snapshot_id=snapshot.id,
        modal_run_id=modal_run_id,
    )

    shared_env, secrets = get_config_environment_vars(
        pipeline_run_id=pipeline_run.id
    )
    shared_env.update(secrets)
    shared_env[ENV_ZENML_MODAL_RUN_ID] = modal_run_id
    if app_name := os.environ.get(ENV_ZENML_MODAL_APP_NAME):
        shared_env[ENV_ZENML_MODAL_APP_NAME] = app_name

    controller = _StaticModalPipelineController(
        snapshot=snapshot,
        pipeline_run=pipeline_run,
        active_stack=active_stack,
        orchestrator=orchestrator,
        client=client,
        shared_env=shared_env,
        step_run_request_factory=StepRunRequestFactory(
            snapshot=snapshot,
            pipeline_run=pipeline_run,
            stack=active_stack,
        ),
    )
    node_statuses = DagRunner(
        nodes=controller.build_nodes(),
        node_startup_function=controller.start_step_sandbox,
        node_monitoring_function=controller.check_step_sandbox,
        node_stop_function=controller.stop_step_sandbox,
        interrupt_function=controller.should_interrupt_execution,
    ).run()

    self._finalize_pipeline_run(pipeline_run.id, node_statuses)
    logger.info("Modal orchestration sandbox finished.")
Functions: Modules

sandbox_utils

Shared Modal Sandbox helpers.

Classes
ModalSandboxSettings

Bases: Protocol

Fields shared by Modal component settings that describe a sandbox.

Attributes
cloud: Optional[str] property

Cloud provider for the sandbox, if any.

gpu: Optional[str] property

GPU type requested for the sandbox, if any.

region: Optional[str] property

Cloud region for the sandbox, if any.

timeout: int property

Maximum sandbox lifetime in seconds.

Functions:
build_sandbox_create_kwargs(*, app: modal.App, image: modal.Image, settings: ModalSandboxSettings, resource_settings: ResourceSettings, environment: Dict[str, str], modal_client: Optional[modal.Client], gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD, gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE) -> Dict[str, Any]

Build keyword arguments for modal.Sandbox.create.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def build_sandbox_create_kwargs(
    *,
    app: "modal.App",
    image: "modal.Image",
    settings: ModalSandboxSettings,
    resource_settings: ResourceSettings,
    environment: Dict[str, str],
    modal_client: Optional["modal.Client"],
    gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD,
    gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE,
) -> Dict[str, Any]:
    """Build keyword arguments for ``modal.Sandbox.create``."""
    sandbox_environment, sensitive_environment = (
        split_modal_runtime_environment(environment)
    )
    runtime_secrets = create_runtime_secrets(sensitive_environment)

    sandbox_create_kwargs: Dict[str, Any] = {
        "app": app,
        "image": image,
        "gpu": get_gpu_values(
            settings,
            resource_settings,
            gpu_settings_field=gpu_settings_field,
            gpu_settings_example=gpu_settings_example,
        ),
        "cpu": resource_settings.cpu_count,
        "memory": get_memory_mb(resource_settings),
        "cloud": settings.cloud,
        "region": settings.region,
        "timeout": settings.timeout,
        "env": sandbox_environment,
        "client": modal_client,
    }
    if runtime_secrets:
        sandbox_create_kwargs["secrets"] = runtime_secrets

    return sandbox_create_kwargs
create_modal_client_from_credentials(*, token_id: Optional[str], token_secret: Optional[str]) -> Optional[modal.Client]

Create an explicit Modal client from component credentials.

If no component credentials are configured, this returns None so the Modal SDK can use its ambient authentication configuration. If only one credential value is configured, the component configuration is invalid.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def create_modal_client_from_credentials(
    *,
    token_id: Optional[str],
    token_secret: Optional[str],
) -> Optional["modal.Client"]:
    """Create an explicit Modal client from component credentials.

    If no component credentials are configured, this returns ``None`` so the
    Modal SDK can use its ambient authentication configuration. If only one
    credential value is configured, the component configuration is invalid.
    """
    normalized_token_id = normalize_optional_config_value(token_id)
    normalized_token_secret = normalize_optional_config_value(token_secret)

    if bool(normalized_token_id) != bool(normalized_token_secret):
        raise StackComponentInterfaceError(
            "Modal token_id and token_secret must be configured together."
        )

    if not normalized_token_id or not normalized_token_secret:
        return None

    return modal.Client.from_credentials(
        normalized_token_id,
        normalized_token_secret,
    )
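The pairing rule above (both values or neither) can be sketched without the Modal SDK. This is a standalone illustration under stated assumptions: `check_token_pair` is a hypothetical name, `ValueError` stands in for `StackComponentInterfaceError`, and the token strings are placeholders. It returns the normalized pair instead of building a `modal.Client`.

```python
from typing import Optional, Tuple


def normalize(value: Optional[str]) -> Optional[str]:
    """Treat None, empty, and whitespace-only strings as 'not configured'."""
    if value is None:
        return None
    return value.strip() or None


def check_token_pair(
    token_id: Optional[str], token_secret: Optional[str]
) -> Optional[Tuple[str, str]]:
    """Return the pair, None if neither is set, or raise if only one is set."""
    norm_id = normalize(token_id)
    norm_secret = normalize(token_secret)
    # bool(a) != bool(b) is an exclusive-or: exactly one value is present.
    if bool(norm_id) != bool(norm_secret):
        raise ValueError(
            "token_id and token_secret must be configured together."
        )
    if not norm_id or not norm_secret:
        return None
    return norm_id, norm_secret


print(check_token_pair(None, "   "))
print(check_token_pair(" ak-example ", "as-example"))
```

A whitespace-only value counts as missing, so `(None, "   ")` is treated the same as no credentials at all.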
create_modal_sandbox(entrypoint_command: List[str], *, app: modal.App, image: modal.Image, settings: ModalSandboxSettings, resource_settings: ResourceSettings, environment: Dict[str, str], modal_client: Optional[modal.Client], gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD, gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE) -> modal.Sandbox

Create a Modal sandbox for a ZenML entrypoint command.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def create_modal_sandbox(
    entrypoint_command: List[str],
    *,
    app: "modal.App",
    image: "modal.Image",
    settings: ModalSandboxSettings,
    resource_settings: ResourceSettings,
    environment: Dict[str, str],
    modal_client: Optional["modal.Client"],
    gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD,
    gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE,
) -> "modal.Sandbox":
    """Create a Modal sandbox for a ZenML entrypoint command."""
    sandbox_create_kwargs = build_sandbox_create_kwargs(
        app=app,
        image=image,
        settings=settings,
        resource_settings=resource_settings,
        environment=environment,
        modal_client=modal_client,
        gpu_settings_field=gpu_settings_field,
        gpu_settings_example=gpu_settings_example,
    )
    return modal.Sandbox.create(*entrypoint_command, **sandbox_create_kwargs)
create_runtime_secrets(sensitive_environment: Dict[str, Optional[str]]) -> List[modal.Secret]

Create Modal secrets for sensitive runtime environment values.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def create_runtime_secrets(
    sensitive_environment: Dict[str, Optional[str]],
) -> List["modal.Secret"]:
    """Create Modal secrets for sensitive runtime environment values."""
    if not sensitive_environment:
        return []

    return [modal.Secret.from_dict(sensitive_environment)]
get_gpu_values(settings: ModalSandboxSettings, resource_settings: ResourceSettings, *, gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD, gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE) -> Optional[str]

Compute and validate the Modal gpu argument string.

Modal expects GPU resources as either None (CPU only), a GPU type string like "A100" (implicitly a single GPU), or "A100:2" when multiple GPUs of the same type are requested. Within ZenML, the GPU type is captured in Modal component settings while the count lives in zenml.config.resource_settings.ResourceSettings. This helper reconciles both sources.

Parameters:

Name Type Description Default
settings ModalSandboxSettings

Modal component settings describing the GPU type.

required
resource_settings ResourceSettings

Resource constraints providing the GPU count.

required
gpu_settings_field str

Settings field name to mention in GPU errors.

DEFAULT_GPU_SETTINGS_FIELD
gpu_settings_example str

Example configuration to mention in GPU errors.

DEFAULT_GPU_SETTINGS_EXAMPLE

Returns:

Type Description
Optional[str]

A Modal-compatible GPU specification string, or None when running on CPU.

Raises:

Type Description
StackComponentInterfaceError

If the configuration is inconsistent or invalid.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def get_gpu_values(
    settings: ModalSandboxSettings,
    resource_settings: ResourceSettings,
    *,
    gpu_settings_field: str = DEFAULT_GPU_SETTINGS_FIELD,
    gpu_settings_example: str = DEFAULT_GPU_SETTINGS_EXAMPLE,
) -> Optional[str]:
    """Compute and validate the Modal ``gpu`` argument string.

    Modal expects GPU resources as either ``None`` (CPU only), a GPU type string
    like ``"A100"`` (implicitly a single GPU), or ``"A100:2"`` when multiple
    GPUs of the same type are requested. Within ZenML, the GPU type is captured
    in Modal component settings while the count lives in
    :class:`~zenml.config.resource_settings.ResourceSettings`. This helper
    reconciles both sources.

    Args:
        settings: Modal component settings describing the GPU type.
        resource_settings: Resource constraints providing the GPU count.
        gpu_settings_field: Settings field name to mention in GPU errors.
        gpu_settings_example: Example configuration to mention in GPU errors.

    Returns:
        A Modal-compatible GPU specification string or ``None`` when running on
        CPU.

    Raises:
        StackComponentInterfaceError: If the configuration is inconsistent or
            invalid.
    """
    gpu_type = normalize_optional_config_value(settings.gpu)
    gpu_count = resource_settings.gpu_count

    if gpu_type is None:
        if gpu_count is not None and gpu_count > 0:
            raise StackComponentInterfaceError(
                "GPU resources requested (gpu_count > 0) but no GPU type was "
                "specified in Modal settings. Please set a GPU type (e.g., "
                f"'T4', 'A100') via "
                f"{gpu_settings_field}. "
                f"{gpu_settings_example}"
            )
        return None

    if gpu_count == 0:
        logger.warning(
            "Modal GPU type '%s' is configured but ResourceSettings.gpu_count "
            "is 0. Ignoring the GPU type and running on CPU only.",
            gpu_type,
        )
        return None

    if gpu_count is None:
        return gpu_type

    return f"{gpu_type}:{gpu_count}"
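The branches above amount to a small decision table over GPU type and GPU count. The following sketch restates them as a standalone function so each combination can be tried directly. `resolve_gpu_spec` is a hypothetical name, `ValueError` stands in for `StackComponentInterfaceError`, and the warning log is omitted.

```python
from typing import Optional


def resolve_gpu_spec(
    gpu_type: Optional[str], gpu_count: Optional[int]
) -> Optional[str]:
    """Standalone restatement of the branches in get_gpu_values."""
    # Whitespace-only or empty type strings count as "no type configured".
    gpu_type = (gpu_type or "").strip() or None

    if gpu_type is None:
        if gpu_count is not None and gpu_count > 0:
            raise ValueError("gpu_count > 0 requires a GPU type.")
        return None  # CPU only
    if gpu_count == 0:
        return None  # type is ignored, CPU only
    if gpu_count is None:
        return gpu_type  # implicitly a single GPU
    return f"{gpu_type}:{gpu_count}"


for args in [(None, None), ("A100", None), ("A100", 1), ("T4", 2), ("A100", 0)]:
    print(args, "->", resolve_gpu_spec(*args))
```

Note that an explicit count of 1 yields `"A100:1"` rather than the bare type string; only a missing count produces `"A100"`.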
get_memory_mb(resource_settings: ResourceSettings) -> Optional[int]

Convert ZenML memory resource settings to Modal MiB.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def get_memory_mb(resource_settings: ResourceSettings) -> Optional[int]:
    """Convert ZenML memory resource settings to Modal MiB."""
    memory_mb = resource_settings.get_memory(ByteUnit.MB)
    return math.ceil(memory_mb) if memory_mb is not None else None
get_modal_image_from_registry(image_name: str, registry_credentials: Optional[Tuple[str, str]] = None) -> modal.Image

Create a Modal image from a registry image name.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def get_modal_image_from_registry(
    image_name: str,
    registry_credentials: Optional[Tuple[str, str]] = None,
) -> "modal.Image":
    """Create a Modal image from a registry image name."""
    if registry_credentials:
        docker_username, docker_password = registry_credentials
        registry_secret = modal.Secret.from_dict(
            {
                "REGISTRY_USERNAME": docker_username,
                "REGISTRY_PASSWORD": docker_password,
            }
        )
        return modal.Image.from_registry(image_name, secret=registry_secret)

    return modal.Image.from_registry(image_name)
get_sandbox_by_id(sandbox_id: str, *, modal_client: Optional[modal.Client]) -> modal.Sandbox

Reconstruct a Modal sandbox handle by ID.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def get_sandbox_by_id(
    sandbox_id: str,
    *,
    modal_client: Optional["modal.Client"],
) -> "modal.Sandbox":
    """Reconstruct a Modal sandbox handle by ID."""
    return modal.Sandbox.from_id(sandbox_id, client=modal_client)
get_sandbox_status(sandbox_id: str, *, modal_client: Optional[modal.Client]) -> ExecutionStatus

Get the ZenML execution status for a Modal sandbox.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def get_sandbox_status(
    sandbox_id: str,
    *,
    modal_client: Optional["modal.Client"],
) -> ExecutionStatus:
    """Get the ZenML execution status for a Modal sandbox."""
    sandbox = get_sandbox_by_id(sandbox_id, modal_client=modal_client)
    return sandbox_status_from_return_code(sandbox.poll())
lookup_modal_app(app_name: str, *, modal_environment: Optional[str], modal_client: Optional[modal.Client]) -> modal.App

Look up or create a Modal app for Sandbox execution.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def lookup_modal_app(
    app_name: str,
    *,
    modal_environment: Optional[str],
    modal_client: Optional["modal.Client"],
) -> "modal.App":
    """Look up or create a Modal app for Sandbox execution."""
    return modal.App.lookup(
        app_name,
        create_if_missing=True,
        environment_name=modal_environment,
        client=modal_client,
    )
normalize_optional_config_value(value: Optional[str]) -> Optional[str]

Normalize optional string config values.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def normalize_optional_config_value(value: Optional[str]) -> Optional[str]:
    """Normalize optional string config values."""
    if value is None:
        return None

    stripped_value = value.strip()
    return stripped_value or None
resolve_modal_token_pair(*, token_id: Optional[str], token_secret: Optional[str]) -> Optional[Tuple[str, str]]

Resolve the Modal token pair to forward into a sandbox.

The explicitly configured component token is preferred. When it is absent, this falls back to Modal's ambient authentication config (~/.modal.toml or the MODAL_TOKEN_ID / MODAL_TOKEN_SECRET environment variables), the same source Modal's SDK uses when no explicit client is created.

Parameters:

Name Type Description Default
token_id Optional[str]

Explicitly configured Modal token ID, if any.

required
token_secret Optional[str]

Explicitly configured Modal token secret, if any.

required

Returns:

Type Description
Optional[Tuple[str, str]]

The resolved (token_id, token_secret) pair, or None when no credentials can be resolved from either source.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def resolve_modal_token_pair(
    *,
    token_id: Optional[str],
    token_secret: Optional[str],
) -> Optional[Tuple[str, str]]:
    """Resolve the Modal token pair to forward into a sandbox.

    The explicitly configured component token is preferred. When it is absent,
    this falls back to Modal's ambient authentication config (``~/.modal.toml``
    or the ``MODAL_TOKEN_ID`` / ``MODAL_TOKEN_SECRET`` environment variables),
    the same source Modal's SDK uses when no explicit client is created.

    Args:
        token_id: Explicitly configured Modal token ID, if any.
        token_secret: Explicitly configured Modal token secret, if any.

    Returns:
        The resolved ``(token_id, token_secret)`` pair, or ``None`` when no
        credentials can be resolved from either source.
    """
    resolved_id = normalize_optional_config_value(token_id)
    resolved_secret = normalize_optional_config_value(token_secret)

    if not resolved_id or not resolved_secret:
        from modal.config import config as modal_config

        ambient_id: Optional[str] = None
        ambient_secret: Optional[str] = None
        try:
            ambient_id = modal_config.get("token_id")  # type: ignore[no-untyped-call, unused-ignore]
            ambient_secret = modal_config.get("token_secret")  # type: ignore[no-untyped-call, unused-ignore]
        except Exception:
            logger.debug(
                "Failed to read ambient Modal token configuration.",
                exc_info=True,
            )

        resolved_id = resolved_id or normalize_optional_config_value(
            ambient_id
        )
        resolved_secret = resolved_secret or normalize_optional_config_value(
            ambient_secret
        )

    if not resolved_id or not resolved_secret:
        return None

    return resolved_id, resolved_secret
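The fallback order can be illustrated without importing Modal. In this sketch, `resolve_pair` is a hypothetical name and a plain mapping called `ambient` stands in for `modal.config.config`; the token values are placeholders. It follows the listing above, in which the fallback is applied per field.

```python
from typing import Mapping, Optional, Tuple


def _normalize(value: Optional[str]) -> Optional[str]:
    """Treat None, empty, and whitespace-only strings as missing."""
    if value is None:
        return None
    return value.strip() or None


def resolve_pair(
    token_id: Optional[str],
    token_secret: Optional[str],
    ambient: Mapping[str, Optional[str]],
) -> Optional[Tuple[str, str]]:
    """Prefer explicit values; fill any missing field from `ambient`."""
    resolved_id = _normalize(token_id)
    resolved_secret = _normalize(token_secret)

    if not resolved_id or not resolved_secret:
        # Each field falls back independently of the other.
        resolved_id = resolved_id or _normalize(ambient.get("token_id"))
        resolved_secret = resolved_secret or _normalize(
            ambient.get("token_secret")
        )

    if not resolved_id or not resolved_secret:
        return None
    return resolved_id, resolved_secret


ambient_config = {"token_id": "id-ambient", "token_secret": "secret-ambient"}
print(resolve_pair("id-explicit", "secret-explicit", ambient_config))
print(resolve_pair(None, None, ambient_config))
print(resolve_pair("id-explicit", None, ambient_config))
print(resolve_pair(None, None, {}))
```

The third call shows a consequence of the per-field fallback: an explicit token ID with no explicit secret is combined with the ambient secret, unlike `create_modal_client_from_credentials`, which rejects a half-configured pair.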
sandbox_status_from_return_code(return_code: Optional[int]) -> ExecutionStatus

Map a Modal sandbox return code to a ZenML execution status.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def sandbox_status_from_return_code(
    return_code: Optional[int],
) -> ExecutionStatus:
    """Map a Modal sandbox return code to a ZenML execution status."""
    if return_code is None:
        return ExecutionStatus.RUNNING
    if return_code == 0:
        return ExecutionStatus.COMPLETED
    return ExecutionStatus.FAILED
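The mapping is small enough to tabulate. The sketch below uses a local `Status` enum as a hypothetical stand-in for ZenML's `ExecutionStatus`, so it runs without ZenML installed; the member values are illustrative only.

```python
from enum import Enum
from typing import Optional


class Status(Enum):
    """Hypothetical stand-in for ZenML's ExecutionStatus."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def status_from_return_code(return_code: Optional[int]) -> Status:
    """None means still running, 0 means success, anything else is a failure."""
    if return_code is None:
        return Status.RUNNING
    if return_code == 0:
        return Status.COMPLETED
    return Status.FAILED


for code in (None, 0, 1, 137):
    print(code, status_from_return_code(code).name)
```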
split_modal_runtime_environment(environment: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, Optional[str]]]

Split runtime environment variables into plain and secret values.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def split_modal_runtime_environment(
    environment: Dict[str, str],
) -> Tuple[Dict[str, str], Dict[str, Optional[str]]]:
    """Split runtime environment variables into plain and secret values."""
    sandbox_environment: Dict[str, str] = {}
    sensitive_environment: Dict[str, Optional[str]] = {}

    for key, value in environment.items():
        if key in SENSITIVE_ZENML_RUNTIME_ENV_KEYS:
            sensitive_environment[key] = value
        else:
            sandbox_environment[key] = value

    return sandbox_environment, sensitive_environment
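A small, self-contained sketch of the same partitioning follows. The contents of `SENSITIVE_ZENML_RUNTIME_ENV_KEYS` are not shown on this page, so the key set and variable names below are hypothetical and chosen purely for illustration.

```python
from typing import Dict, FrozenSet, Optional, Tuple

# Hypothetical key set; the real SENSITIVE_ZENML_RUNTIME_ENV_KEYS may differ.
SENSITIVE_KEYS: FrozenSet[str] = frozenset({"EXAMPLE_API_KEY"})


def split_environment(
    environment: Dict[str, str],
    sensitive_keys: FrozenSet[str] = SENSITIVE_KEYS,
) -> Tuple[Dict[str, str], Dict[str, Optional[str]]]:
    """Partition variables by key name into plain and sensitive groups."""
    plain: Dict[str, str] = {}
    sensitive: Dict[str, Optional[str]] = {}
    for key, value in environment.items():
        # Membership is decided by the key alone, never by the value.
        target = sensitive if key in sensitive_keys else plain
        target[key] = value
    return plain, sensitive


plain_env, secret_env = split_environment(
    {"EXAMPLE_API_KEY": "s3cr3t", "EXAMPLE_LOG_LEVEL": "INFO"}
)
print(plain_env)
print(sorted(secret_env))  # print key names only, not secret values
```

In `build_sandbox_create_kwargs`, the plain group becomes the `env` argument and the sensitive group is wrapped by `create_runtime_secrets`.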
terminate_sandbox(sandbox_id: str, *, modal_client: Optional[modal.Client]) -> None

Terminate a Modal sandbox by ID.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def terminate_sandbox(
    sandbox_id: str,
    *,
    modal_client: Optional["modal.Client"],
) -> None:
    """Terminate a Modal sandbox by ID."""
    sandbox = get_sandbox_by_id(sandbox_id, modal_client=modal_client)
    sandbox.terminate()
wait_for_sandbox(sandbox: modal.Sandbox, poll_interval: float = 1.0) -> int

Wait for a Modal sandbox by polling its return code.

Source code in src/zenml/integrations/modal/sandbox_utils.py
def wait_for_sandbox(
    sandbox: "modal.Sandbox", poll_interval: float = 1.0
) -> int:
    """Wait for a Modal sandbox by polling its return code."""
    while True:
        return_code = sandbox.poll()
        if return_code is not None:
            return return_code
        time.sleep(poll_interval)
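The polling loop can be exercised against a fake object that exposes only `poll()`. In this sketch, `FakeSandbox` and `wait_for` are hypothetical names; the fake returns `None` twice before reporting an exit code, and the interval is set to zero so the example finishes immediately.

```python
import time
from typing import List, Optional


class FakeSandbox:
    """Hypothetical stand-in exposing only poll(), as the loop above needs."""

    def __init__(self, results: List[Optional[int]]) -> None:
        self._results = list(results)
        self.poll_calls = 0

    def poll(self) -> Optional[int]:
        self.poll_calls += 1
        # Return queued results in order; repeat the last one once exhausted.
        if len(self._results) > 1:
            return self._results.pop(0)
        return self._results[0]


def wait_for(sandbox: FakeSandbox, poll_interval: float = 1.0) -> int:
    """Same loop shape as wait_for_sandbox: poll, then sleep, until done."""
    while True:
        return_code = sandbox.poll()
        if return_code is not None:
            return return_code
        time.sleep(poll_interval)


sandbox = FakeSandbox([None, None, 3])
exit_code = wait_for(sandbox, poll_interval=0.0)
print(exit_code, sandbox.poll_calls)
```

The loop has no deadline of its own, so a caller that needs a bounded wait would have to add one or rely on the sandbox's configured `timeout`.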

sandboxes

Modal sandbox implementation.

Classes
ModalSandbox(*args: Any, **kwargs: Any)

Bases: BaseSandbox

Sandbox flavor backed by Modal.

Initialize the Modal sandbox component.

Parameters:

Name Type Description Default
*args Any

Forwarded to StackComponent.

()
**kwargs Any

Forwarded to StackComponent.

{}
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal sandbox component.

    Args:
        *args: Forwarded to ``StackComponent``.
        **kwargs: Forwarded to ``StackComponent``.
    """
    super().__init__(*args, **kwargs)
    # Explicit Modal client built lazily from the component's
    # credentials and reused across sessions; rebuilt if it closes.
    # The lock guards the build against concurrent create_session calls
    # (e.g. fanned-out mapped steps sharing this component instance).
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
Attributes
config: ModalSandboxConfig property

Typed config.

Returns:

Type Description
ModalSandboxConfig

The component's Modal-specific config.

settings_class: Optional[Type[BaseSettings]] property

Settings class.

Returns:

Type Description
Optional[Type[BaseSettings]]

ModalSandboxSettings.

Methods:
attach(session_id: str) -> SandboxSession

Reconnect to a still-live Modal Sandbox by id.

Parameters:

Name Type Description Default
session_id str

The Modal sandbox object id (e.g. sb_xxx).

required

Returns:

Type Description
SandboxSession

A ModalSandboxSession wrapping the existing live sandbox.

Raises:

Type Description
RuntimeError

If Modal can't find a sandbox with the given id, or the sandbox has already terminated.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def attach(self, session_id: str) -> SandboxSession:
    """Reconnect to a still-live Modal Sandbox by id.

    Args:
        session_id: The Modal sandbox object id (e.g. ``sb_xxx``).

    Returns:
        A ``ModalSandboxSession`` wrapping the existing live sandbox.

    Raises:
        RuntimeError: If Modal can't find a sandbox with the given id,
            or the sandbox has already terminated.
    """
    modal_client = self._get_modal_client()
    try:
        sandbox = sandbox_utils.get_sandbox_by_id(
            session_id, modal_client=modal_client
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to attach to Modal sandbox '{session_id}' "
            f"({type(e).__name__}): {e}"
        ) from e
    # Modal happily returns a handle for a dead sandbox; without this
    # check the first exec fails with a confusing Modal error.
    if sandbox.poll() is not None:
        raise RuntimeError(
            f"Modal sandbox '{session_id}' has already terminated "
            "(destroyed or TTL expired); create a new session instead."
        )
    return ModalSandboxSession(sandbox, parent=self)
create_session(settings: Optional[BaseSandboxSettings] = None) -> SandboxSession

Boot a fresh Modal Sandbox.

Parameters:

Name Type Description Default
settings Optional[BaseSandboxSettings]

Optional per-step overrides for image / env.

None

Returns:

Type Description
SandboxSession

A ModalSandboxSession wrapping the live Modal Sandbox.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def create_session(
    self, settings: Optional[BaseSandboxSettings] = None
) -> SandboxSession:
    """Boot a fresh Modal Sandbox.

    Args:
        settings: Optional per-step overrides for image / env.

    Returns:
        A ``ModalSandboxSession`` wrapping the live Modal Sandbox.
    """
    settings = cast(ModalSandboxSettings, self.resolve_settings(settings))
    modal_client = self._get_modal_client()
    image = sandbox_utils.get_modal_image_from_registry(
        settings.image,
        registry_credentials=self._registry_credentials(settings.image),
    )
    sandbox = modal.Sandbox.create(
        **self._build_create_kwargs(
            settings,
            image=image,
            modal_client=modal_client,
            environment=self._resolve_session_environment(settings),
        )
    )
    return ModalSandboxSession(sandbox, parent=self)
restore(snapshot: SandboxSnapshot) -> SandboxSession

Boot a new Session from a stored filesystem snapshot.

Parameters:

Name Type Description Default
snapshot SandboxSnapshot

A SandboxSnapshot whose ref is a Modal Image id captured via snapshot_filesystem().

required

Returns:

Type Description
SandboxSession

A new ModalSandboxSession initialized from the Image. No in-memory state from the original Session is preserved.

Raises:

Type Description
RuntimeError

If Modal can't load the Image (e.g. id GC'd).

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def restore(self, snapshot: SandboxSnapshot) -> SandboxSession:
    """Boot a new Session from a stored filesystem snapshot.

    Args:
        snapshot: A ``SandboxSnapshot`` whose ``ref`` is a Modal Image
            id captured via ``snapshot_filesystem()``.

    Returns:
        A new ``ModalSandboxSession`` initialized from the Image. No
        in-memory state from the original Session is preserved.

    Raises:
        RuntimeError: If Modal can't load the Image (e.g. id GC'd).
    """
    self._validate_snapshot(snapshot)
    settings = cast(ModalSandboxSettings, self.resolve_settings(None))
    modal_client = self._get_modal_client()
    try:
        image = modal.Image.from_id(snapshot.ref, client=modal_client)
        # Env vars are runtime config, not filesystem state, so the
        # snapshot image doesn't carry them — re-apply the resolved
        # session environment on restore.
        sandbox = modal.Sandbox.create(
            **self._build_create_kwargs(
                settings,
                image=image,
                modal_client=modal_client,
                environment=self._resolve_session_environment(settings),
            )
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to restore Modal sandbox from image "
            f"'{snapshot.ref}' ({type(e).__name__}): {e}"
        ) from e
    return ModalSandboxSession(sandbox, parent=self)
ModalSandboxProcess(process: ContainerProcess[str], *, session: ModalSandboxSession, started_at: float)

Bases: SandboxProcess

Wraps a Modal ContainerProcess in the SandboxProcess interface.

Initialize the process wrapper.

Parameters:

Name Type Description Default
process ContainerProcess[str]

The Modal ContainerProcess returned by sandbox.exec.

required
session ModalSandboxSession

Owning session — stdout/stderr lines forward through session._wrap_stream into the per-session log source.

required
started_at float

Wall-clock launch time, used by collect() to report elapsed time.

required
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(
    self,
    process: "ContainerProcess[str]",
    *,
    session: "ModalSandboxSession",
    started_at: float,
) -> None:
    """Initialize the process wrapper.

    Args:
        process: The Modal ``ContainerProcess`` returned by ``sandbox.exec``.
        session: Owning session — stdout/stderr lines forward through
            ``session._wrap_stream`` into the per-session log source.
        started_at: Wall-clock launch time, used by ``collect()`` to
            report elapsed time.
    """
    super().__init__(session=session, started_at=started_at)
    self._process = process
Attributes
exit_code: Optional[int] property

Exit code, or None if the command is still running.

Uses poll() instead of returncode: Modal's ContainerProcess.returncode raises InvalidError while the command is still running, whereas this property promises None.

Returns:

Type Description
Optional[int]

The exit code or None.

Methods:
kill() -> None

Killing a single command is not supported on Modal.

Modal's ContainerProcess exposes no per-command termination (only poll/wait/stdin/stdout/stderr), and tearing down the whole Sandbox to stop one command would also stop every other command in the session. Rather than that surprising side effect, per-command kill is unsupported; call session.destroy() to stop the entire sandbox.

Raises:

Type Description
NotImplementedError

Always — the Modal SDK has no per-command kill.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def kill(self) -> None:
    """Killing a single command is not supported on Modal.

    Modal's ``ContainerProcess`` exposes no per-command termination
    (only ``poll``/``wait``/``stdin``/``stdout``/``stderr``), and
    tearing down the whole Sandbox to stop one command would also stop
    every other command in the session. Rather than that surprising
    side effect, per-command kill is unsupported; call
    ``session.destroy()`` to stop the entire sandbox.

    Raises:
        NotImplementedError: Always — the Modal SDK has no per-command
            kill.
    """
    raise NotImplementedError(
        "The Modal sandbox cannot kill an individual command: Modal's "
        "ContainerProcess has no per-command termination. Use "
        "session.destroy() to tear down the whole sandbox instead."
    )
stderr() -> Iterator[str]

Returns a line-buffered, log-wrapped stderr iterator.

Returns:

Type Description
Iterator[str]

Line iterator wrapped via session._wrap_stream.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def stderr(self) -> Iterator[str]:
    """Returns a line-buffered, log-wrapped stderr iterator.

    Returns:
        Line iterator wrapped via ``session._wrap_stream``.
    """
    return self._session._wrap_stream(
        line_buffer(self._process.stderr), log_level=logging.ERROR
    )
stdout() -> Iterator[str]

Returns a line-buffered, log-wrapped stdout iterator.

Returns:

Type Description
Iterator[str]

Line iterator wrapped via session._wrap_stream.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def stdout(self) -> Iterator[str]:
    """Returns a line-buffered, log-wrapped stdout iterator.

    Returns:
        Line iterator wrapped via ``session._wrap_stream``.
    """
    return self._session._wrap_stream(
        line_buffer(self._process.stdout), log_level=logging.INFO
    )
wait(timeout: Optional[float] = None) -> int

Blocks until the command exits.

Parameters:

Name Type Description Default
timeout Optional[float]

Maximum seconds to wait. Modal's own wait() has no timeout parameter, so a bounded wait polls instead.

None

Returns:

Type Description
int

The exit code.

Raises:

Type Description
TimeoutError

If the command is still running after timeout seconds.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def wait(self, timeout: Optional[float] = None) -> int:
    """Blocks until the command exits.

    Args:
        timeout: Maximum seconds to wait. Modal's own ``wait()`` has
            no timeout parameter, so a bounded wait polls instead.

    Returns:
        The exit code.

    Raises:
        TimeoutError: If the command is still running after ``timeout``
            seconds.
    """
    if timeout is None:
        self._process.wait()
        return int(self._process.returncode or 0)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        code = self._process.poll()
        if code is not None:
            return int(code)
        time.sleep(0.2)
    raise TimeoutError(
        f"Modal command did not exit within {timeout} seconds."
    )
ModalSandboxSession(sandbox: modal.Sandbox, *, parent: ModalSandbox)

Bases: SandboxSession

Wraps a Modal Sandbox in the SandboxSession interface.

Initialize the session wrapper.

Parameters:

Name Type Description Default
sandbox Sandbox

The live Modal Sandbox object.

required
parent ModalSandbox

The owning Modal sandbox component.

required
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(
    self,
    sandbox: "modal.Sandbox",
    *,
    parent: "ModalSandbox",
) -> None:
    """Initialize the session wrapper.

    Args:
        sandbox: The live Modal ``Sandbox`` object.
        parent: The owning Modal sandbox component.
    """
    # Assign _sandbox before super().__init__ so the dashboard hook,
    # which is called during base __init__ via _publish_sandbox_metadata,
    # has the state it needs.
    self._sandbox = sandbox
    super().__init__(id=sandbox.object_id, parent=parent)
Methods:
Modules
modal_sandbox

Modal sandbox flavor implementation.

Classes
ModalSandbox(*args: Any, **kwargs: Any)

Bases: BaseSandbox

Sandbox flavor backed by Modal.

Initialize the Modal sandbox component.

Parameters:

Name Type Description Default
*args Any

Forwarded to StackComponent.

()
**kwargs Any

Forwarded to StackComponent.

{}
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal sandbox component.

    Args:
        *args: Forwarded to ``StackComponent``.
        **kwargs: Forwarded to ``StackComponent``.
    """
    super().__init__(*args, **kwargs)
    # Explicit Modal client built lazily from the component's
    # credentials and reused across sessions; rebuilt if it closes.
    # The lock guards the build against concurrent create_session calls
    # (e.g. fanned-out mapped steps sharing this component instance).
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
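The comments in the constructor above describe a client that is built lazily, reused, rebuilt once it has closed, and guarded by a lock. A minimal sketch of that pattern follows. `LazyClientHolder`, `FakeClient`, and the `is_closed` callback are hypothetical names used for illustration only; they are not part of ZenML or the Modal SDK.

```python
from threading import Lock
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyClientHolder(Generic[T]):
    """Hypothetical helper: build a client on first use, then reuse it."""

    def __init__(
        self, factory: Callable[[], T], is_closed: Callable[[T], bool]
    ) -> None:
        self._factory = factory
        self._is_closed = is_closed
        self._client: Optional[T] = None
        self._lock = Lock()

    def get(self) -> T:
        # Hold the lock for the whole check-and-build so that concurrent
        # callers never build two clients at the same time.
        with self._lock:
            if self._client is None or self._is_closed(self._client):
                self._client = self._factory()
            return self._client


class FakeClient:
    """Stand-in for a real client object (assumption for this sketch)."""

    def __init__(self) -> None:
        self.closed = False


builds: List[FakeClient] = []


def build() -> FakeClient:
    client = FakeClient()
    builds.append(client)
    return client


holder = LazyClientHolder(build, lambda client: client.closed)
first = holder.get()   # built on first use
second = holder.get()  # reused
first.closed = True
third = holder.get()   # rebuilt because the previous client closed
```

Keeping the check and the build under one lock is the simplest way to avoid a double build when several mapped steps call into the same component instance at once.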
Attributes
config: ModalSandboxConfig property

Typed config.

Returns:

Type Description
ModalSandboxConfig

The component's Modal-specific config.

settings_class: Optional[Type[BaseSettings]] property

Settings class.

Returns:

Type Description
Optional[Type[BaseSettings]]

ModalSandboxSettings.

Methods:
attach(session_id: str) -> SandboxSession

Reconnect to a still-live Modal Sandbox by id.

Parameters:

Name Type Description Default
session_id str

The Modal sandbox object id (e.g. sb_xxx).

required

Returns:

Type Description
SandboxSession

A ModalSandboxSession wrapping the existing live sandbox.

Raises:

Type Description
RuntimeError

If Modal can't find a sandbox with the given id, or the sandbox has already terminated.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def attach(self, session_id: str) -> SandboxSession:
    """Reconnect to a still-live Modal Sandbox by id.

    Args:
        session_id: The Modal sandbox object id (e.g. ``sb_xxx``).

    Returns:
        A ``ModalSandboxSession`` wrapping the existing live sandbox.

    Raises:
        RuntimeError: If Modal can't find a sandbox with the given id,
            or the sandbox has already terminated.
    """
    modal_client = self._get_modal_client()
    try:
        sandbox = sandbox_utils.get_sandbox_by_id(
            session_id, modal_client=modal_client
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to attach to Modal sandbox '{session_id}' "
            f"({type(e).__name__}): {e}"
        ) from e
    # Modal happily returns a handle for a dead sandbox; without this
    # check the first exec fails with a confusing Modal error.
    if sandbox.poll() is not None:
        raise RuntimeError(
            f"Modal sandbox '{session_id}' has already terminated "
            "(destroyed or TTL expired); create a new session instead."
        )
    return ModalSandboxSession(sandbox, parent=self)
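Because `attach` raises `RuntimeError` both for unknown ids and for terminated sandboxes, a caller can treat either case as a signal to start over. The sketch below shows one possible fallback. `attach_or_create` and `FakeSandboxComponent` are hypothetical stand-ins, so the block runs without ZenML or Modal installed.

```python
from typing import Optional, Set


class FakeSandboxComponent:
    """Stand-in exposing the attach/create_session shape documented above."""

    def __init__(self, live_ids: Set[str]) -> None:
        self._live_ids = live_ids

    def attach(self, session_id: str) -> str:
        if session_id not in self._live_ids:
            raise RuntimeError(f"sandbox '{session_id}' is gone")
        return f"attached:{session_id}"

    def create_session(self) -> str:
        return "created:new"


def attach_or_create(component, session_id: Optional[str]) -> str:
    """Reuse a live session when possible, otherwise boot a fresh one."""
    if session_id is not None:
        try:
            return component.attach(session_id)
        except RuntimeError:
            # Unknown id or already terminated: fall through and create.
            pass
    return component.create_session()


component = FakeSandboxComponent(live_ids={"sb_live"})
reused = attach_or_create(component, "sb_live")
replaced = attach_or_create(component, "sb_dead")
fresh = attach_or_create(component, None)
```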
create_session(settings: Optional[BaseSandboxSettings] = None) -> SandboxSession

Boot a fresh Modal Sandbox.

Parameters:

Name Type Description Default
settings Optional[BaseSandboxSettings]

Optional per-step overrides for the image and environment.

None

Returns:

Type Description
SandboxSession

A ModalSandboxSession wrapping the live Modal Sandbox.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def create_session(
    self, settings: Optional[BaseSandboxSettings] = None
) -> SandboxSession:
    """Boot a fresh Modal Sandbox.

    Args:
        settings: Optional per-step overrides for image / env.

    Returns:
        A ``ModalSandboxSession`` wrapping the live Modal Sandbox.
    """
    settings = cast(ModalSandboxSettings, self.resolve_settings(settings))
    modal_client = self._get_modal_client()
    image = sandbox_utils.get_modal_image_from_registry(
        settings.image,
        registry_credentials=self._registry_credentials(settings.image),
    )
    sandbox = modal.Sandbox.create(
        **self._build_create_kwargs(
            settings,
            image=image,
            modal_client=modal_client,
            environment=self._resolve_session_environment(settings),
        )
    )
    return ModalSandboxSession(sandbox, parent=self)
restore(snapshot: SandboxSnapshot) -> SandboxSession

Boot a new Session from a stored filesystem snapshot.

Parameters:

Name Type Description Default
snapshot SandboxSnapshot

A SandboxSnapshot whose ref is a Modal Image id captured via snapshot_filesystem().

required

Returns:

Type Description
SandboxSession

A new ModalSandboxSession initialized from the Image. No in-memory state from the original Session is preserved.

Raises:

Type Description
RuntimeError

If Modal can't load the Image (e.g. because the image id has been garbage-collected).

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def restore(self, snapshot: SandboxSnapshot) -> SandboxSession:
    """Boot a new Session from a stored filesystem snapshot.

    Args:
        snapshot: A ``SandboxSnapshot`` whose ``ref`` is a Modal Image
            id captured via ``snapshot_filesystem()``.

    Returns:
        A new ``ModalSandboxSession`` initialized from the Image. No
        in-memory state from the original Session is preserved.

    Raises:
        RuntimeError: If Modal can't load the Image (e.g. id GC'd).
    """
    self._validate_snapshot(snapshot)
    settings = cast(ModalSandboxSettings, self.resolve_settings(None))
    modal_client = self._get_modal_client()
    try:
        image = modal.Image.from_id(snapshot.ref, client=modal_client)
        # Env vars are runtime config, not filesystem state, so the
        # snapshot image doesn't carry them — re-apply the resolved
        # session environment on restore.
        sandbox = modal.Sandbox.create(
            **self._build_create_kwargs(
                settings,
                image=image,
                modal_client=modal_client,
                environment=self._resolve_session_environment(settings),
            )
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to restore Modal sandbox from image "
            f"'{snapshot.ref}' ({type(e).__name__}): {e}"
        ) from e
    return ModalSandboxSession(sandbox, parent=self)
ModalSandboxProcess(process: ContainerProcess[str], *, session: ModalSandboxSession, started_at: float)

Bases: SandboxProcess

Wraps a Modal ContainerProcess in the SandboxProcess interface.

Initialize the process wrapper.

Parameters:

Name Type Description Default
process ContainerProcess[str]

The Modal ContainerProcess returned by sandbox.exec.

required
session ModalSandboxSession

The owning session. Stdout and stderr lines are forwarded through session._wrap_stream into the per-session log source.

required
started_at float

Wall-clock launch time, used by collect() to report elapsed time.

required
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(
    self,
    process: "ContainerProcess[str]",
    *,
    session: "ModalSandboxSession",
    started_at: float,
) -> None:
    """Initialize the process wrapper.

    Args:
        process: The Modal ``ContainerProcess`` returned by ``sandbox.exec``.
        session: Owning session — stdout/stderr lines forward through
            ``session._wrap_stream`` into the per-session log source.
        started_at: Wall-clock launch time, used by ``collect()`` to
            report elapsed time.
    """
    super().__init__(session=session, started_at=started_at)
    self._process = process
Attributes
exit_code: Optional[int] property

Exit code, or None if the command is still running.

Uses poll() instead of returncode: Modal's ContainerProcess.returncode raises InvalidError while the command is still running, whereas this property promises None.

Returns:

Type Description
Optional[int]

The exit code or None.
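The difference between the two accessors can be illustrated with a stand-in process. `FakeProcess` is hypothetical, and it raises `RuntimeError` where the text above says Modal raises `InvalidError`; the point is only that a `poll()`-based property can honor the "None while running" contract.

```python
from typing import Optional


class FakeProcess:
    """Stand-in whose returncode raises while the command is running."""

    def __init__(self) -> None:
        self._code: Optional[int] = None

    def finish(self, code: int) -> None:
        self._code = code

    def poll(self) -> Optional[int]:
        # Non-blocking: None while running, the exit code afterwards.
        return self._code

    @property
    def returncode(self) -> int:
        if self._code is None:
            raise RuntimeError("process is still running")
        return self._code


def exit_code(process: FakeProcess) -> Optional[int]:
    """Return the exit code, or None if the command is still running."""
    return process.poll()


process = FakeProcess()
while_running = exit_code(process)
process.finish(3)
after_exit = exit_code(process)
```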

Methods:
kill() -> None

Killing a single command is not supported on Modal.

Modal's ContainerProcess exposes no per-command termination (only poll/wait/stdin/stdout/stderr), and tearing down the whole Sandbox to stop one command would also stop every other command in the session. Rather than that surprising side effect, per-command kill is unsupported; call session.destroy() to stop the entire sandbox.

Raises:

Type Description
NotImplementedError

Always — the Modal SDK has no per-command kill.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def kill(self) -> None:
    """Killing a single command is not supported on Modal.

    Modal's ``ContainerProcess`` exposes no per-command termination
    (only ``poll``/``wait``/``stdin``/``stdout``/``stderr``), and
    tearing down the whole Sandbox to stop one command would also stop
    every other command in the session. Rather than that surprising
    side effect, per-command kill is unsupported; call
    ``session.destroy()`` to stop the entire sandbox.

    Raises:
        NotImplementedError: Always — the Modal SDK has no per-command
            kill.
    """
    raise NotImplementedError(
        "The Modal sandbox cannot kill an individual command: Modal's "
        "ContainerProcess has no per-command termination. Use "
        "session.destroy() to tear down the whole sandbox instead."
    )
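Code written against the generic process interface may want to handle this limitation explicitly. The sketch below tries the per-command kill and falls back to `session.destroy()`, which stops every command in the sandbox. `stop_command` and the two fake classes are hypothetical names for illustration.

```python
class FakeModalProcess:
    """Stand-in for a process whose kill() always raises, as documented."""

    def kill(self) -> None:
        raise NotImplementedError("per-command kill is not supported")


class FakeSession:
    """Stand-in for a session that can only be destroyed as a whole."""

    def __init__(self) -> None:
        self.destroyed = False

    def destroy(self) -> None:
        self.destroyed = True


def stop_command(process, session) -> bool:
    """Stop a command; return True if only that command was killed.

    Returns False when the whole session had to be destroyed instead.
    """
    try:
        process.kill()
        return True
    except NotImplementedError:
        # Destroying the session also stops any sibling commands.
        session.destroy()
        return False


session = FakeSession()
killed_only_command = stop_command(FakeModalProcess(), session)
```

Whether destroying the session is an acceptable fallback depends on what else is running in it, so making that choice visible in the caller is usually preferable to hiding it.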
stderr() -> Iterator[str]

Returns a line-buffered, log-wrapped stderr iterator.

Returns:

Type Description
Iterator[str]

Line iterator wrapped via session._wrap_stream.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def stderr(self) -> Iterator[str]:
    """Returns a line-buffered, log-wrapped stderr iterator.

    Returns:
        Line iterator wrapped via ``session._wrap_stream``.
    """
    return self._session._wrap_stream(
        line_buffer(self._process.stderr), log_level=logging.ERROR
    )
stdout() -> Iterator[str]

Returns a line-buffered, log-wrapped stdout iterator.

Returns:

Type Description
Iterator[str]

Line iterator wrapped via session._wrap_stream.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def stdout(self) -> Iterator[str]:
    """Returns a line-buffered, log-wrapped stdout iterator.

    Returns:
        Line iterator wrapped via ``session._wrap_stream``.
    """
    return self._session._wrap_stream(
        line_buffer(self._process.stdout), log_level=logging.INFO
    )
wait(timeout: Optional[float] = None) -> int

Blocks until the command exits.

Parameters:

Name Type Description Default
timeout Optional[float]

Maximum seconds to wait. Modal's own wait() has no timeout parameter, so a bounded wait polls instead.

None

Returns:

Type Description
int

The exit code.

Raises:

Type Description
TimeoutError

If the command is still running after timeout seconds.

Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def wait(self, timeout: Optional[float] = None) -> int:
    """Blocks until the command exits.

    Args:
        timeout: Maximum seconds to wait. Modal's own ``wait()`` has
            no timeout parameter, so a bounded wait polls instead.

    Returns:
        The exit code.

    Raises:
        TimeoutError: If the command is still running after ``timeout``
            seconds.
    """
    if timeout is None:
        self._process.wait()
        return int(self._process.returncode or 0)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        code = self._process.poll()
        if code is not None:
            return int(code)
        time.sleep(0.2)
    raise TimeoutError(
        f"Modal command did not exit within {timeout} seconds."
    )
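The bounded branch of `wait` can be exercised in isolation. The sketch below restates the polling loop as a free function over any `poll` callable. `wait_with_deadline` and its `interval` parameter are hypothetical; the listing above sleeps a fixed 0.2 seconds between polls.

```python
import time
from typing import Callable, Optional


def wait_with_deadline(
    poll: Callable[[], Optional[int]],
    timeout: float,
    interval: float = 0.01,
) -> int:
    """Poll until an exit code appears or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        code = poll()
        if code is not None:
            return int(code)
        time.sleep(interval)
    raise TimeoutError(f"Command did not exit within {timeout} seconds.")


# A fake command that reports "still running" twice, then exits with 0.
answers = iter([None, None, 0])
finished = wait_with_deadline(lambda: next(answers), timeout=1.0)

# A fake command that never exits: the bounded wait gives up.
try:
    wait_with_deadline(lambda: None, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

One consequence of checking the clock before polling is that a timeout of 0 raises `TimeoutError` without polling at all, even if the command has already exited.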
ModalSandboxSession(sandbox: modal.Sandbox, *, parent: ModalSandbox)

Bases: SandboxSession

Wraps a Modal Sandbox in the SandboxSession interface.

Initialize the session wrapper.

Parameters:

Name Type Description Default
sandbox Sandbox

The live Modal Sandbox object.

required
parent ModalSandbox

The owning Modal sandbox component.

required
Source code in src/zenml/integrations/modal/sandboxes/modal_sandbox.py
def __init__(
    self,
    sandbox: "modal.Sandbox",
    *,
    parent: "ModalSandbox",
) -> None:
    """Initialize the session wrapper.

    Args:
        sandbox: The live Modal ``Sandbox`` object.
        parent: The owning Modal sandbox component.
    """
    # Assign _sandbox before super().__init__ so the dashboard hook,
    # which is called during base __init__ via _publish_sandbox_metadata,
    # has the state it needs.
    self._sandbox = sandbox
    super().__init__(id=sandbox.object_id, parent=parent)
Methods:
Functions: Modules
utils

Sandbox-process helpers (line buffering for SandboxProcess IO).

Functions:
line_buffer(chunks: Iterable[Optional[Union[bytes, str]]]) -> Iterator[str]

Re-emit byte/str chunks one decoded line at a time.

Modal returns LogsReader iterables that yield byte chunks, not lines, so callers line-buffer on this side to match the line-delimited SandboxProcess.stdout() / stderr() contract.

Parameters:

Name Type Description Default
chunks Iterable[Optional[Union[bytes, str]]]

Source iterable yielding bytes, str, or None.

required

Yields:

Type Description
str

Lines with trailing newline when present; a final newline-less remainder is yielded once the source is exhausted.

Source code in src/zenml/integrations/modal/sandboxes/utils.py
def line_buffer(
    chunks: Iterable[Optional[Union[bytes, str]]],
) -> Iterator[str]:
    """Re-emit byte/str chunks one decoded line at a time.

    Modal returns LogsReader iterables that yield byte chunks, not lines,
    so callers line-buffer on this side to match the line-delimited
    ``SandboxProcess.stdout()`` / ``stderr()`` contract.

    Args:
        chunks: Source iterable yielding bytes, str, or None.

    Yields:
        Lines with trailing newline when present; a final newline-less
        remainder is yielded once the source is exhausted.
    """
    buffer = ""
    for chunk in chunks:
        if chunk is None:
            continue
        if isinstance(chunk, bytes):
            text = chunk.decode("utf-8", errors="replace")
        else:
            text = str(chunk)
        buffer += text
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line + "\n"
    if buffer:
        yield buffer
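A short usage example may make the buffering behavior easier to see. The function body is repeated from the listing above so that the block runs without ZenML installed; the sample chunks are made up for illustration.

```python
from typing import Iterable, Iterator, Optional, Union


def line_buffer(
    chunks: Iterable[Optional[Union[bytes, str]]],
) -> Iterator[str]:
    """Re-emit byte/str chunks one decoded line at a time."""
    buffer = ""
    for chunk in chunks:
        if chunk is None:
            continue
        if isinstance(chunk, bytes):
            text = chunk.decode("utf-8", errors="replace")
        else:
            text = str(chunk)
        buffer += text
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line + "\n"
    if buffer:
        yield buffer


# Chunks split mid-line, mixing bytes, str, and None.
chunks = [b"hel", b"lo\nwor", None, "ld\n", b"tail"]
lines = list(line_buffer(chunks))
# lines == ["hello\n", "world\n", "tail"]
```

Each chunk is decoded on its own, so a multi-byte UTF-8 character split across two byte chunks would come out as replacement characters rather than being reassembled.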

step_operators

Modal step operator.

Classes
ModalStepOperator(*args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run a step on Modal.

This class defines code that can set up a Modal environment and run functions in it.

Initialize the Modal step operator.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal step operator."""
    super().__init__(*args, **kwargs)
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
Attributes
config: ModalStepOperatorConfig property

Get the Modal step operator configuration.

Returns:

Type Description
ModalStepOperatorConfig

The Modal step operator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the Modal step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The Modal step operator settings class.

validator: Optional[StackValidator] property

Get the stack validator for the Modal step operator.

Returns:

Type Description
Optional[StackValidator]

The stack validator.

Methods:
cancel(step_run: StepRunResponse) -> None

Cancels a submitted Modal sandbox.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required
Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def cancel(self, step_run: "StepRunResponse") -> None:
    """Cancels a submitted Modal sandbox.

    Args:
        step_run: The step run.
    """
    sandbox_id = str(step_run.run_metadata[STEP_SANDBOX_ID_METADATA_KEY])
    modal_client = self._get_modal_client()
    sandbox_utils.terminate_sandbox(sandbox_id, modal_client=modal_client)
get_docker_builds(snapshot: PipelineSnapshotBase) -> List[BuildConfiguration]

Get the Docker build configurations for the Modal step operator.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The pipeline snapshot.

required

Returns:

Type Description
List[BuildConfiguration]

A list of Docker build configurations.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def get_docker_builds(
    self, snapshot: "PipelineSnapshotBase"
) -> List["BuildConfiguration"]:
    """Get the Docker build configurations for the Modal step operator.

    Args:
        snapshot: The pipeline snapshot.

    Returns:
        A list of Docker build configurations.
    """
    builds = []
    for step_name, step in snapshot.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_status(step_run: StepRunResponse) -> ExecutionStatus

Gets the status of a submitted Modal sandbox.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required

Returns:

Type Description
ExecutionStatus

The step status.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def get_status(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Gets the status of a submitted Modal sandbox.

    Args:
        step_run: The step run.

    Returns:
        The step status.
    """
    sandbox_id = str(step_run.run_metadata[STEP_SANDBOX_ID_METADATA_KEY])
    modal_client = self._get_modal_client()
    return sandbox_utils.get_sandbox_status(
        sandbox_id, modal_client=modal_client
    )
submit(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Submits a step run to Modal.

Parameters:

Name Type Description Default
info StepRunInfo

The step run information.

required
entrypoint_command List[str]

The entrypoint command for the step.

required
environment Dict[str, str]

The environment variables for the step.

required

Raises:

Type Description
ValueError

If no container registry is found in the stack or the entrypoint command is empty.

Exception

If sandbox metadata publication or local metadata update fails after sandbox creation. In this case, the sandbox is terminated before re-raising the original exception.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def submit(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submits a step run to Modal.

    Args:
        info: The step run information.
        entrypoint_command: The entrypoint command for the step.
        environment: The environment variables for the step.

    Raises:
        ValueError: If no container registry is found in the stack or the
            entrypoint command is empty.
        Exception: If sandbox metadata publication or local metadata update
            fails after sandbox creation. In this case, the sandbox is
            terminated before re-raising the original exception.
    """
    settings = cast(ModalStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY)
    zc = Client()
    stack = zc.active_stack

    if not stack.container_registry:
        raise ValueError(
            "No Container registry found in the stack. "
            "Please add a container registry and ensure "
            "it is correctly configured."
        )

    if not entrypoint_command:
        raise ValueError(
            "Modal step operator received an empty entrypoint command."
        )

    zenml_image = sandbox_utils.get_modal_image_from_registry(
        image_name,
        stack.container_registry.credentials,
    )
    resource_settings = info.config.resource_settings
    modal_environment = sandbox_utils.normalize_optional_config_value(
        settings.modal_environment
    )
    modal_client = self._get_modal_client()

    app = sandbox_utils.lookup_modal_app(
        f"zenml-{info.step_run_id}-{info.pipeline_step_name}"[:64],
        modal_environment=modal_environment,
        modal_client=modal_client,
    )
    sandbox = sandbox_utils.create_modal_sandbox(
        entrypoint_command,
        app=app,
        image=zenml_image,
        settings=settings,
        resource_settings=resource_settings,
        environment=environment,
        modal_client=modal_client,
    )
    metadata: Dict[str, Any] = {
        STEP_SANDBOX_ID_METADATA_KEY: sandbox.object_id
    }
    if modal_environment:
        metadata[STEP_MODAL_ENVIRONMENT_METADATA_KEY] = modal_environment

    try:
        publish_step_run_metadata(info.step_run_id, {self.id: metadata})
        info.step_run.run_metadata.update(metadata)
    except Exception:
        try:
            sandbox.terminate()
        except Exception:
            logger.exception(
                "Failed to terminate Modal sandbox '%s' after metadata "
                "publication failed.",
                sandbox.object_id,
            )
        raise
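Two details of `submit` can be tried out in isolation: the app name is cut to 64 characters, and a sandbox that was already created is terminated if publishing its metadata fails. The sketch below restates both with stand-ins. `modal_app_name`, `publish_or_terminate`, and `FakeSandbox` are hypothetical names, and the sketch assumes the step run id renders as a 36-character UUID string.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def modal_app_name(step_run_id: str, step_name: str) -> str:
    """Build the app name the same way as the listing above."""
    return f"zenml-{step_run_id}-{step_name}"[:64]


def publish_or_terminate(sandbox, publish: Callable[[], None]) -> None:
    """Run `publish`; on failure, terminate the sandbox and re-raise."""
    try:
        publish()
    except Exception:
        try:
            sandbox.terminate()
        except Exception:
            # A cleanup failure is logged but must not mask the original.
            logger.exception("Failed to terminate sandbox after publish failed.")
        raise


class FakeSandbox:
    """Stand-in for a created sandbox (assumption for this sketch)."""

    def __init__(self) -> None:
        self.terminated = False

    def terminate(self) -> None:
        self.terminated = True


def failing_publish() -> None:
    raise ValueError("metadata store unavailable")


sandbox = FakeSandbox()
error: Optional[Exception] = None
try:
    publish_or_terminate(sandbox, failing_publish)
except ValueError as exc:
    error = exc

name = modal_app_name(
    "123e4567-e89b-12d3-a456-426614174000",
    "train_a_very_large_model_on_many_gpus",
)
```

Under the UUID assumption, the `zenml-` prefix, the id, and the separator take 43 characters, which leaves 21 characters of the step name before truncation.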
Modules
modal_step_operator

Modal step operator implementation.

Classes
ModalStepOperator(*args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run a step on Modal.

This class defines code that can set up a Modal environment and run functions in it.

Initialize the Modal step operator.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Modal step operator."""
    super().__init__(*args, **kwargs)
    self._modal_client: Optional["modal.Client"] = None
    self._modal_client_lock = Lock()
Attributes
config: ModalStepOperatorConfig property

Get the Modal step operator configuration.

Returns:

Type Description
ModalStepOperatorConfig

The Modal step operator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the Modal step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The Modal step operator settings class.

validator: Optional[StackValidator] property

Get the stack validator for the Modal step operator.

Returns:

Type Description
Optional[StackValidator]

The stack validator.

Methods:
cancel(step_run: StepRunResponse) -> None

Cancels a submitted Modal sandbox.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required
Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def cancel(self, step_run: "StepRunResponse") -> None:
    """Cancels a submitted Modal sandbox.

    Args:
        step_run: The step run.
    """
    sandbox_id = str(step_run.run_metadata[STEP_SANDBOX_ID_METADATA_KEY])
    modal_client = self._get_modal_client()
    sandbox_utils.terminate_sandbox(sandbox_id, modal_client=modal_client)
get_docker_builds(snapshot: PipelineSnapshotBase) -> List[BuildConfiguration]

Get the Docker build configurations for the Modal step operator.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The pipeline snapshot.

required

Returns:

Type Description
List[BuildConfiguration]

A list of Docker build configurations.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def get_docker_builds(
    self, snapshot: "PipelineSnapshotBase"
) -> List["BuildConfiguration"]:
    """Get the Docker build configurations for the Modal step operator.

    Args:
        snapshot: The pipeline snapshot.

    Returns:
        A list of Docker build configurations.
    """
    builds = []
    for step_name, step in snapshot.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_status(step_run: StepRunResponse) -> ExecutionStatus

Gets the status of a submitted Modal sandbox.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required

Returns:

Type Description
ExecutionStatus

The step status.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def get_status(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Gets the status of a submitted Modal sandbox.

    Args:
        step_run: The step run.

    Returns:
        The step status.
    """
    sandbox_id = str(step_run.run_metadata[STEP_SANDBOX_ID_METADATA_KEY])
    modal_client = self._get_modal_client()
    return sandbox_utils.get_sandbox_status(
        sandbox_id, modal_client=modal_client
    )
submit(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Submits a step run to Modal.

Parameters:

Name Type Description Default
info StepRunInfo

The step run information.

required
entrypoint_command List[str]

The entrypoint command for the step.

required
environment Dict[str, str]

The environment variables for the step.

required

Raises:

Type Description
ValueError

If no container registry is found in the stack or the entrypoint command is empty.

Exception

If sandbox metadata publication or local metadata update fails after sandbox creation. In this case, the sandbox is terminated before re-raising the original exception.

Source code in src/zenml/integrations/modal/step_operators/modal_step_operator.py
def submit(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submits a step run to Modal.

    Args:
        info: The step run information.
        entrypoint_command: The entrypoint command for the step.
        environment: The environment variables for the step.

    Raises:
        ValueError: If no container registry is found in the stack or the
            entrypoint command is empty.
        Exception: If sandbox metadata publication or local metadata update
            fails after sandbox creation. In this case, the sandbox is
            terminated before re-raising the original exception.
    """
    settings = cast(ModalStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=MODAL_STEP_OPERATOR_DOCKER_IMAGE_KEY)
    zc = Client()
    stack = zc.active_stack

    if not stack.container_registry:
        raise ValueError(
            "No Container registry found in the stack. "
            "Please add a container registry and ensure "
            "it is correctly configured."
        )

    if not entrypoint_command:
        raise ValueError(
            "Modal step operator received an empty entrypoint command."
        )

    zenml_image = sandbox_utils.get_modal_image_from_registry(
        image_name,
        stack.container_registry.credentials,
    )
    resource_settings = info.config.resource_settings
    modal_environment = sandbox_utils.normalize_optional_config_value(
        settings.modal_environment
    )
    modal_client = self._get_modal_client()

    app = sandbox_utils.lookup_modal_app(
        f"zenml-{info.step_run_id}-{info.pipeline_step_name}"[:64],
        modal_environment=modal_environment,
        modal_client=modal_client,
    )
    sandbox = sandbox_utils.create_modal_sandbox(
        entrypoint_command,
        app=app,
        image=zenml_image,
        settings=settings,
        resource_settings=resource_settings,
        environment=environment,
        modal_client=modal_client,
    )
    metadata: Dict[str, Any] = {
        STEP_SANDBOX_ID_METADATA_KEY: sandbox.object_id
    }
    if modal_environment:
        metadata[STEP_MODAL_ENVIRONMENT_METADATA_KEY] = modal_environment

    try:
        publish_step_run_metadata(info.step_run_id, {self.id: metadata})
        info.step_run.run_metadata.update(metadata)
    except Exception:
        try:
            sandbox.terminate()
        except Exception:
            logger.exception(
                "Failed to terminate Modal sandbox '%s' after metadata "
                "publication failed.",
                sandbox.object_id,
            )
        raise
Functions: Modules