
SSH

zenml.integrations.ssh

SSH integration for remote pipeline and step execution.

Attributes

SSH = 'ssh' module-attribute

SSH_ORCHESTRATOR_FLAVOR = 'ssh' module-attribute

SSH_STEP_OPERATOR_FLAVOR = 'ssh' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type                        Description
Type[StackComponentConfig]  The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type            Description
Dict[str, Any]  The config schema.

display_name: Optional[str] property

The display name of the flavor.

By default, converts the technical name to a human-readable format. For example, "vm_kubernetes" becomes "VM Kubernetes". Flavors can override this to provide custom display names.

Returns:

Type           Description
Optional[str]  The display name of the flavor.
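The default conversion described above can be sketched as a standalone function. This is a minimal illustration under stated assumptions, not ZenML's implementation: the helper name `default_display_name` and the `ACRONYMS` set (which is what turns `vm` into `VM` here) are hypothetical.

```python
from typing import FrozenSet

# Hypothetical set of tokens rendered fully upper-case; ZenML's actual
# handling of acronyms may differ.
ACRONYMS: FrozenSet[str] = frozenset({"vm", "ssh", "gcp", "aws"})


def default_display_name(technical_name: str) -> str:
    """Turn a technical flavor name such as 'vm_kubernetes' into 'VM Kubernetes'."""
    words = []
    for token in technical_name.split("_"):
        if token.lower() in ACRONYMS:
            words.append(token.upper())  # known acronym: all caps
        else:
            words.append(token.capitalize())  # ordinary word: title case
    return " ".join(words)


print(default_display_name("vm_kubernetes"))  # VM Kubernetes
print(default_display_name("ssh"))  # SSH
```

A flavor that needs a different label would override the property instead of relying on such a heuristic, as the description above notes.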

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor docs URL.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type                  Description
Type[StackComponent]  The implementation class for this flavor.

logo_url: Optional[str] property

A URL to represent the flavor in the dashboard.

Returns:

Type           Description
Optional[str]  The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type  Description
str   The flavor name.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor SDK docs URL.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for this flavor.

These requirements are used to filter the available service connector types down to those that are compatible with this flavor.

Returns:

Type                                    Description
Optional[ServiceConnectorRequirements]  Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type                Description
StackComponentType  The stack component type.

Methods:
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name          Type            Description              Default
flavor_model  FlavorResponse  The model to load from.  required

Raises:

Type                     Description
CustomFlavorImportError  If the custom flavor can't be imported.
ImportError              If the flavor can't be imported.

Returns:

Type    Description
Flavor  The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        _, flavor = validate_flavor_source(
            source=flavor_model.source,
            component_type=flavor_model.type,
            validate_component_classes=False,
        )
    except (TypeError, ValueError) as err:
        if flavor_model.is_custom:
            flavor_module, _, _ = flavor_model.source.rpartition(".")
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            ) from err
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            ) from err
    return flavor
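When a custom flavor fails to import, the error message points at the file where the flavor module is expected to live. That path arithmetic can be reproduced with the standard library alone. In the sketch below, the function name `expected_flavor_file` is hypothetical, and the source root `/home/user/project` and source path `my_flavors.ssh_custom.MyFlavor` are made-up values standing in for `source_utils.get_source_root()` and `flavor_model.source`.

```python
import os


def expected_flavor_file(source_root: str, flavor_source: str) -> str:
    """Mirror the path computation used in the CustomFlavorImportError message."""
    # Drop the class name: "pkg.module.Class" -> "pkg.module"
    flavor_module, _, _ = flavor_source.rpartition(".")
    # Turn the dotted module path into a relative file path under the root.
    expected = os.path.join(
        source_root, flavor_module.replace(".", os.path.sep)
    )
    return f"{expected}.py"


print(expected_flavor_file("/home/user/project", "my_flavors.ssh_custom.MyFlavor"))
```

On a POSIX system this prints `/home/user/project/my_flavors/ssh_custom.py`, which is the file the error message asks you to check.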
generate_default_docs_url() -> str

Generate the docs URL for a built-in or integration flavor.

This method is not useful for custom flavors, which have no pages in the main ZenML docs.

Returns:

Type  Description
str   The complete URL to the ZenML documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    base = "https://docs.zenml.io"
    return f"{base}/stack-components/{component_type}/{name}"
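For the two SSH flavors, the URL logic above can be traced by hand. The sketch re-implements it as a plain function (the name `default_docs_url` is hypothetical). It assumes that `StackComponentType.plural` yields `orchestrators` and `step_operators` for the two component types, and that both flavors are named `ssh`, as the module attributes at the top of this page state.

```python
def default_docs_url(component_type_plural: str, flavor_name: str) -> str:
    """Stand-alone copy of the string logic in generate_default_docs_url."""
    # URL path segments use dashes instead of underscores.
    component_type = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")
    base = "https://docs.zenml.io"
    return f"{base}/stack-components/{component_type}/{name}"


print(default_docs_url("orchestrators", "ssh"))
# https://docs.zenml.io/stack-components/orchestrators/ssh
print(default_docs_url("step_operators", "ssh"))
# https://docs.zenml.io/stack-components/step-operators/ssh
```

The function only builds the string; whether a page exists at that URL depends on the published documentation.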
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type  Description
str   The complete URL to the ZenML SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        # Get the config class name to point to the specific class
        config_class_name = self.config_class.__name__

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
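The integration branch derives the integration name from the flavor's module path. The sketch isolates that branching in a hypothetical function `sdk_docs_url`. The module path `zenml.integrations.ssh.flavors.ssh_orchestrator_flavor` and the version string `0.0.0` are placeholders, not values taken from this page.

```python
def sdk_docs_url(
    module: str,
    version: str,
    config_class_name: str,
    component_type_plural: str,
) -> str:
    """Stand-alone copy of the branching in generate_default_sdk_docs_url."""
    base = f"https://sdkdocs.zenml.io/{version}"
    if "zenml.integrations" in module:
        # "zenml.integrations.<integration>.<rest>" -> "<integration>"
        tail = module.split("zenml.integrations.", maxsplit=1)[1]
        integration = tail.split(".")[0]
        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )
    # Core flavors link to the module anchor on the component-type page.
    return f"{base}/core_code_docs/core-{component_type_plural}/#{module}"


print(
    sdk_docs_url(
        "zenml.integrations.ssh.flavors.ssh_orchestrator_flavor",
        "0.0.0",
        "SSHOrchestratorConfig",
        "orchestrators",
    )
)
```

Note that the anchor always points at `<integration>.flavors.<ConfigClass>`, regardless of the submodule that defines the class. That matches the SSH case, where `SSHOrchestratorConfig` is documented under the `flavors` module on this page.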
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name         Type           Description                             Default
integration  Optional[str]  The integration to use for the model.   None
is_custom    bool           Whether the flavor is a custom flavor.  True

Returns:

Type           Description
FlavorRequest  The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        display_name=self.display_name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
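`to_model` copies three fields out of the optional `service_connector_requirements` and falls back to `None` for each when the flavor declares no requirements. The sketch below shows the same pattern with a single early return. `ConnectorRequirementsStub`, `connector_fields`, and the example value are hypothetical and only mirror the three attribute names used above.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ConnectorRequirementsStub:
    """Hypothetical stand-in for ServiceConnectorRequirements."""

    connector_type: Optional[str] = None
    resource_type: Optional[str] = None
    resource_id_attr: Optional[str] = None


def connector_fields(
    reqs: Optional[ConnectorRequirementsStub],
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Return (connector_type, resource_type, resource_id_attr), or all None."""
    if reqs is None:
        return None, None, None
    return reqs.connector_type, reqs.resource_type, reqs.resource_id_attr


print(connector_fields(None))  # (None, None, None)
print(connector_fields(ConnectorRequirementsStub(resource_type="example-resource")))
# (None, 'example-resource', None)
```

The early return is equivalent to the three conditional expressions in `to_model`; it just checks for missing requirements once.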

Integration

Base class for integrations in ZenML.

Methods:
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type  Description
bool  True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for requirement in cls.get_requirements():
        parsed_requirement = Requirement(requirement)

        if not requirement_installed(parsed_requirement):
            logger.debug(
                "Requirement '%s' for integration '%s' is not installed "
                "or installed with the wrong version.",
                requirement,
                cls.NAME,
            )
            return False

        dependencies = get_dependencies(parsed_requirement)

        for dependency in dependencies:
            if not requirement_installed(dependency):
                logger.debug(
                    "Requirement '%s' for integration '%s' is not "
                    "installed or installed with the wrong version.",
                    dependency,
                    cls.NAME,
                )
                return False

    logger.debug(
        f"Integration '{cls.NAME}' is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
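The check walks through each requirement and its dependencies and stops at the first one that is missing. The following is a simplified, standard-library-only sketch of the first half of that loop. Unlike the real method, it only verifies that a distribution with the requirement's name is installed: it ignores version specifiers and transitive dependencies. The helper names and the example requirement string are hypothetical.

```python
import re
from importlib import metadata
from typing import Iterable

# Leading project name of a requirement string such as "example-package>=1.0".
_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def distribution_installed(requirement: str) -> bool:
    """Return True if a distribution named like the requirement is installed."""
    match = _NAME_RE.match(requirement.strip())
    if match is None:
        return False
    try:
        metadata.distribution(match.group(0))
    except metadata.PackageNotFoundError:
        return False
    return True


def all_installed(requirements: Iterable[str]) -> bool:
    """Short-circuit on the first missing requirement, like check_installation."""
    return all(distribution_installed(req) for req in requirements)
```

The real method parses each string into a `Requirement`, so version mismatches also count as "not installed". This sketch would report a wrong version as installed.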
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type                Description
List[Type[Flavor]]  A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name            Type           Description                                               Default
target_os       Optional[str]  The target operating system to get the requirements for.  None
python_version  Optional[str]  The Python version to use for the requirements.           None

Returns:

Type       Description
List[str]  A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name       Type           Description                                               Default
target_os  Optional[str]  The target operating system to get the requirements for.  None

Returns:

Type       Description
List[str]  A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
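Requirements are excluded from the uninstall list by plain string-prefix matching. A compact equivalent using `any()` is sketched below. The function name `uninstall_requirements` is hypothetical, and the requirement and ignore lists are invented for illustration; they are not the SSH integration's actual `REQUIREMENTS`.

```python
from typing import List, Sequence


def uninstall_requirements(
    requirements: Sequence[str], ignored_prefixes: Sequence[str]
) -> List[str]:
    """Keep every requirement that does not start with an ignored prefix."""
    return [
        req
        for req in requirements
        if not any(req.startswith(prefix) for prefix in ignored_prefixes)
    ]


reqs = ["example-ssh-lib>=1.0", "pyyaml", "pyyaml-include"]
print(uninstall_requirements(reqs, ["pyyaml"]))  # ['example-ssh-lib>=1.0']
```

Because the match is on prefixes, ignoring `pyyaml` also keeps `pyyaml-include` installed. That is worth keeping in mind when choosing entries for `REQUIREMENTS_IGNORED_ON_UNINSTALL`.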

SSHIntegration

Bases: Integration

Definition of the SSH integration for ZenML.

Methods:
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the SSH integration.

Returns:

Type                Description
List[Type[Flavor]]  List of new stack component flavors.

Source code in src/zenml/integrations/ssh/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the SSH integration.

    Returns:
        List of new stack component flavors.
    """
    from zenml.integrations.ssh.flavors import (
        SSHOrchestratorFlavor,
        SSHStepOperatorFlavor,
    )

    return [SSHStepOperatorFlavor, SSHOrchestratorFlavor]
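Both flavors returned here share the name `ssh` (`SSH_ORCHESTRATOR_FLAVOR` and `SSH_STEP_OPERATOR_FLAVOR` are both `'ssh'`). A flavor is therefore identified uniquely only by its component type together with its name. The sketch below uses hypothetical stand-in classes, not ZenML's, to show a registry keyed that way. Class attributes replace the properties that real flavors expose, for brevity.

```python
from typing import Dict, List, Tuple, Type


class FlavorStub:
    """Hypothetical minimal flavor: a component type plus a name."""

    type: str = ""
    name: str = ""


class SSHOrchestratorFlavorStub(FlavorStub):
    type = "orchestrator"
    name = "ssh"


class SSHStepOperatorFlavorStub(FlavorStub):
    type = "step_operator"
    name = "ssh"


def build_registry(
    flavors: List[Type[FlavorStub]],
) -> Dict[Tuple[str, str], Type[FlavorStub]]:
    """Index flavors by (component type, flavor name), rejecting duplicates."""
    registry: Dict[Tuple[str, str], Type[FlavorStub]] = {}
    for flavor in flavors:
        key = (flavor.type, flavor.name)
        if key in registry:
            raise ValueError(f"Duplicate flavor {key}")
        registry[key] = flavor
    return registry


registry = build_registry([SSHStepOperatorFlavorStub, SSHOrchestratorFlavorStub])
print(sorted(registry))  # [('orchestrator', 'ssh'), ('step_operator', 'ssh')]
```

Keying on the name alone would make the second `ssh` flavor collide with the first.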

Modules

flavors

SSH integration flavors.

Classes
SSHOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, BaseSSHComponentConfig, SSHOrchestratorSettings

Configuration for the SSH orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
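Secret references take the form `{{secret_name.key}}`. The sketch below shows one way to recognise and split such a value. The function name `parse_secret_reference` and its regular expression are assumptions for illustration. They do not necessarily match the pattern used by `secret_utils.is_secret_reference`.

```python
import re
from typing import Any, Optional, Tuple

# Assumed pattern: "{{" + secret name + "." + key + "}}", nothing around it.
_SECRET_REF = re.compile(r"^\{\{\s*([^.{}\s]+)\.([^{}\s]+)\s*\}\}$")


def parse_secret_reference(value: Any) -> Optional[Tuple[str, str]]:
    """Return (secret_name, key) if the value looks like a secret reference."""
    if not isinstance(value, str):
        return None
    match = _SECRET_REF.match(value)
    if match is None:
        return None
    return match.group(1), match.group(2)


print(parse_secret_reference("{{ssh_creds.private_key}}"))  # ('ssh_creds', 'private_key')
print(parse_secret_reference("plain-text-value"))  # None
```

A value that parses this way is left untouched until runtime, which is why the constructor refuses it for any field that has pydantic validators.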
Attributes
is_local: bool property

Whether the orchestrator runs the pipeline locally.

Returns:

Type  Description
bool  False

is_remote: bool property

Whether the orchestrator runs the pipeline remotely.

Returns:

Type  Description
bool  True

is_schedulable: bool property

Whether the orchestrator supports scheduled pipeline runs.

Returns:

Type  Description
bool  False
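Because `is_schedulable` is `False`, a scheduled run needs a different orchestrator. The guard below illustrates how calling code might branch on these properties. `OrchestratorInfo` and `check_schedule_supported` are invented for this example and are not ZenML APIs. ZenML's own handling of a schedule on this orchestrator is not described on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrchestratorInfo:
    """Hypothetical snapshot of an orchestrator's capability flags."""

    flavor: str
    is_remote: bool
    is_schedulable: bool


def check_schedule_supported(info: OrchestratorInfo, wants_schedule: bool) -> None:
    """Raise early instead of silently ignoring a requested schedule."""
    if wants_schedule and not info.is_schedulable:
        raise RuntimeError(
            f"The '{info.flavor}' orchestrator does not support scheduled runs."
        )


ssh_info = OrchestratorInfo(flavor="ssh", is_remote=True, is_schedulable=False)
check_schedule_supported(ssh_info, wants_schedule=False)  # passes silently
```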

SSHOrchestratorFlavor

Bases: BaseOrchestratorFlavor

SSH orchestrator flavor.

Attributes
config_class: Type[SSHOrchestratorConfig] property

Config class for the SSH orchestrator flavor.

Returns:

Type                         Description
Type[SSHOrchestratorConfig]  The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor docs URL.

implementation_class: Type[SSHOrchestrator] property

Implementation class for this flavor.

Returns:

Type                   Description
Type[SSHOrchestrator]  Implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type  Description
str   The flavor logo.

name: str property

Flavor name.

Returns:

Type  Description
str   The flavor name.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor SDK docs URL.

SSHOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSSHComponentSettings

Settings for the SSH orchestrator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
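The settings version of the constructor runs its checks in a fixed order for every non-`None` value. That decision sequence can be sketched as a pure function. The name `classify_value`, its boolean parameters (which stand in for the `secret_utils` and `has_validators` helpers), and the `"ok"`/`"warn"` return convention are all assumptions made for this example.

```python
def classify_value(
    is_secret_reference: bool,
    is_secret_field: bool,
    is_clear_text_field: bool,
    has_validators: bool,
    warn_about_plain_text_secrets: bool,
) -> str:
    """Return 'warn' or 'ok', or raise ValueError, in the constructor's order."""
    if not is_secret_reference:
        # Plain values are always accepted; sensitive fields may trigger a warning.
        if is_secret_field and warn_about_plain_text_secrets:
            return "warn"
        return "ok"
    if is_clear_text_field:
        # Fields that explicitly forbid secret references.
        raise ValueError("Field does not accept secret references.")
    if has_validators:
        # The validator would run on the unresolved reference string.
        raise ValueError("Field needs validation, so secret references are not allowed.")
    return "ok"
```

The clear-text check is what distinguishes this mixin from the stack component constructor shown earlier, which only rejects secret references for validated fields.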
SSHStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, BaseSSHComponentConfig, SSHStepOperatorSettings

Configuration for the SSH step operator.

Source code in src/zenml/stack/stack_component.py
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type  Description
bool  False

is_remote: bool property

Checks if this stack component is running remotely.

Returns:

Type  Description
bool  True

SSHStepOperatorFlavor

Bases: BaseStepOperatorFlavor

SSH step operator flavor.

Attributes
config_class: Type[SSHStepOperatorConfig] property

Returns SSHStepOperatorConfig config class.

Returns:

Type                         Description
Type[SSHStepOperatorConfig]  The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor docs URL.

implementation_class: Type[SSHStepOperator] property

Implementation class for this flavor.

Returns:

Type                   Description
Type[SSHStepOperator]  The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type  Description
str   The flavor logo.

name: str property

Name of the flavor.

Returns:

Type  Description
str   The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type           Description
Optional[str]  A flavor SDK docs URL.

SSHStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSSHComponentSettings

Settings for the SSH step operator.

Source code in src/zenml/config/secret_reference_mixin.py
Modules
base

Shared SSH configuration.

Classes
BaseSSHComponentConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: SSHConnectionConfigMixin

Shared SSH component configuration.

Source code in src/zenml/config/secret_reference_mixin.py
BaseSSHComponentSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Shared SSH component settings.

Source code in src/zenml/config/secret_reference_mixin.py
SSHConnectionConfigMixin(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Shared SSH connection configuration.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
ssh_orchestrator_flavor

SSH orchestrator flavor.

Classes
SSHOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, BaseSSHComponentConfig, SSHOrchestratorSettings

Configuration for the SSH orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
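The rule described in this docstring (secret references are allowed only on attributes that pydantic does not validate) can be sketched without ZenML or pydantic. This is a minimal, hypothetical illustration, not ZenML's implementation: is_secret_reference and check_secret_references are invented names, the regex for the `{{secret_name.key}}` form is an assumption, and the two field lists stand in for pydantic field metadata and the has_validators check.

```python
import re
from typing import Any, Dict, Iterable

# Assumed pattern for "{{secret_name.key}}" references; ZenML's own parser
# may accept more (or fewer) forms.
_SECRET_REF = re.compile(r"^\{\{[^.{}]+\.[^.{}]+\}\}$")


def is_secret_reference(value: Any) -> bool:
    """Return True if value is a string shaped like {{secret_name.key}}."""
    return isinstance(value, str) and _SECRET_REF.match(value) is not None


def check_secret_references(
    kwargs: Dict[str, Any],
    known_fields: Iterable[str],
    fields_with_validators: Iterable[str],
) -> None:
    """Raise ValueError if a validated field is given a secret reference."""
    known = set(known_fields)
    validated = set(fields_with_validators)
    for key, value in kwargs.items():
        if key not in known:
            continue  # unknown keys are left to the model's own validation
        if value is None or not is_secret_reference(value):
            continue  # plain values (and None) are not checked here
        if key in validated:
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )
```

With fields_with_validators=["port"] (a hypothetical field), passing "{{ssh_creds.port}}" for port raises ValueError, while "{{ssh_creds.password}}" for an unvalidated password field is accepted. The settings variant shown earlier additionally rejects secret references on fields marked as clear-text, which this sketch omits.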
Attributes
is_local: bool property

Whether the orchestrator runs the pipeline locally.

Returns:

Type Description
bool

False

is_remote: bool property

Whether the orchestrator runs the pipeline remotely.

Returns:

Type Description
bool

True

is_schedulable: bool property

Whether the orchestrator supports scheduled pipeline runs.

Returns:

Type Description
bool

False

SSHOrchestratorFlavor

Bases: BaseOrchestratorFlavor

SSH orchestrator flavor.

Attributes
config_class: Type[SSHOrchestratorConfig] property

Config class for the SSH orchestrator flavor.

Returns:

Type Description
Type[SSHOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs URL.

implementation_class: Type[SSHOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[SSHOrchestrator]

Implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs URL.

SSHOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSSHComponentSettings

Settings for the SSH orchestrator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
ssh_step_operator_flavor

SSH step operator flavor.

Classes
SSHStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, BaseSSHComponentConfig, SSHStepOperatorSettings

Configuration for the SSH step operator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

False

is_remote: bool property

Checks if this stack component is running remotely.

Returns:

Type Description
bool

True

SSHStepOperatorFlavor

Bases: BaseStepOperatorFlavor

SSH step operator flavor.

Attributes
config_class: Type[SSHStepOperatorConfig] property

Config class for the SSH step operator flavor.

Returns:

Type Description
Type[SSHStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs URL.

implementation_class: Type[SSHStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[SSHStepOperator]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs URL.

SSHStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSSHComponentSettings

Settings for the SSH step operator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)

orchestrators

SSH orchestrator.

Classes
SSHOrchestrator(*args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator that runs pipelines on a remote host via SSH + Docker.

Initialize the SSH orchestrator.

Parameters:

*args (Any, default ()): Forwarded to the base orchestrator.
**kwargs (Any, default {}): Forwarded to the base orchestrator.
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the SSH orchestrator.

    Args:
        *args: Forwarded to the base orchestrator.
        **kwargs: Forwarded to the base orchestrator.
    """
    super().__init__(*args, **kwargs)
    # Isolated-step subprocesses keyed by step_run_id. Only populated
    # inside the remote orchestrator container (dynamic path); the lock
    # guards concurrent submit/poll/stop from the dynamic runner's
    # thread pool.
    self._step_procs: Dict[UUID, "subprocess.Popen[bytes]"] = {}
    self._stopped_step_ids: set[UUID] = set()
    self._step_procs_lock = threading.Lock()
Attributes
config: SSHOrchestratorConfig property

The orchestrator config.

Returns:

Type Description
SSHOrchestratorConfig

The orchestrator config.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the SSH orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

supported_execution_modes: List[ExecutionMode] property

Execution modes supported by this orchestrator.

Returns:

Type Description
List[ExecutionMode]

The supported execution modes.

validator: Optional[StackValidator] property

Validates that the stack can run containerized pipelines.

Returns:

Type Description
Optional[StackValidator]

A stack validator requiring a container registry and image builder.

Methods:
get_isolated_step_status(step_run: StepRunResponse) -> ExecutionStatus

Report the status of an isolated step subprocess.

Parameters:

step_run (StepRunResponse, required): The step run to check.

Returns:

Type Description
ExecutionStatus

The execution status.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def get_isolated_step_status(
    self, step_run: "StepRunResponse"
) -> ExecutionStatus:
    """Report the status of an isolated step subprocess.

    Args:
        step_run: The step run to check.

    Returns:
        The execution status.
    """
    with self._step_procs_lock:
        process = self._step_procs.get(step_run.id)
        stopped = step_run.id in self._stopped_step_ids
    if process is None:
        return ExecutionStatus.RUNNING
    return_code = process.poll()
    if return_code is None:
        return ExecutionStatus.RUNNING
    if stopped:
        return ExecutionStatus.STOPPED
    if return_code == 0:
        return ExecutionStatus.COMPLETED
    return ExecutionStatus.FAILED
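The order of checks in get_isolated_step_status determines the result: an unregistered handle counts as running, a live process is running, and a recorded stop request takes precedence over whatever exit code the termination produced. The sketch below reproduces that mapping with the standard library only; Status is a hypothetical local stand-in for ZenML's ExecutionStatus, and status_from_process is an illustrative name.

```python
import enum
import subprocess
import sys
from typing import Optional


class Status(enum.Enum):
    # Hypothetical stand-in for ExecutionStatus; only the members used here.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    STOPPED = "stopped"


def status_from_process(
    process: Optional["subprocess.Popen[bytes]"], stopped: bool
) -> Status:
    """Map a subprocess handle to a status, in the same order as above."""
    if process is None:
        # Not registered (yet): reported as still running.
        return Status.RUNNING
    return_code = process.poll()  # None while the child is alive
    if return_code is None:
        return Status.RUNNING
    if stopped:
        # A deliberate stop wins over the exit code the kill produced.
        return Status.STOPPED
    return Status.COMPLETED if return_code == 0 else Status.FAILED


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "raise SystemExit(3)"])
    child.wait()
    print(status_from_process(child, stopped=False))  # Status.FAILED
```

Because poll() never blocks, a caller such as a dynamic runner's polling loop can call this repeatedly without waiting on the child.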
get_orchestrator_run_id() -> str

The run id, read from the execution environment.

Returns:

Type Description
str

The orchestrator run id.

Raises:

Type Description
RuntimeError

If called outside the remote execution environment (the env var is only set inside the launched containers).

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """The run id, read from the execution environment.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If called outside the remote execution environment
            (the env var is only set inside the launched containers).
    """
    try:
        return os.environ[ENV_ZENML_SSH_RUN_ID]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id: environment variable "
            f"{ENV_ZENML_SSH_RUN_ID} is not set. This is only available "
            "inside the remote execution environment."
        )
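A minimal sketch of the same lookup pattern, assuming a hypothetical variable name (the actual value of ENV_ZENML_SSH_RUN_ID is not shown in this reference). Accepting an optional mapping keeps the function testable without touching os.environ, and raising with `from None` keeps the internal KeyError out of the traceback.

```python
import os
from typing import Mapping, Optional

# Hypothetical stand-in for ENV_ZENML_SSH_RUN_ID.
RUN_ID_VAR = "ZENML_SSH_RUN_ID"


def read_run_id(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the run id, or fail loudly outside the remote environment."""
    env = os.environ if environ is None else environ
    try:
        return env[RUN_ID_VAR]
    except KeyError:
        # "from None" suppresses the KeyError context in the traceback.
        raise RuntimeError(
            f"Unable to read run id: environment variable {RUN_ID_VAR} "
            "is not set. This is only available inside the remote "
            "execution environment."
        ) from None
```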
stop_isolated_step(step_run: StepRunResponse) -> None

Terminate an isolated step subprocess (preemption / fail-fast).

Parameters:

step_run (StepRunResponse, required): The step run to stop.
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def stop_isolated_step(self, step_run: "StepRunResponse") -> None:
    """Terminate an isolated step subprocess (preemption / fail-fast).

    Args:
        step_run: The step run to stop.
    """
    with self._step_procs_lock:
        process = self._step_procs.get(step_run.id)
        if process is None or process.poll() is not None:
            return
        self._stopped_step_ids.add(step_run.id)
    if sys.platform == "win32":
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        return
    try:
        pgid = os.getpgid(process.pid)
    except ProcessLookupError:
        return
    os.killpg(pgid, signal.SIGTERM)
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)
        process.wait()
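On POSIX, stop_isolated_step signals the child's whole process group: SIGTERM first, then SIGKILL if the child has not exited within 10 seconds. On Windows it falls back to terminate/kill on the single process. The sketch below isolates that escalation; stop_process_group and the grace parameter are illustrative names, and it assumes the child was started with start_new_session=True so that its process group contains only the child and its descendants, not the caller.

```python
import os
import signal
import subprocess
import sys


def stop_process_group(
    process: "subprocess.Popen[bytes]", grace: float = 10.0
) -> None:
    """Terminate process (and, on POSIX, its whole group), then escalate."""
    if process.poll() is not None:
        return  # already exited; nothing to signal
    if sys.platform == "win32":
        # No process group to signal: act on the single process instead.
        process.terminate()
        try:
            process.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        return
    try:
        pgid = os.getpgid(process.pid)
    except ProcessLookupError:
        return  # process already gone
    os.killpg(pgid, signal.SIGTERM)  # polite request to the whole group
    try:
        process.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)  # grace period over: force it
        process.wait()
```

Signalling the group rather than just process.pid matters when a step spawns its own children (for example data loader workers), which would otherwise outlive the step.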
submit_dynamic_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a dynamic pipeline by launching the orchestrator image.

Parameters:

snapshot (PipelineSnapshotResponse, required): The pipeline snapshot.
stack (Stack, required): The active stack.
environment (Dict[str, str], required): Environment variables for the orchestrator container.
placeholder_run (Optional[PipelineRunResponse], default None): The placeholder run for the pipeline.

Returns:

Type Description
Optional[SubmissionResult]

None

Raises:

Type Description
RuntimeError

If the dynamic pipeline has a schedule, which is not supported.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_dynamic_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a dynamic pipeline by launching the orchestrator image.

    Args:
        snapshot: The pipeline snapshot.
        stack: The active stack.
        environment: Environment variables for the orchestrator
            container.
        placeholder_run: The placeholder run for the pipeline.

    Returns:
        None

    Raises:
        RuntimeError: If the dynamic pipeline has a schedule, which is not
            supported.
    """
    from zenml.pipelines.dynamic.entrypoint_configuration import (
        DynamicPipelineEntrypointConfiguration,
    )

    if snapshot.schedule:
        raise RuntimeError(
            "The SSH orchestrator does not support scheduled pipelines. "
            "Remove the schedule and trigger the pipeline directly (e.g. "
            "from your own cron job or CI), or use an orchestrator that "
            "supports scheduling."
        )

    assert placeholder_run is not None

    run_id = str(placeholder_run.id)
    env = dict(environment)
    env[ENV_ZENML_SSH_RUN_ID] = run_id
    settings = cast(SSHOrchestratorSettings, self.get_settings(snapshot))
    self._launch_container(
        run_id=run_id,
        image=self.get_image(snapshot=snapshot),
        settings=settings,
        entrypoint=(
            DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
        ),
        command=(
            DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
                snapshot_id=snapshot.id,
                run_id=run_id,
            )
        ),
        environment=env,
        stack=stack,
    )
    return None
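submit_dynamic_pipeline rejects schedules first and then copies the caller's environment before injecting the run id, so the dict passed in is never mutated. A minimal sketch of that step, using a hypothetical variable name in place of ENV_ZENML_SSH_RUN_ID and a bare schedule argument in place of snapshot.schedule:

```python
from typing import Dict, Optional

# Hypothetical stand-in for ENV_ZENML_SSH_RUN_ID; the real variable name is
# not shown in this reference.
RUN_ID_VAR = "ZENML_SSH_RUN_ID"


def prepare_orchestrator_env(
    environment: Dict[str, str],
    run_id: str,
    schedule: Optional[object] = None,
) -> Dict[str, str]:
    """Return a copy of environment with the run id injected."""
    if schedule:
        raise RuntimeError(
            "The SSH orchestrator does not support scheduled pipelines."
        )
    env = dict(environment)  # shallow copy; the caller's dict is untouched
    env[RUN_ID_VAR] = run_id
    return env
```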
submit_isolated_step(step_run_info: StepRunInfo, environment: Dict[str, str]) -> None

Launch one isolated step as a subprocess.

Parameters:

step_run_info (StepRunInfo, required): The step run information.
environment (Dict[str, str], required): Environment variables for the step process.
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_isolated_step(
    self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
    """Launch one isolated step as a subprocess.

    Args:
        step_run_info: The step run information.
        environment: Environment variables for the step process.
    """
    command, args = orchestrator_utils.get_step_entrypoint_command(
        invocation_id=step_run_info.pipeline_step_name,
        config=step_run_info.config,
        entrypoint_config_class=StepOperatorEntrypointConfiguration,
        snapshot_id=step_run_info.snapshot.id,
        step_run_id=str(step_run_info.step_run_id),
    )
    logger.info(
        "Launching isolated step `%s` as a subprocess.",
        step_run_info.pipeline_step_name,
    )
    # start_new_session puts the child in its own process group so
    # stop_isolated_step can signal the whole subtree.
    process = subprocess.Popen(
        command + args,
        env={**os.environ, **environment},
        start_new_session=True,
    )
    with self._step_procs_lock:
        self._step_procs[step_run_info.step_run_id] = process
        self._stopped_step_ids.discard(step_run_info.step_run_id)
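Two details of submit_isolated_step are easy to miss. In {**os.environ, **environment} the later mapping wins, so step-specific variables override inherited ones. start_new_session=True (a POSIX feature) gives the child its own session and process group, which is what lets stop_isolated_step signal the whole subtree. The hypothetical launch_step helper below demonstrates both; unlike the original, it pipes stdout so the override can be observed.

```python
import os
import subprocess
import sys
from typing import Dict, List


def launch_step(
    command: List[str], environment: Dict[str, str]
) -> "subprocess.Popen[bytes]":
    """Start command with the parent environment overlaid by environment."""
    return subprocess.Popen(
        command,
        env={**os.environ, **environment},  # step values override inherited
        start_new_session=True,  # own session/process group on POSIX
        stdout=subprocess.PIPE,  # demo only; the original inherits stdout
    )


if __name__ == "__main__":
    proc = launch_step(
        [sys.executable, "-c", "import os; print(os.environ['STEP_NAME'])"],
        {"STEP_NAME": "trainer"},
    )
    out, _ = proc.communicate()
    print(out.decode().strip())  # trainer
```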
submit_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, base_environment: Dict[str, str], step_environments: Dict[str, Dict[str, str]], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a static pipeline as a remote Docker Compose DAG.

Parameters:

snapshot (PipelineSnapshotResponse, required): The pipeline snapshot.
stack (Stack, required): The active stack.
base_environment (Dict[str, str], required): Environment shared by all steps (unused; the per-step environments already include it).
step_environments (Dict[str, Dict[str, str]], required): Per-step environment variables.
placeholder_run (Optional[PipelineRunResponse], default None): The placeholder run for the pipeline.

Returns:

Type Description
Optional[SubmissionResult]

None

Raises:

Type Description
RuntimeError

If the pipeline has a schedule, which is not supported.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a static pipeline as a remote Docker Compose DAG.

    Args:
        snapshot: The pipeline snapshot.
        stack: The active stack.
        base_environment: Environment shared by all steps (unused; the
            per-step environments already include it).
        step_environments: Per-step environment variables.
        placeholder_run: The placeholder run for the pipeline.

    Returns:
        None

    Raises:
        RuntimeError: If the pipeline has a schedule, which is not
            supported.
    """
    if snapshot.schedule:
        raise RuntimeError(
            "The SSH orchestrator does not support scheduled pipelines. "
            "Remove the schedule and trigger the pipeline directly (e.g. "
            "from your own cron job or CI), or use an orchestrator that "
            "supports scheduling."
        )
    assert placeholder_run is not None
    run_id = str(placeholder_run.id)
    services = {
        f"{snapshot.id}-{step_name}": self._step_service(
            snapshot=snapshot,
            step_name=step_name,
            step=step,
            run_id=run_id,
            step_environment=step_environments[step_name],
        )
        for step_name, step in snapshot.step_configurations.items()
    }
    self._launch_compose(
        run_id=run_id,
        compose={"services": services},
        stack=stack,
    )
    return None
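`submit_pipeline` turns each entry of `snapshot.step_configurations` into one Docker Compose service keyed `<snapshot id>-<step name>`, and it rejects scheduled snapshots before launching anything. Below is a minimal sketch of that mapping using plain dictionaries. `FakeSnapshot`, `make_service`, the `RUN_ID` variable and the service fields are hypothetical stand-ins, because the bodies of `_step_service` and `_launch_compose` are not part of this reference.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeSnapshot:
    """Hypothetical stand-in for PipelineSnapshotResponse."""

    id: str
    step_configurations: Dict[str, Any]
    schedule: Optional[str] = None


def make_service(step_name: str, run_id: str, env: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical replacement for SSHOrchestrator._step_service."""
    return {
        "image": "example/image:latest",  # assumption: one image for every step
        "command": ["run-step", step_name],  # hypothetical entrypoint
        "environment": {**env, "RUN_ID": run_id},  # hypothetical variable name
    }


def build_compose(
    snapshot: FakeSnapshot, run_id: str, step_envs: Dict[str, Dict[str, str]]
) -> Dict[str, Any]:
    # Same guard as the orchestrator: schedules are rejected up front.
    if snapshot.schedule:
        raise RuntimeError("Scheduled pipelines are not supported.")
    # One service per step, keyed by "<snapshot id>-<step name>".
    services = {
        f"{snapshot.id}-{step_name}": make_service(
            step_name, run_id, step_envs[step_name]
        )
        for step_name in snapshot.step_configurations
    }
    return {"services": services}
```

For a snapshot `abc` with steps `load` and `train`, this sketch produces services named `abc-load` and `abc-train`.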
Modules
ssh_orchestrator

SSH orchestrator implementation.

Classes
SSHOrchestrator(*args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator that runs pipelines on a remote host via SSH + Docker.

Initialize the SSH orchestrator.

Parameters:

Name Type Description Default
*args Any

Forwarded to the base orchestrator.

()
**kwargs Any

Forwarded to the base orchestrator.

{}
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the SSH orchestrator.

    Args:
        *args: Forwarded to the base orchestrator.
        **kwargs: Forwarded to the base orchestrator.
    """
    super().__init__(*args, **kwargs)
    # Isolated-step subprocesses keyed by step_run_id. Only populated
    # inside the remote orchestrator container (dynamic path); the lock
    # guards concurrent submit/poll/stop from the dynamic runner's
    # thread pool.
    self._step_procs: Dict[UUID, "subprocess.Popen[bytes]"] = {}
    self._stopped_step_ids: set[UUID] = set()
    self._step_procs_lock = threading.Lock()
Attributes
config: SSHOrchestratorConfig property

The orchestrator config.

Returns:

Type Description
SSHOrchestratorConfig

The orchestrator config.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the SSH orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

supported_execution_modes: List[ExecutionMode] property

Execution modes supported by this orchestrator.

Returns:

Type Description
List[ExecutionMode]

The supported execution modes.

validator: Optional[StackValidator] property

Validates that the stack can run containerized pipelines.

Returns:

Type Description
Optional[StackValidator]

A stack validator requiring a container registry and image builder.

Methods:
get_isolated_step_status(step_run: StepRunResponse) -> ExecutionStatus

Report the status of an isolated step subprocess.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to check.

required

Returns:

Type Description
ExecutionStatus

The execution status.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def get_isolated_step_status(
    self, step_run: "StepRunResponse"
) -> ExecutionStatus:
    """Report the status of an isolated step subprocess.

    Args:
        step_run: The step run to check.

    Returns:
        The execution status.
    """
    with self._step_procs_lock:
        process = self._step_procs.get(step_run.id)
        stopped = step_run.id in self._stopped_step_ids
    if process is None:
        return ExecutionStatus.RUNNING
    return_code = process.poll()
    if return_code is None:
        return ExecutionStatus.RUNNING
    if stopped:
        return ExecutionStatus.STOPPED
    if return_code == 0:
        return ExecutionStatus.COMPLETED
    return ExecutionStatus.FAILED
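`get_isolated_step_status` reads the shared process table while holding the lock, polls the process outside it, and then maps the result in a fixed order. Unknown or still-running processes count as running, a finished process that was deliberately stopped counts as stopped, and otherwise the exit code separates completed from failed. The sketch below reproduces that decision order with the standard library. `Status` is a simplified stand-in for ZenML's `ExecutionStatus`, and `ProcessTable` is hypothetical.

```python
import subprocess
import sys
import threading
from enum import Enum
from typing import Dict, Set


class Status(Enum):
    """Simplified stand-in for zenml's ExecutionStatus."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    STOPPED = "stopped"


class ProcessTable:
    """Hypothetical registry mirroring _step_procs / _stopped_step_ids."""

    def __init__(self) -> None:
        self.procs: Dict[str, "subprocess.Popen[bytes]"] = {}
        self.stopped: Set[str] = set()
        self.lock = threading.Lock()

    def status(self, step_id: str) -> Status:
        # Copy the shared state under the lock; poll() happens outside it.
        with self.lock:
            proc = self.procs.get(step_id)
            stopped = step_id in self.stopped
        if proc is None:
            # Unknown step: reported as running, as in the listing above.
            return Status.RUNNING
        code = proc.poll()
        if code is None:
            return Status.RUNNING
        if stopped:
            # A stop request takes precedence over the exit code.
            return Status.STOPPED
        return Status.COMPLETED if code == 0 else Status.FAILED
```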
get_orchestrator_run_id() -> str

The run id, read from the execution environment.

Returns:

Type Description
str

The orchestrator run id.

Raises:

Type Description
RuntimeError

If called outside the remote execution environment (the env var is only set inside the launched containers).

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """The run id, read from the execution environment.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If called outside the remote execution environment
            (the env var is only set inside the launched containers).
    """
    try:
        return os.environ[ENV_ZENML_SSH_RUN_ID]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id: environment variable "
            f"{ENV_ZENML_SSH_RUN_ID} is not set. This is only available "
            "inside the remote execution environment."
        )
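The run id is passed to the remote containers through a single environment variable. Outside those containers the lookup fails with a `RuntimeError` instead of a bare `KeyError`. The sketch below shows the same pattern. The variable name is hypothetical, because the value of `ENV_ZENML_SSH_RUN_ID` is not shown in this reference, and the optional `environ` argument exists only to make the function easy to test.

```python
import os
from typing import Mapping, Optional

# Hypothetical name: the real value of ENV_ZENML_SSH_RUN_ID is not shown here.
RUN_ID_ENV_VAR = "EXAMPLE_SSH_RUN_ID"


def read_run_id(environ: Optional[Mapping[str, str]] = None) -> str:
    env = os.environ if environ is None else environ
    try:
        return env[RUN_ID_ENV_VAR]
    except KeyError:
        # `from None` keeps the traceback to the RuntimeError alone.
        raise RuntimeError(
            f"{RUN_ID_ENV_VAR} is not set; the run id is only available "
            "inside the remote execution environment."
        ) from None
```

The sketch adds `from None`. The method shown above omits it, so its `KeyError` appears as implicit exception context in the traceback.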
stop_isolated_step(step_run: StepRunResponse) -> None

Terminate an isolated step subprocess (preemption / fail-fast).

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to stop.

required
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def stop_isolated_step(self, step_run: "StepRunResponse") -> None:
    """Terminate an isolated step subprocess (preemption / fail-fast).

    Args:
        step_run: The step run to stop.
    """
    with self._step_procs_lock:
        process = self._step_procs.get(step_run.id)
        if process is None or process.poll() is not None:
            return
        self._stopped_step_ids.add(step_run.id)
    if sys.platform == "win32":
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        return
    try:
        pgid = os.getpgid(process.pid)
    except ProcessLookupError:
        return
    os.killpg(pgid, signal.SIGTERM)
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)
        process.wait()
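`stop_isolated_step` depends on `submit_isolated_step` having started each child with `start_new_session=True`. That makes the child the leader of its own process group, so `os.killpg` reaches the step and anything it spawned: first SIGTERM, then SIGKILL after a 10-second grace period. The standalone sketch below reproduces that escalation for any `Popen` object. `terminate_tree` is a hypothetical name, and unlike the method it does not record the step in a stopped set.

```python
import os
import signal
import subprocess
import sys


def terminate_tree(process: "subprocess.Popen[bytes]", timeout: float = 10.0) -> None:
    """Stop a child started with start_new_session=True, plus its descendants.

    Only safe for such children: otherwise the child shares the caller's
    process group and killpg would signal the caller as well.
    """
    if process.poll() is not None:
        return  # already exited
    if sys.platform == "win32":
        # No POSIX process groups: fall back to the single process.
        process.terminate()
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        return
    try:
        pgid = os.getpgid(process.pid)
    except ProcessLookupError:
        return  # exited between poll() and getpgid()
    os.killpg(pgid, signal.SIGTERM)  # polite request to the whole group
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)  # escalate if SIGTERM was ignored
        process.wait()
```

On POSIX, a Python child that is sleeping and stopped this way ends with `returncode == -signal.SIGTERM`, because the interpreter keeps the default SIGTERM action.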
submit_dynamic_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a dynamic pipeline by launching the orchestrator image.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotResponse

The pipeline snapshot.

required
stack Stack

The active stack.

required
environment Dict[str, str]

Environment variables for the orchestrator container.

required
placeholder_run Optional[PipelineRunResponse]

The placeholder run for the pipeline.

None

Returns:

Type Description
Optional[SubmissionResult]

None

Raises:

Type Description
RuntimeError

If the dynamic pipeline has a schedule, which is not supported.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_dynamic_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a dynamic pipeline by launching the orchestrator image.

    Args:
        snapshot: The pipeline snapshot.
        stack: The active stack.
        environment: Environment variables for the orchestrator
            container.
        placeholder_run: The placeholder run for the pipeline.

    Returns:
        None

    Raises:
        RuntimeError: If the dynamic pipeline has a schedule, which is not
            supported.
    """
    from zenml.pipelines.dynamic.entrypoint_configuration import (
        DynamicPipelineEntrypointConfiguration,
    )

    if snapshot.schedule:
        raise RuntimeError(
            "The SSH orchestrator does not support scheduled pipelines. "
            "Remove the schedule and trigger the pipeline directly (e.g. "
            "from your own cron job or CI), or use an orchestrator that "
            "supports scheduling."
        )

    assert placeholder_run is not None

    run_id = str(placeholder_run.id)
    env = dict(environment)
    env[ENV_ZENML_SSH_RUN_ID] = run_id
    settings = cast(SSHOrchestratorSettings, self.get_settings(snapshot))
    self._launch_container(
        run_id=run_id,
        image=self.get_image(snapshot=snapshot),
        settings=settings,
        entrypoint=(
            DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
        ),
        command=(
            DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
                snapshot_id=snapshot.id,
                run_id=run_id,
            )
        ),
        environment=env,
        stack=stack,
    )
    return None
submit_isolated_step(step_run_info: StepRunInfo, environment: Dict[str, str]) -> None

Launch one isolated step as a subprocess.

Parameters:

Name Type Description Default
step_run_info StepRunInfo

The step run information.

required
environment Dict[str, str]

Environment variables for the step process.

required
Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_isolated_step(
    self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
    """Launch one isolated step as a subprocess.

    Args:
        step_run_info: The step run information.
        environment: Environment variables for the step process.
    """
    command, args = orchestrator_utils.get_step_entrypoint_command(
        invocation_id=step_run_info.pipeline_step_name,
        config=step_run_info.config,
        entrypoint_config_class=StepOperatorEntrypointConfiguration,
        snapshot_id=step_run_info.snapshot.id,
        step_run_id=str(step_run_info.step_run_id),
    )
    logger.info(
        "Launching isolated step `%s` as a subprocess.",
        step_run_info.pipeline_step_name,
    )
    # start_new_session puts the child in its own process group so
    # stop_isolated_step can signal the whole subtree.
    process = subprocess.Popen(
        command + args,
        env={**os.environ, **environment},
        start_new_session=True,
    )
    with self._step_procs_lock:
        self._step_procs[step_run_info.step_run_id] = process
        self._stopped_step_ids.discard(step_run_info.step_run_id)
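The step subprocess inherits the orchestrator container's environment, and `{**os.environ, **environment}` lets the step-specific values win any key collision, because later entries in a dict display overwrite earlier ones. The sketch below demonstrates that precedence locally. `launch_with_overrides` and `EXAMPLE_STEP_VAR` are hypothetical names.

```python
import os
import subprocess
import sys
from typing import Dict, List


def launch_with_overrides(
    command: List[str], overrides: Dict[str, str]
) -> "subprocess.Popen[bytes]":
    # Later keys win: step-specific variables override inherited ones.
    env = {**os.environ, **overrides}
    return subprocess.Popen(
        command,
        env=env,
        stdout=subprocess.PIPE,
        start_new_session=True,  # own process group, as in submit_isolated_step
    )


if __name__ == "__main__":
    os.environ["EXAMPLE_STEP_VAR"] = "inherited"
    child = launch_with_overrides(
        [sys.executable, "-c", "import os; print(os.environ['EXAMPLE_STEP_VAR'])"],
        {"EXAMPLE_STEP_VAR": "override"},
    )
    print(child.communicate()[0].decode().strip())
```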
submit_pipeline(snapshot: PipelineSnapshotResponse, stack: Stack, base_environment: Dict[str, str], step_environments: Dict[str, Dict[str, str]], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submit a static pipeline as a remote Docker Compose DAG.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotResponse

The pipeline snapshot.

required
stack Stack

The active stack.

required
base_environment Dict[str, str]

Environment shared by all steps (unused; the per-step environments already include it).

required
step_environments Dict[str, Dict[str, str]]

Per-step environment variables.

required
placeholder_run Optional[PipelineRunResponse]

The placeholder run for the pipeline.

None

Returns:

Type Description
Optional[SubmissionResult]

None

Raises:

Type Description
RuntimeError

If the pipeline has a schedule, which is not supported.

Source code in src/zenml/integrations/ssh/orchestrators/ssh_orchestrator.py
def submit_pipeline(
    self,
    snapshot: "PipelineSnapshotResponse",
    stack: "Stack",
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submit a static pipeline as a remote Docker Compose DAG.

    Args:
        snapshot: The pipeline snapshot.
        stack: The active stack.
        base_environment: Environment shared by all steps (unused; the
            per-step environments already include it).
        step_environments: Per-step environment variables.
        placeholder_run: The placeholder run for the pipeline.

    Returns:
        None

    Raises:
        RuntimeError: If the pipeline has a schedule, which is not
            supported.
    """
    if snapshot.schedule:
        raise RuntimeError(
            "The SSH orchestrator does not support scheduled pipelines. "
            "Remove the schedule and trigger the pipeline directly (e.g. "
            "from your own cron job or CI), or use an orchestrator that "
            "supports scheduling."
        )
    assert placeholder_run is not None
    run_id = str(placeholder_run.id)
    services = {
        f"{snapshot.id}-{step_name}": self._step_service(
            snapshot=snapshot,
            step_name=step_name,
            step=step,
            run_id=run_id,
            step_environment=step_environments[step_name],
        )
        for step_name, step in snapshot.step_configurations.items()
    }
    self._launch_compose(
        run_id=run_id,
        compose={"services": services},
        stack=stack,
    )
    return None
Modules

ssh_client

SSH client wrapper around paramiko.

Classes
RemoteCommandResult(exit_code: int, stdout: str = '', stderr: str = '') dataclass

Result of a remote command execution.

Attributes:

Name Type Description
exit_code int

Process exit code (0 = success).

stdout str

Captured standard output.

stderr str

Captured standard error (empty when combined with stdout).

SSHClient(config: SSHConnectionConfigMixin) dataclass

Context-managed SSH client that wraps paramiko.

Attributes:

Name Type Description
config SSHConnectionConfigMixin

Connection configuration.

Methods:
exec(command: str, *, stream: bool = False, get_pty: bool = False, combine_stderr: bool = False) -> RemoteCommandResult

Execute a command on the remote host.

Parameters:

Name Type Description Default
command str

Shell command to execute.

required
stream bool

If True, stream stdout to the local logger in real time.

False
get_pty bool

Request a pseudo-terminal (improves log buffering for long-running commands).

False
combine_stderr bool

If True, merge stderr into stdout on the channel to avoid buffer deadlocks on long output.

False

Returns:

Type Description
RemoteCommandResult

RemoteCommandResult with exit code and captured output.

Source code in src/zenml/integrations/ssh/ssh_client.py
def exec(
    self,
    command: str,
    *,
    stream: bool = False,
    get_pty: bool = False,
    combine_stderr: bool = False,
) -> RemoteCommandResult:
    """Execute a command on the remote host.

    Args:
        command: Shell command to execute.
        stream: If True, stream stdout to the local logger in real time.
        get_pty: Request a pseudo-terminal (improves log buffering for
            long-running commands).
        combine_stderr: If True, merge stderr into stdout on the channel
            to avoid buffer deadlocks on long output.

    Returns:
        RemoteCommandResult with exit code and captured output.
    """
    client = self._get_client()

    if stream:
        return self._exec_streaming(
            client,
            command,
            get_pty=get_pty,
            combine_stderr=combine_stderr,
        )

    _, stdout_ch, stderr_ch = client.exec_command(command, get_pty=get_pty)  # nosec
    stdout_text = stdout_ch.read().decode("utf-8", errors="replace")
    stderr_text = stderr_ch.read().decode("utf-8", errors="replace")
    exit_code = stdout_ch.channel.recv_exit_status()

    return RemoteCommandResult(
        exit_code=exit_code,
        stdout=stdout_text,
        stderr=stderr_text,
    )
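`RemoteCommandResult.stderr` is documented as empty when stderr is combined with stdout. In paramiko that merge is done on the channel (`Channel.set_combine_stderr`). In the listing above, `combine_stderr` is forwarded only to `_exec_streaming`, and the non-streaming branch reads stdout to EOF before reading stderr. The local analogue below uses `subprocess` with `stderr=subprocess.STDOUT` to show the resulting output shape. It is a sketch, not the paramiko code path. `CommandResult` and `run_local` are hypothetical names.

```python
import subprocess
import sys
from dataclasses import dataclass
from typing import List


@dataclass
class CommandResult:
    """Local stand-in shaped like RemoteCommandResult."""

    exit_code: int
    stdout: str = ""
    stderr: str = ""


def run_local(args: List[str], *, combine_stderr: bool = False) -> CommandResult:
    proc = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        # With STDOUT, both streams share one pipe and stderr stays None.
        stderr=subprocess.STDOUT if combine_stderr else subprocess.PIPE,
    )
    return CommandResult(
        exit_code=proc.returncode,
        stdout=proc.stdout.decode("utf-8", errors="replace"),
        stderr="" if combine_stderr else proc.stderr.decode("utf-8", errors="replace"),
    )


if __name__ == "__main__":
    both = [sys.executable, "-c", "import sys; print('out'); print('err', file=sys.stderr)"]
    print(run_local(both, combine_stderr=True))
```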
put_text(remote_path: str, content: str, *, mode: int = 384) -> None

Upload text content to a file on the remote host via SFTP.

Parameters:

Name Type Description Default
remote_path str

Absolute path on the remote host.

required
content str

Text content to write.

required
mode int

File permissions (default: 0o600, i.e. owner read/write only).

384 (0o600)
Source code in src/zenml/integrations/ssh/ssh_client.py
def put_text(
    self, remote_path: str, content: str, *, mode: int = 0o600
) -> None:
    """Upload text content to a file on the remote host via SFTP.

    Args:
        remote_path: Absolute path on the remote host.
        content: Text content to write.
        mode: File permissions (default: owner read/write only).
    """
    with self.sftp() as sftp:
        with sftp.file(remote_path, "w") as f:
            f.write(content)
        sftp.chmod(remote_path, mode)
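The default of `384` in the rendered signature is the decimal form of the octal literal `0o600` in the source. `put_text` writes the file and then calls `chmod`, so the file briefly has the server's default permissions. The local sketch below narrows that window by creating the file with `os.open(..., 0o600)` and then calling `chmod` anyway, in case the file already existed. `write_private_text` is a hypothetical helper, not part of the SSH client.

```python
import os
import stat
import tempfile

# The rendered default 384 and the source default 0o600 are the same number.
assert 0o600 == 384


def write_private_text(path: str, content: str, mode: int = 0o600) -> None:
    # Create with restrictive permissions (the umask can only remove bits).
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(content)
    # Re-apply the mode in case the file existed with looser permissions.
    os.chmod(path, mode)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        target = os.path.join(d, "env-example")
        write_private_text(target, "A=1\n")
        print(oct(stat.S_IMODE(os.stat(target).st_mode)))
```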
read_text(remote_path: str) -> str

Read a remote file's text contents via SFTP.

Parameters:

Name Type Description Default
remote_path str

Absolute path on the remote host.

required

Returns:

Type Description
str

The file contents as a string.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in src/zenml/integrations/ssh/ssh_client.py
def read_text(self, remote_path: str) -> str:
    """Read a remote file's text contents via SFTP.

    Args:
        remote_path: Absolute path on the remote host.

    Returns:
        The file contents as a string.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    with self.sftp() as sftp:
        try:
            with sftp.file(remote_path, "r") as f:
                raw: bytes = f.read()
                return raw.decode("utf-8", errors="replace")
        except FileNotFoundError:
            raise
        except OSError as e:
            raise FileNotFoundError(
                f"Remote file not found or unreadable: {remote_path}"
            ) from e
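`read_text` narrows every read failure to the one documented exception type. A `FileNotFoundError` passes through unchanged, and any other `OSError` (permissions, a directory, a broken SFTP read) is re-raised as `FileNotFoundError`, with the original error chained via `from e`. Undecodable bytes become U+FFFD instead of raising. The sketch below applies the same translation to local files. `read_text_local` is a hypothetical name.

```python
import os
import tempfile


def read_text_local(path: str) -> str:
    try:
        with open(path, "rb") as f:
            return f.read().decode("utf-8", errors="replace")
    except FileNotFoundError:
        raise  # already the documented type
    except OSError as e:
        # Directories, permission errors, etc. become "not found or unreadable".
        raise FileNotFoundError(f"File not found or unreadable: {path}") from e


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        try:
            read_text_local(d)  # a directory, not a file
        except FileNotFoundError as err:
            print(type(err.__cause__).__name__)
        print(read_text_local(os.path.join(d, "missing")) if False else "done")
```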
sftp() -> Iterator[paramiko.SFTPClient]

Open an SFTP session over the active connection.

Yields:

Type Description
SFTPClient

An open paramiko SFTP client.

Source code in src/zenml/integrations/ssh/ssh_client.py
@contextmanager
def sftp(self) -> Iterator["paramiko.SFTPClient"]:
    """Open an SFTP session over the active connection.

    Yields:
        An open paramiko SFTP client.
    """
    client = self._get_client()
    sftp = client.open_sftp()
    try:
        yield sftp
    finally:
        sftp.close()
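`sftp()` opens a fresh SFTP session over the existing SSH connection on each call. The `try`/`finally` around `yield` ensures the session is closed whether the `with` body finishes normally or raises. The sketch below shows that guarantee with a hypothetical `FakeSession` in place of `paramiko.SFTPClient`.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeSession:
    """Hypothetical stand-in for paramiko.SFTPClient."""

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.log.append("open")

    def close(self) -> None:
        self.log.append("close")


@contextmanager
def session(log: List[str]) -> Iterator[FakeSession]:
    s = FakeSession(log)
    try:
        yield s
    finally:
        # Runs on normal exit and when the with-body raises.
        s.close()
```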

step_operators

SSH step operator.

Classes
SSHStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, environment: Optional[Dict[str, str]] = None, secrets: Optional[List[UUID]] = None, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator that executes steps on a remote host via SSH + Docker.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    environment: Optional[Dict[str, str]] = None,
    secrets: Optional[List[UUID]] = None,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        environment: Environment variables to set when running on this
            component.
        secrets: Secrets to set as environment variables when running on
            this component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.environment = environment or {}
    self.secrets = secrets or []
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: SSHStepOperatorConfig property

Get the SSH step operator configuration.

Returns:

Type Description
SSHStepOperatorConfig

The SSH step operator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the SSH step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The SSH step operator settings class.

validator: Optional[StackValidator] property

Validate that the stack meets remote execution requirements.

Returns:

Type Description
Optional[StackValidator]

A stack validator.

Methods:
cancel(step_run: StepRunResponse) -> None

Cancel a submitted step by stopping its Docker container.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to cancel.

required
Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def cancel(self, step_run: "StepRunResponse") -> None:
    """Cancel a submitted step by stopping its Docker container.

    Args:
        step_run: The step run to cancel.
    """
    container_name = step_run.run_metadata.get(
        SSH_CONTAINER_NAME_METADATA_KEY
    )
    if container_name is None:
        logger.warning(
            "No container name recorded for step `%s`", step_run.name
        )
        return

    docker = shlex.quote(self.config.docker_binary)
    name = shlex.quote(str(container_name))
    try:
        with SSHClient(self.config) as ssh:
            ssh.exec(f"{docker} stop {name}")
    except Exception:
        logger.warning(
            "Canceling container %s for step `%s` failed.",
            container_name,
            step_run.name,
        )
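`cancel` recovers the container name from the step run's metadata and sends `docker stop` over SSH. Both the binary and the name go through `shlex.quote`, so a name containing spaces or shell metacharacters reaches Docker as one argument. In the commands shown above, this quoting also means `docker_binary` must name a single executable: a value such as `sudo docker` would be quoted into one word. `build_stop_command` is a hypothetical helper that isolates the quoting.

```python
import shlex


def build_stop_command(docker_binary: str, container_name: str) -> str:
    # Quote both parts so the remote shell sees exactly two extra tokens.
    return f"{shlex.quote(docker_binary)} stop {shlex.quote(container_name)}"
```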
get_docker_builds(snapshot: PipelineSnapshotBase) -> List[BuildConfiguration]

Declare Docker builds needed for steps using this operator.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The pipeline snapshot.

required

Returns:

Type Description
List[BuildConfiguration]

A list of Docker build configurations, one per step that uses this step operator.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def get_docker_builds(
    self, snapshot: "PipelineSnapshotBase"
) -> List["BuildConfiguration"]:
    """Declare Docker builds needed for steps using this operator.

    Args:
        snapshot: The pipeline snapshot.

    Returns:
        A list of Docker build configurations, one per step that uses
        this step operator.
    """
    builds = []
    for step_name, step in snapshot.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)
    return builds
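`get_docker_builds` asks for one image build per step routed to this operator, each tagged with the same image key and carrying that step's Docker settings. Steps that run elsewhere do not get a build from this operator. The sketch below shows the filtering with hypothetical dataclasses. It assumes a step "uses" the operator when its configured operator name matches, which simplifies whatever `uses_step_operator` actually checks. The key string is also hypothetical, because the value of `SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY` is not shown.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical key: the real SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY value is not shown.
IMAGE_KEY = "example_ssh_step_operator"


@dataclass
class StepConfig:
    step_operator: Optional[str] = None  # hypothetical: operator name, if any
    docker_settings: Optional[Dict[str, str]] = None


@dataclass
class Build:
    key: str
    step_name: str
    settings: Optional[Dict[str, str]]


def builds_for_operator(steps: Dict[str, StepConfig], operator_name: str) -> List[Build]:
    return [
        Build(key=IMAGE_KEY, step_name=name, settings=cfg.docker_settings)
        for name, cfg in steps.items()
        if cfg.step_operator == operator_name  # stand-in for uses_step_operator()
    ]
```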
get_status(step_run: StepRunResponse) -> ExecutionStatus

Get the execution status of a submitted step.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to check.

required

Returns:

Type Description
ExecutionStatus

The current execution status.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def get_status(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Get the execution status of a submitted step.

    Args:
        step_run: The step run to check.

    Returns:
        The current execution status.
    """
    container_name = step_run.run_metadata.get(
        SSH_CONTAINER_NAME_METADATA_KEY
    )
    if container_name is None:
        logger.warning(
            "No container name recorded for step `%s`", step_run.name
        )
        return ExecutionStatus.FAILED

    docker = shlex.quote(self.config.docker_binary)
    name = shlex.quote(str(container_name))
    with SSHClient(self.config) as ssh:
        result = ssh.exec(
            f"{docker} inspect --format "
            f"'{{{{.State.Status}}}} {{{{.State.ExitCode}}}}' {name}"
        )
        if result.exit_code != 0:
            # The container is gone (never created, or pruned). Without it
            # we cannot prove success, so report FAILED
            logger.debug(
                "docker inspect failed for container %s: %s",
                container_name,
                result.stderr.strip(),
            )
            return ExecutionStatus.FAILED

        state, _, exit_code = result.stdout.strip().partition(" ")
        if state in ("running", "created", "restarting", "paused"):
            return ExecutionStatus.RUNNING
        if state == "exited" and exit_code.strip() == "0":
            return ExecutionStatus.COMPLETED
        return ExecutionStatus.FAILED
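`get_status` derives the step state entirely from `docker inspect --format '{{.State.Status}} {{.State.ExitCode}}'`. The quadrupled braces in the f-string collapse to the doubled braces that Docker's Go template syntax requires. The parsing rules are: a missing container means failed, a live state means running, and only `exited 0` means completed. The sketch below isolates those rules as a pure function. `Status` is a simplified stand-in for `ExecutionStatus`, and `parse_inspect` is a hypothetical name.

```python
from enum import Enum


class Status(Enum):
    """Simplified stand-in for zenml's ExecutionStatus."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


RUNNING_STATES = {"running", "created", "restarting", "paused"}

# Inside an f-string "{{{{" renders as "{{", which Docker reads as a template.
INSPECT_FORMAT = f"'{{{{.State.Status}}}} {{{{.State.ExitCode}}}}'"


def parse_inspect(exit_code: int, stdout: str) -> Status:
    if exit_code != 0:
        return Status.FAILED  # container gone: success cannot be proven
    state, _, code = stdout.strip().partition(" ")
    if state in RUNNING_STATES:
        return Status.RUNNING
    if state == "exited" and code.strip() == "0":
        return Status.COMPLETED
    return Status.FAILED  # non-zero exit, "dead", "removing", ...
```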
submit(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Submit a step for asynchronous execution on the remote host.

Parameters:

Name Type Description Default
info StepRunInfo

The step run information.

required
entrypoint_command List[str]

The entrypoint command for the step.

required
environment Dict[str, str]

Environment variables for the step container.

required

Raises:

Type Description
RuntimeError

If the image pull or container start fails.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def submit(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submit a step for asynchronous execution on the remote host.

    Args:
        info: The step run information.
        entrypoint_command: The entrypoint command for the step.
        environment: Environment variables for the step container.

    Raises:
        RuntimeError: If the image pull or container start fails.
    """
    settings = cast(SSHStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    step_run_id = str(info.step_run_id)
    container_name = f"zenml-step-{step_run_id}"[:128]
    remote_workdir = self.config.remote_workdir
    env_file_path = f"{remote_workdir}/env-{step_run_id}"
    env_content = serialize_env_for_docker_env_file(environment)
    docker = shlex.quote(self.config.docker_binary)

    run_command = build_docker_run_command(
        docker_binary=self.config.docker_binary,
        image=image_name,
        args=entrypoint_command,
        container_name=container_name,
        env_file=env_file_path,
        gpu_indices=settings.gpu_indices,
        mounts=settings.mounts,
        extra_args=settings.docker_run_args,
    )

    logger.info(
        "Submitting step '%s' to %s:%d (image: %s, container: %s)",
        info.pipeline_step_name,
        self.config.hostname,
        self.config.port,
        image_name,
        container_name,
    )
    if settings.gpu_indices:
        logger.info("GPU indices: %s", settings.gpu_indices)

    cleanup_command = None
    if self.config.cleanup_old_files:
        cleanup_command = (
            f"find {shlex.quote(remote_workdir)} -maxdepth 1 "
            "-name 'env-*' -ctime +7 -delete"
        )
    container_registry = None
    if self.config.authenticate_docker:
        from zenml.client import Client

        container_registry = Client().active_stack.container_registry

    with SSHClient(self.config) as ssh:
        prepare_remote_workdir(
            ssh=ssh,
            docker_binary=self.config.docker_binary,
            workdir=remote_workdir,
            minimum_free_disk_gb=self.config.minimum_free_disk_gb,
            cleanup_command=cleanup_command,
            container_registry=container_registry,
        )
        ssh.put_text(env_file_path, env_content, mode=0o600)

        pull = ssh.exec(f"{docker} pull {shlex.quote(image_name)}")
        if pull.exit_code != 0:
            ssh.exec(f"rm -f {shlex.quote(env_file_path)}")
            raise RuntimeError(
                f"Failed to pull image '{image_name}' on "
                f"{self.config.hostname}: {pull.stderr.strip()}"
            )

        result = ssh.exec(run_command)
        # Docker reads the env-file at container creation; delete it right
        # away so the step's secrets do not linger on the remote host.
        ssh.exec(f"rm -f {shlex.quote(env_file_path)}")
        if result.exit_code != 0:
            raise RuntimeError(
                f"Failed to start container '{container_name}' on "
                f"{self.config.hostname}: {result.stderr.strip()}"
            )

    metadata: Dict[str, "MetadataType"] = {
        SSH_CONTAINER_NAME_METADATA_KEY: container_name,
    }
    publish_step_run_metadata(
        step_run_id=info.step_run_id,
        step_run_metadata={self.id: metadata},
    )
    info.step_run.run_metadata.update(metadata)
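`submit` passes the step's environment (including secrets) through a Docker `--env-file`. The file is written with mode 0o600, and it is deleted right after `docker run` has read it. The container name is `zenml-step-<step run id>`, capped at 128 characters, and is recorded as run metadata so that `get_status` and `cancel` can find the container later. The body of `serialize_env_for_docker_env_file` is not shown in this reference. The sketch below is a hypothetical serializer based on our understanding of the env-file format: one `VAR=value` per line, values kept literally (quotes are not stripped), and no multi-line values.

```python
from typing import Dict


def to_env_file(environment: Dict[str, str]) -> str:
    """Hypothetical serializer for `docker run --env-file` (real helper not shown)."""
    lines = []
    for key, value in environment.items():
        if not key or "=" in key:
            raise ValueError(f"Invalid variable name: {key!r}")
        if "\n" in value or "\r" in value:
            # The format is line based, so multi-line values cannot be expressed.
            raise ValueError(f"Multi-line value for {key!r} is not supported")
        lines.append(f"{key}={value}")  # value is taken literally, quotes included
    return ("\n".join(lines) + "\n") if lines else ""


def container_name_for(step_run_id: str) -> str:
    # Same naming rule as submit(): fixed prefix plus id, capped at 128 chars.
    return f"zenml-step-{step_run_id}"[:128]
```

A UUID-based name is 47 characters long, so the 128-character cap only matters for unusually long ids.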
Modules
ssh_step_operator

SSH step operator implementation.

Classes
SSHStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, environment: Optional[Dict[str, str]] = None, secrets: Optional[List[UUID]] = None, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator that executes steps on a remote host via SSH + Docker.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    environment: Optional[Dict[str, str]] = None,
    secrets: Optional[List[UUID]] = None,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        environment: Environment variables to set when running on this
            component.
        secrets: Secrets to set as environment variables when running on
            this component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.environment = environment or {}
    self.secrets = secrets or []
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: SSHStepOperatorConfig property

Get the SSH step operator configuration.

Returns:

Type Description
SSHStepOperatorConfig

The SSH step operator configuration.

settings_class: Optional[Type[BaseSettings]] property

Get the settings class for the SSH step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The SSH step operator settings class.

validator: Optional[StackValidator] property

Validate that the stack meets remote execution requirements.

Returns:

Type Description
Optional[StackValidator]

A stack validator.

Methods:
cancel(step_run: StepRunResponse) -> None

Cancel a submitted step by stopping its Docker container.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to cancel.

required
Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def cancel(self, step_run: "StepRunResponse") -> None:
    """Cancel a submitted step by stopping its Docker container.

    Args:
        step_run: The step run to cancel.
    """
    container_name = step_run.run_metadata.get(
        SSH_CONTAINER_NAME_METADATA_KEY
    )
    if container_name is None:
        logger.warning(
            "No container name recorded for step `%s`", step_run.name
        )
        return

    docker = shlex.quote(self.config.docker_binary)
    name = shlex.quote(str(container_name))
    try:
        with SSHClient(self.config) as ssh:
            ssh.exec(f"{docker} stop {name}")
    except Exception:
        logger.warning(
            "Canceling container %s for step `%s` failed.",
            container_name,
            step_run.name,
        )
get_docker_builds(snapshot: PipelineSnapshotBase) -> List[BuildConfiguration]

Declare Docker builds needed for steps using this operator.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The pipeline snapshot.

required

Returns:

Type Description
List[BuildConfiguration]

A list of Docker build configurations, one per step that uses this step operator.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def get_docker_builds(
    self, snapshot: "PipelineSnapshotBase"
) -> List["BuildConfiguration"]:
    """Declare Docker builds needed for steps using this operator.

    Args:
        snapshot: The pipeline snapshot.

    Returns:
        A list of Docker build configurations, one per step that uses
        this step operator.
    """
    builds = []
    for step_name, step in snapshot.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)
    return builds
get_status(step_run: StepRunResponse) -> ExecutionStatus

Get the execution status of a submitted step.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run to check.

required

Returns:

Type Description
ExecutionStatus

The current execution status.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def get_status(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Get the execution status of a submitted step.

    Args:
        step_run: The step run to check.

    Returns:
        The current execution status.
    """
    container_name = step_run.run_metadata.get(
        SSH_CONTAINER_NAME_METADATA_KEY
    )
    if container_name is None:
        logger.warning(
            "No container name recorded for step `%s`", step_run.name
        )
        return ExecutionStatus.FAILED

    docker = shlex.quote(self.config.docker_binary)
    name = shlex.quote(str(container_name))
    with SSHClient(self.config) as ssh:
        result = ssh.exec(
            f"{docker} inspect --format "
            f"'{{{{.State.Status}}}} {{{{.State.ExitCode}}}}' {name}"
        )
        if result.exit_code != 0:
            # The container is gone (never created, or pruned). Without it
            # we cannot prove success, so report FAILED.
            logger.debug(
                "docker inspect failed for container %s: %s",
                container_name,
                result.stderr.strip(),
            )
            return ExecutionStatus.FAILED

        state, _, exit_code = result.stdout.strip().partition(" ")
        if state in ("running", "created", "restarting", "paused"):
            return ExecutionStatus.RUNNING
        if state == "exited" and exit_code.strip() == "0":
            return ExecutionStatus.COMPLETED
        return ExecutionStatus.FAILED
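As a minimal sketch of the status mapping above, the helpers below rebuild the `docker inspect` command and parse its output. `Status` is a hypothetical stand-in for ZenML's `ExecutionStatus`, and `inspect_command` / `parse_inspect_output` are illustrative names, not part of the integration. The doubled braces are the subtle part: inside an f-string, `{{{{` renders as `{{`, so the remote shell receives Docker's Go template unchanged.

```python
import shlex
from enum import Enum


class Status(Enum):
    # Hypothetical stand-in for ExecutionStatus, limited to the three
    # values get_status() can return.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def inspect_command(docker_binary: str, container_name: str) -> str:
    docker = shlex.quote(docker_binary)
    name = shlex.quote(container_name)
    # "{{{{" inside an f-string renders as "{{", producing Docker's
    # Go template '{{.State.Status}} {{.State.ExitCode}}'.
    return (
        f"{docker} inspect --format "
        f"'{{{{.State.Status}}}} {{{{.State.ExitCode}}}}' {name}"
    )


def parse_inspect_output(stdout: str) -> Status:
    # Same mapping as get_status(); the output looks like "exited 0".
    state, _, exit_code = stdout.strip().partition(" ")
    if state in ("running", "created", "restarting", "paused"):
        return Status.RUNNING
    if state == "exited" and exit_code.strip() == "0":
        return Status.COMPLETED
    return Status.FAILED


print(inspect_command("docker", "zenml-step-abc"))
# docker inspect --format '{{.State.Status}} {{.State.ExitCode}}' zenml-step-abc
```

Any state outside the running/created/restarting/paused set, including `dead` or a non-zero exit code, falls through to FAILED, matching the conservative choice made when the container cannot be inspected at all.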
submit(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Submit a step for asynchronous execution on the remote host.

Parameters:

Name Type Description Default
info StepRunInfo

The step run information.

required
entrypoint_command List[str]

The entrypoint command for the step.

required
environment Dict[str, str]

Environment variables for the step container.

required

Raises:

Type Description
RuntimeError

If the image pull or container start fails.

Source code in src/zenml/integrations/ssh/step_operators/ssh_step_operator.py
def submit(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submit a step for asynchronous execution on the remote host.

    Args:
        info: The step run information.
        entrypoint_command: The entrypoint command for the step.
        environment: Environment variables for the step container.

    Raises:
        RuntimeError: If the image pull or container start fails.
    """
    settings = cast(SSHStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=SSH_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    step_run_id = str(info.step_run_id)
    container_name = f"zenml-step-{step_run_id}"[:128]
    remote_workdir = self.config.remote_workdir
    env_file_path = f"{remote_workdir}/env-{step_run_id}"
    env_content = serialize_env_for_docker_env_file(environment)
    docker = shlex.quote(self.config.docker_binary)

    run_command = build_docker_run_command(
        docker_binary=self.config.docker_binary,
        image=image_name,
        args=entrypoint_command,
        container_name=container_name,
        env_file=env_file_path,
        gpu_indices=settings.gpu_indices,
        mounts=settings.mounts,
        extra_args=settings.docker_run_args,
    )

    logger.info(
        "Submitting step '%s' to %s:%d (image: %s, container: %s)",
        info.pipeline_step_name,
        self.config.hostname,
        self.config.port,
        image_name,
        container_name,
    )
    if settings.gpu_indices:
        logger.info("GPU indices: %s", settings.gpu_indices)

    cleanup_command = None
    if self.config.cleanup_old_files:
        cleanup_command = (
            f"find {shlex.quote(remote_workdir)} -maxdepth 1 "
            "-name 'env-*' -ctime +7 -delete"
        )
    container_registry = None
    if self.config.authenticate_docker:
        from zenml.client import Client

        container_registry = Client().active_stack.container_registry

    with SSHClient(self.config) as ssh:
        prepare_remote_workdir(
            ssh=ssh,
            docker_binary=self.config.docker_binary,
            workdir=remote_workdir,
            minimum_free_disk_gb=self.config.minimum_free_disk_gb,
            cleanup_command=cleanup_command,
            container_registry=container_registry,
        )
        ssh.put_text(env_file_path, env_content, mode=0o600)

        pull = ssh.exec(f"{docker} pull {shlex.quote(image_name)}")
        if pull.exit_code != 0:
            ssh.exec(f"rm -f {shlex.quote(env_file_path)}")
            raise RuntimeError(
                f"Failed to pull image '{image_name}' on "
                f"{self.config.hostname}: {pull.stderr.strip()}"
            )

        result = ssh.exec(run_command)
        # Docker reads the env-file at container creation; delete it right
        # away so the step's secrets do not linger on the remote host.
        ssh.exec(f"rm -f {shlex.quote(env_file_path)}")
        if result.exit_code != 0:
            raise RuntimeError(
                f"Failed to start container '{container_name}' on "
                f"{self.config.hostname}: {result.stderr.strip()}"
            )

    metadata: Dict[str, "MetadataType"] = {
        SSH_CONTAINER_NAME_METADATA_KEY: container_name,
    }
    publish_step_run_metadata(
        step_run_id=info.step_run_id,
        step_run_metadata={self.id: metadata},
    )
    info.step_run.run_metadata.update(metadata)
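The ordering inside the `with SSHClient(...)` block matters for secrets: the env-file is uploaded with mode 0o600 and removed both when the image pull fails and right after the container is started. The sketch below replays that ordering against `FakeSSH`, a hypothetical in-memory stand-in for `SSHClient` that records every call; `pull_and_run` is likewise an illustrative name, not part of the integration.

```python
import shlex
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ExecResult:
    exit_code: int
    stderr: str = ""


@dataclass
class FakeSSH:
    """Hypothetical stand-in for SSHClient that records every call."""

    failing_prefixes: List[str] = field(default_factory=list)
    calls: List[str] = field(default_factory=list)
    files: Dict[str, str] = field(default_factory=dict)

    def put_text(self, path: str, content: str, mode: int) -> None:
        self.calls.append(f"put {path} mode={oct(mode)}")
        self.files[path] = content

    def exec(self, command: str) -> ExecResult:
        self.calls.append(command)
        if command.startswith("rm -f "):
            # Simulate deleting the uploaded file.
            self.files.pop(shlex.split(command)[2], None)
        if any(command.startswith(p) for p in self.failing_prefixes):
            return ExecResult(exit_code=1, stderr="simulated failure")
        return ExecResult(exit_code=0)


def pull_and_run(
    ssh: FakeSSH,
    docker: str,
    image: str,
    env_file: str,
    env_content: str,
    run_command: str,
) -> None:
    # Same order as submit(): upload env-file, pull, run, delete env-file.
    ssh.put_text(env_file, env_content, mode=0o600)
    pull = ssh.exec(f"{docker} pull {shlex.quote(image)}")
    if pull.exit_code != 0:
        ssh.exec(f"rm -f {shlex.quote(env_file)}")
        raise RuntimeError(f"Failed to pull image: {pull.stderr}")
    result = ssh.exec(run_command)
    # Docker has already read the env-file, so it can be deleted now.
    ssh.exec(f"rm -f {shlex.quote(env_file)}")
    if result.exit_code != 0:
        raise RuntimeError(f"Failed to start container: {result.stderr}")
```

With `FakeSSH(failing_prefixes=["docker pull"])`, `pull_and_run` raises `RuntimeError`, never issues the run command, and leaves `ssh.files` empty, which is the guarantee the early `rm -f` in `submit` provides.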

utils

SSH integration utilities.

Functions:
build_compose_gpu_deploy(gpu_indices: Sequence[int]) -> Dict[str, Any]

Build the Compose deploy section reserving the given GPUs.

Parameters:

Name Type Description Default
gpu_indices Sequence[int]

GPU device indices to reserve.

required

Returns:

Type Description
Dict[str, Any]

A Compose service deploy section.

Source code in src/zenml/integrations/ssh/utils.py
def build_compose_gpu_deploy(gpu_indices: Sequence[int]) -> Dict[str, Any]:
    """Build the Compose deploy section reserving the given GPUs.

    Args:
        gpu_indices: GPU device indices to reserve.

    Returns:
        A Compose service deploy section.
    """
    device_ids = [str(i) for i in normalize_gpu_indices(gpu_indices)]
    return {
        "resources": {
            "reservations": {
                "devices": [
                    {
                        "driver": "nvidia",
                        "device_ids": device_ids,
                        "capabilities": ["gpu"],
                    }
                ]
            }
        }
    }
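For illustration, the sketch below copies `build_compose_gpu_deploy` together with the `normalize_gpu_indices` helper it calls, and shows the device reservation produced for an unsorted list containing a duplicate. The returned dict is what goes under a service's `deploy` key in a Compose file; assembling a complete Compose file is outside the scope of this sketch.

```python
import json
from typing import Any, Dict, List, Sequence


def normalize_gpu_indices(indices: Sequence[int]) -> List[int]:
    # Reject negative indices, then sort and de-duplicate.
    for idx in indices:
        if idx < 0:
            raise ValueError(f"GPU index must be non-negative, got {idx}.")
    return sorted(set(indices))


def build_compose_gpu_deploy(gpu_indices: Sequence[int]) -> Dict[str, Any]:
    # Device IDs are emitted as strings, as in the Compose file format.
    device_ids = [str(i) for i in normalize_gpu_indices(gpu_indices)]
    return {
        "resources": {
            "reservations": {
                "devices": [
                    {
                        "driver": "nvidia",
                        "device_ids": device_ids,
                        "capabilities": ["gpu"],
                    }
                ]
            }
        }
    }


deploy = build_compose_gpu_deploy([2, 0, 2])
print(json.dumps(deploy["resources"]["reservations"]["devices"][0]))
# {"driver": "nvidia", "device_ids": ["0", "2"], "capabilities": ["gpu"]}
```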
build_docker_gpus_flag(gpu_indices: Sequence[int]) -> str

Build the value for the Docker --gpus flag.

Parameters:

Name Type Description Default
gpu_indices Sequence[int]

Normalized (sorted, unique) GPU device indices.

required

Returns:

Type Description
str

String suitable for docker run --gpus '<value>', e.g. '"device=0,2"' (outer single-quotes added by the caller).

Raises:

Type Description
ValueError

If no GPU indices are provided.

Source code in src/zenml/integrations/ssh/utils.py
def build_docker_gpus_flag(gpu_indices: Sequence[int]) -> str:
    """Build the value for the Docker --gpus flag.

    Args:
        gpu_indices: Normalized (sorted, unique) GPU device indices.

    Returns:
        String suitable for ``docker run --gpus '<value>'``, e.g.
        ``'"device=0,2"'`` (outer single-quotes added by the caller).

    Raises:
        ValueError: If no GPU indices are provided.
    """
    if not gpu_indices:
        raise ValueError("gpu_indices must contain at least one index.")
    device_list = ",".join(str(i) for i in gpu_indices)
    return f'"device={device_list}"'
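The docstring says the caller adds the outer single quotes. A sketch of why that matters: Docker's CLI reads the `--gpus` value as one comma-separated record (its documentation writes `--gpus '"device=0,2"'` for this reason), so the double quotes must survive the remote shell. Python's `csv` module stands in for Docker's parser here, and `csv_fields` is an illustrative helper, not part of the integration.

```python
import csv
import shlex
from typing import List, Sequence


def build_docker_gpus_flag(gpu_indices: Sequence[int]) -> str:
    # Same logic as above: returns e.g. "device=0,2" including the quotes.
    if not gpu_indices:
        raise ValueError("gpu_indices must contain at least one index.")
    device_list = ",".join(str(i) for i in gpu_indices)
    return f'"device={device_list}"'


def csv_fields(value: str) -> List[str]:
    # Approximates how Docker splits the --gpus value into fields.
    return next(csv.reader([value]))


flag = build_docker_gpus_flag([0, 2])

# Outer single quotes (what shlex.quote adds) keep the double quotes intact
# through the remote shell, so Docker sees one "device=0,2" field.
quoted = shlex.split(f"docker run --gpus {shlex.quote(flag)} img")[3]
print(csv_fields(quoted))  # ['device=0,2']

# Without them the shell strips the double quotes and the comma splits
# the value into two fields.
unquoted = shlex.split(f"docker run --gpus {flag} img")[3]
print(csv_fields(unquoted))  # ['device=0', '2']
```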
build_docker_run_command(*, docker_binary: str, image: str, args: Sequence[str], container_name: str, env_file: Optional[str] = None, network: Optional[str] = None, entrypoint: Optional[str] = None, gpu_indices: Optional[Sequence[int]] = None, mounts: Optional[Mapping[str, str]] = None, extra_args: Optional[Sequence[str]] = None) -> str

Build a detached docker run command.

Parameters:

Name Type Description Default
docker_binary str

Path to the Docker binary on the remote host.

required
image str

Fully-qualified image to run.

required
args Sequence[str]

Arguments passed after the image.

required
container_name str

Container name.

required
env_file Optional[str]

Path to a Docker env-file.

None
network Optional[str]

Docker network mode.

None
entrypoint Optional[str]

Entrypoint override.

None
gpu_indices Optional[Sequence[int]]

GPU device indices to attach, or None for CPU.

None
mounts Optional[Mapping[str, str]]

Host-path to container-path bind mounts.

None
extra_args Optional[Sequence[str]]

Additional docker run arguments, inserted before the image name.

None

Returns:

Type Description
str

The full docker run -d command string.

Source code in src/zenml/integrations/ssh/utils.py
def build_docker_run_command(
    *,
    docker_binary: str,
    image: str,
    args: Sequence[str],
    container_name: str,
    env_file: Optional[str] = None,
    network: Optional[str] = None,
    entrypoint: Optional[str] = None,
    gpu_indices: Optional[Sequence[int]] = None,
    mounts: Optional[Mapping[str, str]] = None,
    extra_args: Optional[Sequence[str]] = None,
) -> str:
    """Build a detached ``docker run`` command.

    Args:
        docker_binary: Path to the Docker binary on the remote host.
        image: Fully-qualified image to run.
        args: Arguments passed after the image.
        container_name: Container name.
        env_file: Path to a Docker env-file.
        network: Docker network mode.
        entrypoint: Entrypoint override.
        gpu_indices: GPU device indices to attach, or None for CPU.
        mounts: Host-path to container-path bind mounts.
        extra_args: Additional `docker run` arguments, inserted before the
            image name.

    Returns:
        The full ``docker run -d`` command string.
    """
    parts: List[str] = [
        shlex.quote(docker_binary),
        "run",
        "-d",
        "--name",
        shlex.quote(container_name),
    ]
    if network:
        parts += ["--network", shlex.quote(network)]
    if env_file:
        parts += ["--env-file", shlex.quote(env_file)]
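    if gpu_indices:
        # build_docker_gpus_flag() returns a value carrying its own double
        # quotes, e.g. "device=0,1". Wrapping it in single quotes lets the
        # remote shell pass those double quotes through to Docker, which
        # parses the value as CSV: --gpus '"device=0,1"'.
        parts += [
            "--gpus",
            shlex.quote(
                build_docker_gpus_flag(normalize_gpu_indices(gpu_indices))
            ),
        ]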
    for mapping in build_mount_mappings(mounts or {}):
        parts += ["-v", shlex.quote(mapping)]
    if extra_args:
        parts += [shlex.quote(arg) for arg in extra_args]
    if entrypoint:
        parts += ["--entrypoint", shlex.quote(entrypoint)]
    parts.append(shlex.quote(image))
    parts += [shlex.quote(arg) for arg in args]
    return " ".join(parts)
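A trimmed copy of the function, named `build_run_command_sketch` and leaving out `network`, `entrypoint`, GPU and mount handling, shows the argument order and why every user-supplied piece goes through `shlex.quote`: splitting the result the way a POSIX shell would recovers the original argv, so a `;` inside a step argument cannot start a second command on the remote host. The image name and paths are made-up examples.

```python
import shlex
from typing import List, Optional, Sequence


def build_run_command_sketch(
    *,
    docker_binary: str,
    image: str,
    args: Sequence[str],
    container_name: str,
    env_file: Optional[str] = None,
    extra_args: Optional[Sequence[str]] = None,
) -> str:
    # Trimmed copy of build_docker_run_command(): network, entrypoint,
    # GPU and mount handling are left out to keep the example short.
    parts: List[str] = [
        shlex.quote(docker_binary),
        "run",
        "-d",
        "--name",
        shlex.quote(container_name),
    ]
    if env_file:
        parts += ["--env-file", shlex.quote(env_file)]
    if extra_args:
        # Extra docker run options go before the image name.
        parts += [shlex.quote(arg) for arg in extra_args]
    parts.append(shlex.quote(image))
    # Everything after the image is passed to the container's command.
    parts += [shlex.quote(arg) for arg in args]
    return " ".join(parts)


cmd = build_run_command_sketch(
    docker_binary="docker",
    image="registry.example.com/zenml:abc",
    args=["python", "-c", "print('step; done')"],
    container_name="zenml-step-1",
    env_file="/tmp/zenml/env-1",
    extra_args=["--shm-size=2g"],
)
print(shlex.split(cmd)[-1])  # print('step; done')
```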
build_mount_mappings(mounts: Mapping[str, str]) -> List[str]

Build validated host:container bind-mount mappings.

Parameters:

Name Type Description Default
mounts Mapping[str, str]

Mapping of host paths to container paths.

required

Returns:

Type Description
List[str]

List of validated "host:container" mapping strings.

Source code in src/zenml/integrations/ssh/utils.py
def build_mount_mappings(mounts: Mapping[str, str]) -> List[str]:
    """Build validated host:container bind-mount mappings.

    Args:
        mounts: Mapping of host paths to container paths.

    Returns:
        List of validated "host:container" mapping strings.
    """
    return [
        f"{validate_mount_path(host)}:{validate_mount_path(container)}"
        for host, container in mounts.items()
    ]
check_remote_disk(ssh: SSHClient, remote_path: str, minimum_free_disk_gb: float) -> None

Fail fast if the remote host is low on disk for the given path.

Parameters:

Name Type Description Default
ssh SSHClient

The open SSH connection.

required
remote_path str

An existing remote path on the filesystem to check.

required
minimum_free_disk_gb float

Minimum free space required, in GB (counted as 1024**3 bytes). Values of 0 or below disable the check.

required

Raises:

Type Description
RuntimeError

If free disk is below minimum_free_disk_gb.

Source code in src/zenml/integrations/ssh/utils.py
def check_remote_disk(
    ssh: "SSHClient",
    remote_path: str,
    minimum_free_disk_gb: float,
) -> None:
    """Fail fast if the remote host is low on disk for the given path.

    Args:
        ssh: The open SSH connection.
        remote_path: An existing remote path on the filesystem to check.
        minimum_free_disk_gb: Minimum free space required, in GB. Values of 0
            or below disable the check.

    Raises:
        RuntimeError: If free disk is below minimum_free_disk_gb.
    """
    if minimum_free_disk_gb <= 0:
        return
    free_bytes = get_free_disk_bytes(ssh, remote_path)
    if free_bytes is None:
        # SFTP statvfs unsupported on this host; skip rather than block.
        return
    free_gb = free_bytes / (1024**3)
    if free_gb < minimum_free_disk_gb:
        raise RuntimeError(
            f"The remote host has only {free_gb:.1f} GB free on "
            f"the filesystem holding {remote_path}, below the required "
            f"{minimum_free_disk_gb:.1f} GB. The SSH integration stores a "
            f"Docker image per pipeline version, which accumulates over time. "
            f"Free space on the host (e.g. `docker image prune -a`), grow the "
            f"disk, or lower `minimum_free_disk_gb`."
        )
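The conversion divides by 1024**3, so "GB" here means binary gigabytes (GiB). A minimal sketch of the same decision logic, assuming the free byte count has already been fetched, shows the consequence: a host reporting 10,000,000,000 free bytes counts as 9.3 GB and fails a 9.5 GB threshold. `disk_check_message` is a hypothetical helper that returns the failure reason instead of raising.

```python
from typing import Optional


def disk_check_message(
    free_bytes: Optional[int], minimum_free_disk_gb: float
) -> Optional[str]:
    """Return the reason check_remote_disk() would raise, or None."""
    if minimum_free_disk_gb <= 0:
        return None  # check disabled
    if free_bytes is None:
        return None  # statvfs unsupported: skip rather than block
    free_gb = free_bytes / (1024**3)  # binary gigabytes
    if free_gb < minimum_free_disk_gb:
        return (
            f"only {free_gb:.1f} GB free, below the required "
            f"{minimum_free_disk_gb:.1f} GB"
        )
    return None


print(disk_check_message(10_000_000_000, 9.5))
# only 9.3 GB free, below the required 9.5 GB
```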
docker_login(ssh: SSHClient, container_registry: BaseContainerRegistry, docker_binary: str) -> None

Log the remote Docker into the container registry.

Parameters:

Name Type Description Default
ssh SSHClient

The open SSH connection.

required
container_registry BaseContainerRegistry

The container registry to authenticate against.

required
docker_binary str

Path to the Docker binary on the remote host.

required

Raises:

Type Description
RuntimeError

If registry credentials are missing or the remote docker login fails.

Source code in src/zenml/integrations/ssh/utils.py
def docker_login(
    ssh: "SSHClient",
    container_registry: "BaseContainerRegistry",
    docker_binary: str,
) -> None:
    """Log the remote Docker into the container registry.

    Args:
        ssh: The open SSH connection.
        container_registry: The container registry to authenticate against.
        docker_binary: Path to the Docker binary on the remote host.

    Raises:
        RuntimeError: If registry credentials are missing or the remote
            `docker login` fails.
    """
    if not container_registry.credentials:
        raise RuntimeError(
            "The container registry in the stack has no credentials or "
            "service connector configured, but `authenticate_docker` "
            "is enabled. Configure registry credentials or disable "
            "`authenticate_docker`."
        )
    username, password = container_registry.credentials
    # --password-stdin keeps the password out of docker's argv. The remote
    # shell still handles it briefly, so users must opt in explicitly.
    command = (
        f"printf %s {shlex.quote(password)} | "
        f"{shlex.quote(docker_binary)} login -u {shlex.quote(username)} "
        f"--password-stdin {shlex.quote(container_registry.config.uri)}"
    )
    result = ssh.exec(command)
    if result.exit_code != 0:
        raise RuntimeError(
            f"`docker login` failed on host: {result.stderr or result.stdout}"
        )
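`docker_login_command` below is a hypothetical helper that builds the same string as `docker_login`; the credentials and registry URI are made up. Tokenizing the result with `shlex.split` (which treats `|` as an ordinary word rather than running a pipeline) shows that quoting keeps a password containing spaces, `$` and quotes in one argument, and that the password appears only on the `printf` side of the pipe, never among `docker login`'s arguments.

```python
import shlex


def docker_login_command(
    docker_binary: str, username: str, password: str, registry_uri: str
) -> str:
    # Same string shape as docker_login(): the password is piped to
    # --password-stdin instead of being passed as a docker argument.
    return (
        f"printf %s {shlex.quote(password)} | "
        f"{shlex.quote(docker_binary)} login -u {shlex.quote(username)} "
        f"--password-stdin {shlex.quote(registry_uri)}"
    )


cmd = docker_login_command("docker", "ci-bot", "s3cr$t 'pw'", "registry.example.com")
tokens = shlex.split(cmd)
pipe = tokens.index("|")
print(tokens[pipe + 1:])
# ['docker', 'login', '-u', 'ci-bot', '--password-stdin', 'registry.example.com']
```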
get_free_disk_bytes(ssh: SSHClient, remote_path: str) -> Optional[int]

Get free disk space on the filesystem holding a remote path.

Parameters:

Name Type Description Default
ssh SSHClient

An open SSH connection.

required
remote_path str

An existing path on the remote host.

required

Returns:

Type Description
Optional[int]

Free bytes available to a non-root user, or None if the SFTP server does not support statvfs.

Source code in src/zenml/integrations/ssh/utils.py
def get_free_disk_bytes(ssh: "SSHClient", remote_path: str) -> Optional[int]:
    """Get free disk space on the filesystem holding a remote path.

    Args:
        ssh: An open SSH connection.
        remote_path: An existing path on the remote host.

    Returns:
        Free bytes available to a non-root user, or None if the SFTP server
        does not support statvfs.
    """
    with ssh.sftp() as sftp:
        try:
            # statvfs is a paramiko SFTP extension present at runtime but
            # missing from the type stubs.
            stats = sftp.statvfs(remote_path)  # type: ignore[attr-defined]
            return int(stats.f_bavail * stats.f_frsize)
        except Exception:
            logger.debug(
                "Could not query free disk space on the remote host via "
                "SFTP statvfs.",
                exc_info=True,
            )
            return None
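The free-space figure is `f_bavail * f_frsize`: the number of blocks available to unprivileged users times the fragment size, which excludes blocks reserved for root. The same POSIX fields are available locally through Python's `os.statvfs` (Unix only). `local_free_bytes` and `local_total_bytes` below are hypothetical local analogs for experimenting with the arithmetic; they do not go over SFTP.

```python
import os


def local_free_bytes(path: str) -> int:
    stats = os.statvfs(path)
    # f_bavail: free blocks available to non-root users;
    # f_frsize: fundamental block (fragment) size in bytes.
    return int(stats.f_bavail * stats.f_frsize)


def local_total_bytes(path: str) -> int:
    stats = os.statvfs(path)
    # f_blocks: total number of blocks on the filesystem.
    return int(stats.f_blocks * stats.f_frsize)


print(f"{local_free_bytes('/') / 1024**3:.1f} GB free on /")
```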
normalize_gpu_indices(indices: Sequence[int]) -> List[int]

Normalize GPU indices into a sorted, unique, validated list.

Parameters:

Name Type Description Default
indices Sequence[int]

GPU device indices (may be unsorted or contain duplicates).

required

Returns:

Type Description
List[int]

Sorted unique list of non-negative GPU indices.

Raises:

Type Description
ValueError

If any index is negative.

Source code in src/zenml/integrations/ssh/utils.py
def normalize_gpu_indices(indices: Sequence[int]) -> List[int]:
    """Normalize GPU indices into a sorted, unique, validated list.

    Args:
        indices: GPU device indices (may be unsorted or contain duplicates).

    Returns:
        Sorted unique list of non-negative GPU indices.

    Raises:
        ValueError: If any index is negative.
    """
    for idx in indices:
        if idx < 0:
            raise ValueError(
                f"GPU index must be non-negative, got {idx}. "
                "Use the device indices reported by nvidia-smi on the "
                "remote host."
            )
    return sorted(set(indices))
prepare_remote_workdir(ssh: SSHClient, *, docker_binary: str, workdir: str, minimum_free_disk_gb: float, cleanup_command: Optional[str] = None, container_registry: Optional[BaseContainerRegistry] = None) -> None

Run preflight checks and prepare a remote working directory.

Parameters:

Name Type Description Default
ssh SSHClient

The open SSH connection.

required
docker_binary str

Path to the Docker binary on the remote host.

required
workdir str

Remote directory to create and disk-check.

required
minimum_free_disk_gb float

Minimum free space required, in GB.

required
cleanup_command Optional[str]

Command removing stale files, or None to skip.

None
container_registry Optional[BaseContainerRegistry]

Container registry to authenticate Docker against, or None to skip.

None

Raises:

Type Description
RuntimeError

If a required tool is missing or a remote command fails.

Source code in src/zenml/integrations/ssh/utils.py
def prepare_remote_workdir(
    ssh: "SSHClient",
    *,
    docker_binary: str,
    workdir: str,
    minimum_free_disk_gb: float,
    cleanup_command: Optional[str] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> None:
    """Run preflight checks and prepare a remote working directory.

    Args:
        ssh: The open SSH connection.
        docker_binary: Path to the Docker binary on the remote host.
        workdir: Remote directory to create and disk-check.
        minimum_free_disk_gb: Minimum free space required, in GB.
        cleanup_command: Command removing stale files, or None to skip.
        container_registry: Container registry to authenticate Docker against,
            or None to skip.

    Raises:
        RuntimeError: If a required tool is missing or a remote command fails.
    """
    run_preflight_checks(ssh=ssh, docker_binary=docker_binary)
    mkdir = ssh.exec(f"mkdir -p {shlex.quote(workdir)}")
    if mkdir.exit_code != 0:
        raise RuntimeError(
            f"Failed to create remote directory {workdir} on "
            f"host: {mkdir.stderr}"
        )
    check_remote_disk(
        ssh=ssh,
        remote_path=workdir,
        minimum_free_disk_gb=minimum_free_disk_gb,
    )
    if cleanup_command is not None:
        cleanup = ssh.exec(cleanup_command)
        if cleanup.exit_code != 0:
            logger.warning(
                "Failed to clean old SSH files on host: %s",
                cleanup.stderr or cleanup.stdout,
            )
    if container_registry is not None:
        docker_login(
            ssh=ssh,
            container_registry=container_registry,
            docker_binary=docker_binary,
        )
run_preflight_checks(ssh: SSHClient, docker_binary: str) -> None

Verify the remote host has the required tools installed.

Parameters:

Name Type Description Default
ssh SSHClient

The open SSH connection.

required
docker_binary str

Path to the Docker binary on the remote host.

required

Raises:

Type Description
RuntimeError

If a required tool is missing.

Source code in src/zenml/integrations/ssh/utils.py
def run_preflight_checks(ssh: "SSHClient", docker_binary: str) -> None:
    """Verify the remote host has the required tools installed.

    Args:
        ssh: The open SSH connection.
        docker_binary: Path to the Docker binary on the remote host.

    Raises:
        RuntimeError: If a required tool is missing.
    """
    result = ssh.exec(f"{shlex.quote(docker_binary)} --version")
    if result.exit_code != 0:
        raise RuntimeError(
            f"Preflight check failed: Docker is not available on "
            f"host. Install Docker on the remote host: "
            f"https://docs.docker.com/engine/install/\n"
            f"stderr: {result.stderr.strip()}"
        )
    logger.debug("Preflight OK: Docker -> %s", result.stdout.strip())
serialize_env_for_docker_env_file(env: Mapping[str, str]) -> str

Serialize environment variables into Docker --env-file format.

Parameters:

Name Type Description Default
env Mapping[str, str]

Mapping of environment variable names to values.

required

Returns:

Type Description
str

String content suitable for writing to a Docker env-file.

Raises:

Type Description
ValueError

If any key or value contains forbidden characters.

Source code in src/zenml/integrations/ssh/utils.py
def serialize_env_for_docker_env_file(env: Mapping[str, str]) -> str:
    """Serialize environment variables into Docker --env-file format.

    Args:
        env: Mapping of environment variable names to values.

    Returns:
        String content suitable for writing to a Docker env-file.

    Raises:
        ValueError: If any key or value contains forbidden characters.
    """
    lines: List[str] = []
    for key, value in sorted(env.items()):
        if "\x00" in key or "=" in key:
            raise ValueError(
                f"Environment variable name {key!r} contains forbidden "
                "characters (NUL or '='). This would corrupt the env-file."
            )
        if "\x00" in value or "\n" in value:
            raise ValueError(
                f"Environment variable {key!r} has a value containing "
                "a newline or NUL character. Docker env-file format is "
                "line-based and cannot represent multi-line values safely. "
                "Consider base64-encoding the value."
            )
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n" if lines else ""
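A short sketch of the output format, using a condensed copy of the function above: variables are written one `KEY=VALUE` per line in sorted key order, and a value may itself contain `=` because Docker splits each line at the first `=`. The base64 workaround suggested in the error message is shown for a made-up multi-line value; decoding it again inside the step is left to the step code.

```python
import base64
from typing import List, Mapping


def serialize_env_for_docker_env_file(env: Mapping[str, str]) -> str:
    # Condensed copy of the function above: one KEY=VALUE line per
    # variable, sorted by key, with a trailing newline.
    lines: List[str] = []
    for key, value in sorted(env.items()):
        if "\x00" in key or "=" in key:
            raise ValueError(f"Invalid variable name {key!r}")
        if "\x00" in value or "\n" in value:
            raise ValueError(f"Variable {key!r} has a multi-line or NUL value")
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n" if lines else ""


print(serialize_env_for_docker_env_file({"B": "2", "A": "x=y"}), end="")
# A=x=y
# B=2

# A multi-line value becomes a single line once base64-encoded.
cert = "line1\nline2"
encoded = base64.b64encode(cert.encode()).decode()
content = serialize_env_for_docker_env_file({"CERT_B64": encoded})
```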
validate_mount_path(path: str) -> str

Validate a bind-mount path against injection.

Parameters:

Name Type Description Default
path str

The host or container path.

required

Returns:

Type Description
str

The validated path.

Raises:

Type Description
RuntimeError

If the path is not a plain absolute path.

Source code in src/zenml/integrations/ssh/utils.py
def validate_mount_path(path: str) -> str:
    """Validate a bind-mount path against injection.

    Args:
        path: The host or container path.

    Returns:
        The validated path.

    Raises:
        RuntimeError: If the path is not a plain absolute path.
    """
    if not _MOUNT_PATH_PATTERN.match(path):
        raise RuntimeError(
            f"Invalid mount path {path!r}: only absolute POSIX or Windows "
            "paths without ':' are allowed."
        )
    return path
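The page does not show `_MOUNT_PATH_PATTERN`, so the sketch below uses a HYPOTHETICAL pattern that accepts only absolute POSIX paths without `:`, newline or NUL (per its error message, the real one also accepts Windows paths). It illustrates how `validate_mount_path` and `build_mount_mappings` work together: each side is checked before being joined with `:` for `docker run -v`, so a path containing `:` cannot smuggle in extra mount options.

```python
import re
from typing import List, Mapping

# HYPOTHETICAL pattern: the integration's real _MOUNT_PATH_PATTERN is not
# shown here. This one accepts absolute POSIX paths with no ':', newline
# or NUL character.
_SKETCH_MOUNT_PATH_PATTERN = re.compile(r"/[^:\n\x00]*")


def validate_mount_path_sketch(path: str) -> str:
    # fullmatch (rather than match plus '$') also rejects a trailing newline.
    if not _SKETCH_MOUNT_PATH_PATTERN.fullmatch(path):
        raise RuntimeError(f"Invalid mount path {path!r}")
    return path


def build_mount_mappings_sketch(mounts: Mapping[str, str]) -> List[str]:
    # Validate both sides, then join them as "host:container".
    return [
        f"{validate_mount_path_sketch(host)}:"
        f"{validate_mount_path_sketch(container)}"
        for host, container in mounts.items()
    ]


print(build_mount_mappings_sketch({"/data/models": "/models"}))
# ['/data/models:/models']
```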