
zenml.integrations.hyperai

Initialization of the HyperAI integration.

Attributes

HYPERAI = 'hyperai' module-attribute

HYPERAI_CONNECTOR_TYPE = 'hyperai' module-attribute

HYPERAI_RESOURCE_TYPE = 'hyperai-instance' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type[StackComponentConfig]: The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Dict[str, Any]: The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type[StackComponent]: The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Optional[str]: The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

str: The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

StackComponentType: The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

flavor_model (FlavorResponse): The model to load from. Required.

Raises:

CustomFlavorImportError: If the custom flavor can't be imported.
ImportError: If the flavor can't be imported.

Returns:

Flavor: The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
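A minimal usage sketch (the flavor name "hyperai" and the Client lookup are illustrative assumptions, not part of this module):

from zenml.client import Client
from zenml.stack.flavor import Flavor

# Fetch a flavor model from the ZenML server (name is illustrative)
flavor_model = Client().get_flavor("hyperai")

# Re-instantiate the flavor class from its stored source path
flavor = Flavor.from_model(flavor_model)
print(flavor.name, flavor.type)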
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

str: The complete url to the zenml documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

str: The complete url to the zenml SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
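For an integration flavor such as HyperAI, the generated URL takes roughly this shape (`<version>` stands for the installed ZenML version):

https://sdkdocs.zenml.io/&lt;version&gt;/integration_code_docs/integrations-hyperai/#zenml.integrations.hyperai.flavors.hyperai_orchestrator_flavor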
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

integration (Optional[str]): The integration to use for the model. Defaults to None.
is_custom (bool): Whether the flavor is a custom flavor. Defaults to True.

Returns:

FlavorRequest: The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
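A short usage sketch, assuming the HyperAI integration is installed (the argument values are illustrative):

from zenml.integrations.hyperai.flavors import HyperAIOrchestratorFlavor

# Convert the flavor into a request model that can be sent to the server
flavor = HyperAIOrchestratorFlavor()
model = flavor.to_model(integration="hyperai", is_custom=False)
print(model.name, model.source)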

HyperAIIntegration

Bases: Integration

Definition of HyperAI integration for ZenML.

Functions
activate() -> None classmethod

Activates the integration.

Source code in src/zenml/integrations/hyperai/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.hyperai import service_connectors  # noqa
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the HyperAI integration.

Returns:

List[Type[Flavor]]: List of stack component flavors for this integration.

Source code in src/zenml/integrations/hyperai/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the HyperAI integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.hyperai.flavors import (
        HyperAIOrchestratorFlavor
    )

    return [HyperAIOrchestratorFlavor]
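A sketch of how the integration's class methods fit together (assumes the integration's requirements are installed):

from zenml.integrations.hyperai import HyperAIIntegration

# Verify requirements, activate the integration, then list its flavors
if HyperAIIntegration.check_installation():
    HyperAIIntegration.activate()
    for flavor_cls in HyperAIIntegration.flavors():
        print(flavor_cls().name)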

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

bool: True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

List[Type[Flavor]]: A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Defaults to None.
python_version (Optional[str]): The Python version to use for the requirements. Defaults to None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
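For example, to inspect the pinned requirements of the HyperAI integration (output depends on the installed ZenML version):

from zenml.integrations.hyperai import HyperAIIntegration

for requirement in HyperAIIntegration.get_requirements():
    print(requirement)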
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Defaults to None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
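A companion sketch to the above: the same requirement list with entries matching REQUIREMENTS_IGNORED_ON_UNINSTALL filtered out:

from zenml.integrations.hyperai import HyperAIIntegration

print(HyperAIIntegration.get_uninstall_requirements())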
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

List[Type[BasePluginFlavor]]: A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

flavors

HyperAI integration flavors.

Classes
HyperAIOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the HyperAI orchestrator.

Attributes
config_class: Type[BaseOrchestratorConfig] property

Config class for the base orchestrator flavor.

Returns:

Type[BaseOrchestratorConfig]: The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[HyperAIOrchestrator] property

Implementation class for this flavor.

Returns:

Type[HyperAIOrchestrator]: Implementation class for this flavor.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

str: The flavor logo.

name: str property

Name of the orchestrator flavor.

Returns:

str: Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

Modules
hyperai_orchestrator_flavor

Implementation of the ZenML HyperAI orchestrator.

Classes
HyperAIOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, HyperAIOrchestratorSettings

Configuration for the HyperAI orchestrator.

Attributes:

container_registry_autologin (bool): If True, the orchestrator will attempt to automatically log in to the container registry specified in the stack configuration on the HyperAI instance. This is useful if the container registry requires authentication and the HyperAI instance has not been manually logged in to the container registry. Defaults to False.

automatic_cleanup_pipeline_files (bool): If True, the orchestrator will automatically clean up old pipeline files that are on the HyperAI instance. Pipeline files will be cleaned up if they are 7 days old or older. Defaults to True.

gpu_enabled_in_container (bool): If True, the orchestrator will enable GPU support in the Docker container that runs on the HyperAI instance. Defaults to True.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

bool: True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

bool: Whether the orchestrator is schedulable or not.
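A minimal configuration sketch (constructing the config directly is illustrative; in practice these values are usually set when registering the orchestrator):

from zenml.integrations.hyperai.flavors.hyperai_orchestrator_flavor import (
    HyperAIOrchestratorConfig,
)

config = HyperAIOrchestratorConfig(
    container_registry_autologin=True,
    automatic_cleanup_pipeline_files=True,
    gpu_enabled_in_container=False,
)
print(config.is_remote, config.is_schedulable)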

HyperAIOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the HyperAI orchestrator. (Documented in full above.)

HyperAIOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

HyperAI orchestrator settings.

Attributes:

mounts_from_to (Dict[str, str]): A dictionary mapping from paths on the HyperAI instance to paths within the Docker container. This allows users to mount directories from the HyperAI instance into the Docker container that runs on it.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
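A sketch of attaching these settings to a pipeline. The settings key "orchestrator.hyperai" follows ZenML's "orchestrator.<flavor>" convention and is an assumption here; the paths are illustrative:

from zenml import pipeline
from zenml.integrations.hyperai.flavors.hyperai_orchestrator_flavor import (
    HyperAIOrchestratorSettings,
)

# Mount a directory on the HyperAI instance into each step container
hyperai_settings = HyperAIOrchestratorSettings(
    mounts_from_to={"/home/user/data": "/data"}
)

@pipeline(settings={"orchestrator.hyperai": hyperai_settings})  # key is assumed
def my_pipeline() -> None:
    ...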

orchestrators

HyperAI orchestrator.

Classes
HyperAIOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines on HyperAI instances.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: HyperAIOrchestratorConfig property

Returns the HyperAIOrchestratorConfig config.

Returns:

HyperAIOrchestratorConfig: The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the HyperAI orchestrator.

Returns:

Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property

Ensures there is an image builder in the stack.

Returns:

Optional[StackValidator]: A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

RuntimeError: If the environment variable specifying the run id is not set.

Returns:

str: The orchestrator run id.

Source code in src/zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_HYPERAI_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_HYPERAI_RUN_ID}."
        )
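The variable is set by prepare_or_run_pipeline when it assembles the orchestration environment; an equivalent lookup is sketched below (the variable name ZENML_HYPERAI_ORCHESTRATOR_RUN_ID is taken from the source comments and assumed to be the value of ENV_ZENML_HYPERAI_RUN_ID):

import os

# Equivalent lookup, assuming the orchestrator has set the variable
run_id = os.environ["ZENML_HYPERAI_ORCHESTRATOR_RUN_ID"]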
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Sequentially runs all pipeline steps in Docker containers.

Assumes that:

- A HyperAI (hyperai.ai) instance is running on the configured IP address.
- The HyperAI instance has been configured to allow SSH connections from the machine running the pipeline.
- Docker and Docker Compose are installed on the HyperAI instance.
- A key pair has been generated and the public key has been added to the HyperAI instance's authorized_keys file.
- The private key is available in a HyperAI service connector linked to this orchestrator.

Parameters:

deployment (PipelineDeploymentResponse): The pipeline deployment to prepare or run. Required.
stack (Stack): The stack the pipeline will run on. Required.
environment (Dict[str, str]): Environment variables to set in the orchestration environment. Required.
placeholder_run (Optional[PipelineRunResponse]): An optional placeholder run for the deployment. Defaults to None.

Raises:

RuntimeError: If a step fails.

Source code in src/zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Sequentially runs all pipeline steps in Docker containers.

    Assumes that:
    - A HyperAI (hyperai.ai) instance is running on the configured IP address.
    - The HyperAI instance has been configured to allow SSH connections from the
        machine running the pipeline.
    - Docker and Docker Compose are installed on the HyperAI instance.
    - A key pair has been generated and the public key has been added to the
        HyperAI instance's `authorized_keys` file.
    - The private key is available in a HyperAI service connector linked to this
        orchestrator.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a step fails.
    """
    from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
        HyperAIServiceConnector,
    )

    # Basic Docker Compose definition
    compose_definition: Dict[str, Any] = {"version": "3", "services": {}}

    # Get deployment id
    deployment_id = deployment.id

    # Set environment
    os.environ[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)
    environment[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

    # Add each step as a service to the Docker Compose definition
    logger.info("Preparing pipeline steps for deployment.")
    for step_name, step in deployment.step_configurations.items():
        # Get image
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get settings
        step_settings = cast(
            HyperAIOrchestratorSettings, self.get_settings(step)
        )

        # Define container name as combination between deployment id and step name
        container_name = f"{deployment_id}-{step_name}"

        # Make Compose service definition for step
        compose_definition["services"][container_name] = {
            "image": image,
            "container_name": container_name,
            "network_mode": "host",
            "entrypoint": StepEntrypointConfiguration.get_entrypoint_command(),
            "command": StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            ),
            "volumes": [
                "{}:{}".format(
                    self._validate_mount_path(mount_from),
                    self._validate_mount_path(mount_to),
                )
                for mount_from, mount_to in step_settings.mounts_from_to.items()
            ],
        }

        # Depending on GPU setting, add GPU support to service definition
        if self.config.gpu_enabled_in_container:
            compose_definition["services"][container_name]["deploy"] = {
                "resources": {
                    "reservations": {
                        "devices": [
                            {"driver": "nvidia", "capabilities": ["gpu"]}
                        ]
                    }
                }
            }

        # Depending on whether it is a scheduled or a realtime pipeline, add
        # potential .env file to service definition for deployment ID override.
        if deployment.schedule:
            # drop ZENML_HYPERAI_ORCHESTRATOR_RUN_ID from environment but only if it is set
            if ENV_ZENML_HYPERAI_RUN_ID in environment:
                del environment[ENV_ZENML_HYPERAI_RUN_ID]
            compose_definition["services"][container_name]["env_file"] = [
                ".env"
            ]

        compose_definition["services"][container_name]["environment"] = (
            environment
        )

        # Add dependency on upstream steps if applicable
        upstream_steps = step.spec.upstream_steps

        if len(upstream_steps) > 0:
            compose_definition["services"][container_name][
                "depends_on"
            ] = {}

            for upstream_step_name in upstream_steps:
                upstream_container_name = (
                    f"{deployment_id}-{upstream_step_name}"
                )
                compose_definition["services"][container_name][
                    "depends_on"
                ].update(
                    {
                        upstream_container_name: {
                            "condition": "service_completed_successfully"
                        }
                    }
                )

    # Convert into yaml
    logger.info("Finalizing Docker Compose definition.")
    compose_definition_yaml: str = yaml.dump(compose_definition)

    # Connect to configured HyperAI instance
    logger.info(
        "Connecting to HyperAI instance and placing Docker Compose file."
    )
    paramiko_client: paramiko.SSHClient
    if connector := self.get_connector():
        paramiko_client = connector.connect()
        if paramiko_client is None:
            raise RuntimeError(
                "Expected to receive a `paramiko.SSHClient` object from the "
                "linked connector, but got `None`. This likely originates from "
                "a misconfigured service connector, typically caused by a wrong "
                "SSH key type being selected. Please check your "
                "`hyperai_orchestrator` configuration and make sure that the "
                "`ssh_key_type` of its connected service connector is set to the "
                "correct value."
            )
        elif not isinstance(paramiko_client, paramiko.SSHClient):
            raise RuntimeError(
                f"Expected to receive a `paramiko.SSHClient` object from the "
                f"linked connector, but got type `{type(paramiko_client)}`."
            )
    else:
        raise RuntimeError(
            "You must link a HyperAI service connector to the orchestrator."
        )

    # Get container registry autologin setting
    if self.config.container_registry_autologin:
        logger.info(
            "Attempting to automatically log in to container registry used by stack."
        )

        # Select stack container registry
        container_registry = stack.container_registry

        # Raise error if no container registry is found
        if not container_registry:
            raise RuntimeError(
                "Unable to find container registry in stack."
            )

        # Get container registry credentials from its config
        credentials = container_registry.credentials
        if credentials is None:
            raise RuntimeError(
                "The container registry in the active stack has no "
                "credentials or service connector configured, but the "
                "HyperAI orchestrator is set to autologin to the container "
                "registry. Please configure the container registry with "
                "credentials or turn off the `container_registry_autologin` "
                "setting in the HyperAI orchestrator configuration."
            )

        container_registry_url = container_registry.config.uri
        (
            container_registry_username,
            container_registry_password,
        ) = credentials

        # Escape inputs
        container_registry_username = self._escape_shell_command(
            container_registry_username
        )
        container_registry_url = self._escape_shell_command(
            container_registry_url
        )

        # Log in to container registry using --password-stdin
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"docker login -u {container_registry_username} "
            f"--password-stdin {container_registry_url}"
        )
        # Send the password to stdin
        stdin.channel.send(
            f"{container_registry_password}\n".encode("utf-8")
        )
        stdin.channel.shutdown_write()

        # Log stdout
        for line in stdout.readlines():
            logger.info(line)

    # Get username from connector
    assert isinstance(connector, HyperAIServiceConnector)
    username = connector.config.username

    # Set up pipeline-runs directory if it doesn't exist
    nonscheduled_directory_name = self._escape_shell_command(
        f"/home/{username}/pipeline-runs"
    )
    directory_name = (
        nonscheduled_directory_name
        if not deployment.schedule
        else self._escape_shell_command(
            f"/home/{username}/scheduled-pipeline-runs"
        )
    )
    stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
        f"mkdir -p {directory_name}"
    )

    # Get pipeline run id and create directory for it
    orchestrator_run_id = self.get_orchestrator_run_id()
    directory_name = self._escape_shell_command(
        f"{directory_name}/{orchestrator_run_id}"
    )
    stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
        f"mkdir -p {directory_name}"
    )

    # Remove all folders from nonscheduled pipelines if they are 7 days old or older
    if self.config.automatic_cleanup_pipeline_files:
        logger.info(
            "Cleaning up old pipeline files on HyperAI instance. This may take a while."
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"find {nonscheduled_directory_name} -type d -ctime +7 -exec rm -rf {{}} +"
        )

    # Create temporary file and write Docker Compose file to it
    with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
        # Write Docker Compose file to temporary file
        with f.file as f_:
            f_.write(compose_definition_yaml)

        # Scp Docker Compose file to HyperAI instance
        self._scp_to_hyperai_instance(
            paramiko_client,
            f,
            directory_name,
            file_name="docker-compose.yml",
            description="Docker Compose file",
        )

    # Create temporary file and write script to it
    with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
        # Define bash line and command line
        bash_line = "#!/bin/bash\n"
        command_line = f'cd {directory_name} && echo {ENV_ZENML_HYPERAI_RUN_ID}="{deployment_id}_$(date +\%s)" > .env && docker compose up -d'

        # Write script to temporary file
        with f.file as f_:
            f_.write(bash_line)
            f_.write(command_line)

        # Scp script to HyperAI instance
        self._scp_to_hyperai_instance(
            paramiko_client,
            f,
            directory_name,
            file_name="run_pipeline.sh",
            description="startup script",
        )

    # Run or schedule Docker Compose file depending on settings
    if not deployment.schedule:
        logger.info(
            "Starting ZenML pipeline on HyperAI instance. Depending on the size of your container image, this may take a while..."
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"cd {directory_name} && docker compose up -d"
        )

        # Log errors in case of failure
        for line in stderr.readlines():
            logger.info(line)
    elif deployment.schedule and deployment.schedule.cron_expression:
        # Get cron expression for scheduled pipeline
        cron_expression = deployment.schedule.cron_expression
        if not cron_expression:
            raise RuntimeError(
                "A cron expression is required for scheduled pipelines."
            )
        expected_cron_pattern = r"^(?:(?:[0-9]|[1-5][0-9]|60)(?:,(?:[0-9]|[1-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)(?:[ \t]+(?:(?:[0-9]|[0-5][0-9]|60)(?:,(?:[0-9]|[0-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)){4}$"
        if not re.match(expected_cron_pattern, cron_expression):
            raise RuntimeError(
                f"The cron expression '{cron_expression}' is not in a valid format."
            )

        # Log about scheduling
        logger.info(f"Requested cron expression: {cron_expression}")
        logger.info("Scheduling ZenML pipeline on HyperAI instance...")

        # Create cron job for scheduled pipeline on HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"(crontab -l ; echo '{cron_expression} bash {directory_name}/run_pipeline.sh') | crontab -"
        )

        logger.info(
            f"Pipeline scheduled successfully in crontab with cron expression: {cron_expression}"
        )
    elif deployment.schedule and deployment.schedule.run_once_start_time:
        # Get start time for scheduled pipeline
        start_time = deployment.schedule.run_once_start_time

        # Log about scheduling
        logger.info(f"Requested start time: {start_time}")
        logger.info("Scheduling ZenML pipeline on HyperAI instance...")

        # Check if `at` is installed on HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            "which at"
        )
        if not stdout.readlines():
            raise RuntimeError(
                "The `at` command is not installed on the HyperAI instance. Please install it to use start times for scheduled pipelines."
            )

        # Convert start time into YYYYMMDDHHMM.SS format
        start_time_str = start_time.strftime("%Y%m%d%H%M.%S")

        # Create cron job for scheduled pipeline on HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"echo 'bash {directory_name}/run_pipeline.sh' | at -t {start_time_str}"
        )

        logger.info(
            f"Pipeline scheduled successfully to run once at: {start_time}"
        )
    else:
        raise RuntimeError(
            "A cron expression or start time is required for scheduled pipelines."
        )
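For orientation, the Compose definition assembled above takes roughly the following shape for one step with a single upstream dependency (all identifiers, paths, and command contents are illustrative placeholders):

# Illustrative shape of the generated Compose definition; placeholder values.
compose_definition = {
    "version": "3",
    "services": {
        "<deployment-id>-trainer": {
            "image": "<registry>/<pipeline-image>",
            "container_name": "<deployment-id>-trainer",
            "network_mode": "host",
            "entrypoint": ["<step entrypoint command>"],
            "command": ["<step name and deployment id arguments>"],
            "volumes": ["/mount/from:/mount/to"],
            "environment": {
                "ZENML_HYPERAI_ORCHESTRATOR_RUN_ID": "<deployment-id>"
            },
            "depends_on": {
                "<deployment-id>-loader": {
                    "condition": "service_completed_successfully"
                }
            },
        }
    },
}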
Modules
hyperai_orchestrator

Implementation of the ZenML HyperAI orchestrator.

Classes
HyperAIOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines on HyperAI instances.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: HyperAIOrchestratorConfig property

Returns the HyperAIOrchestratorConfig config.

Returns:

Type Description
HyperAIOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the HyperAI orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Ensures there is an image builder in the stack.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_HYPERAI_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_HYPERAI_RUN_ID}."
        )
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Sequentially runs all pipeline steps in Docker containers.

Assumes that: - A HyperAI (hyperai.ai) instance is running on the configured IP address. - The HyperAI instance has been configured to allow SSH connections from the machine running the pipeline. - Docker and Docker Compose are installed on the HyperAI instance. - A key pair has been generated and the public key has been added to the HyperAI instance's authorized_keys file. - The private key is available in a HyperAI service connector linked to this orchestrator.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If a step fails.

Source code in src/zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Sequentially runs all pipeline steps in Docker containers.

    Assumes that:
    - A HyperAI (hyperai.ai) instance is running on the configured IP address.
    - The HyperAI instance has been configured to allow SSH connections from the
        machine running the pipeline.
    - Docker and Docker Compose are installed on the HyperAI instance.
    - A key pair has been generated and the public key has been added to the
        HyperAI instance's `authorized_keys` file.
    - The private key is available in a HyperAI service connector linked to this
        orchestrator.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a step fails.
    """
    from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
        HyperAIServiceConnector,
    )

    # Basic Docker Compose definition
    compose_definition: Dict[str, Any] = {"version": "3", "services": {}}

    # Get deployment id
    deployment_id = deployment.id

    # Set environment
    os.environ[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)
    environment[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

    # Add each step as a service to the Docker Compose definition
    logger.info("Preparing pipeline steps for deployment.")
    for step_name, step in deployment.step_configurations.items():
        # Get image
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get settings
        step_settings = cast(
            HyperAIOrchestratorSettings, self.get_settings(step)
        )

        # Define container name as combination between deployment id and step name
        container_name = f"{deployment_id}-{step_name}"

        # Make Compose service definition for step
        compose_definition["services"][container_name] = {
            "image": image,
            "container_name": container_name,
            "network_mode": "host",
            "entrypoint": StepEntrypointConfiguration.get_entrypoint_command(),
            "command": StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            ),
            "volumes": [
                "{}:{}".format(
                    self._validate_mount_path(mount_from),
                    self._validate_mount_path(mount_to),
                )
                for mount_from, mount_to in step_settings.mounts_from_to.items()
            ],
        }

        # Depending on GPU setting, add GPU support to service definition
        if self.config.gpu_enabled_in_container:
            compose_definition["services"][container_name]["deploy"] = {
                "resources": {
                    "reservations": {
                        "devices": [
                            {"driver": "nvidia", "capabilities": ["gpu"]}
                        ]
                    }
                }
            }
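        # Note: the block above mirrors Docker Compose's device reservation
        # syntax for GPUs; it typically requires the NVIDIA Container Toolkit
        # to be installed on the HyperAI instance to take effect.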

        # For scheduled pipelines, reference a .env file in the service
        # definition so that the run ID can be overridden on each scheduled
        # invocation.
        if deployment.schedule:
            # Drop the run ID variable (ZENML_HYPERAI_ORCHESTRATOR_RUN_ID)
            # from the environment, but only if it is set; the .env file
            # written by the startup script supplies a fresh value per run.
            if ENV_ZENML_HYPERAI_RUN_ID in environment:
                del environment[ENV_ZENML_HYPERAI_RUN_ID]
            compose_definition["services"][container_name]["env_file"] = [
                ".env"
            ]

        compose_definition["services"][container_name]["environment"] = (
            environment
        )

        # Add dependency on upstream steps if applicable
        upstream_steps = step.spec.upstream_steps

        if len(upstream_steps) > 0:
            compose_definition["services"][container_name][
                "depends_on"
            ] = {}

            for upstream_step_name in upstream_steps:
                upstream_container_name = (
                    f"{deployment_id}-{upstream_step_name}"
                )
                compose_definition["services"][container_name][
                    "depends_on"
                ].update(
                    {
                        upstream_container_name: {
                            "condition": "service_completed_successfully"
                        }
                    }
                )
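        # With the `service_completed_successfully` condition, Compose only
        # starts a step's container once all of its upstream containers have
        # exited with code 0, which is what makes the pipeline run
        # sequentially along its DAG.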

    # Convert the Compose definition into YAML
    logger.info("Finalizing Docker Compose definition.")
    compose_definition_yaml: str = yaml.dump(compose_definition)

    # Connect to configured HyperAI instance
    logger.info(
        "Connecting to HyperAI instance and placing Docker Compose file."
    )
    paramiko_client: paramiko.SSHClient
    if connector := self.get_connector():
        paramiko_client = connector.connect()
        if paramiko_client is None:
            raise RuntimeError(
                "Expected to receive a `paramiko.SSHClient` object from the "
                "linked connector, but got `None`. This likely originates from "
                "a misconfigured service connector, typically caused by a wrong "
                "SSH key type being selected. Please check your "
                "`hyperai_orchestrator` configuration and make sure that the "
                "`ssh_key_type` of its connected service connector is set to the "
                "correct value."
            )
        elif not isinstance(paramiko_client, paramiko.SSHClient):
            raise RuntimeError(
                f"Expected to receive a `paramiko.SSHClient` object from the "
                f"linked connector, but got type `{type(paramiko_client)}`."
            )
    else:
        raise RuntimeError(
            "You must link a HyperAI service connector to the orchestrator."
        )

    # Get container registry autologin setting
    if self.config.container_registry_autologin:
        logger.info(
            "Attempting to automatically log in to container registry used by stack."
        )

        # Select stack container registry
        container_registry = stack.container_registry

        # Raise error if no container registry is found
        if not container_registry:
            raise RuntimeError(
                "Unable to find container registry in stack."
            )

        # Get container registry credentials from its config
        credentials = container_registry.credentials
        if credentials is None:
            raise RuntimeError(
                "The container registry in the active stack has no "
                "credentials or service connector configured, but the "
                "HyperAI orchestrator is set to autologin to the container "
                "registry. Please configure the container registry with "
                "credentials or turn off the `container_registry_autologin` "
                "setting in the HyperAI orchestrator configuration."
            )

        container_registry_url = container_registry.config.uri
        (
            container_registry_username,
            container_registry_password,
        ) = credentials

        # Escape inputs
        container_registry_username = self._escape_shell_command(
            container_registry_username
        )
        container_registry_url = self._escape_shell_command(
            container_registry_url
        )

        # Log in to container registry using --password-stdin
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"docker login -u {container_registry_username} "
            f"--password-stdin {container_registry_url}"
        )
        # Send the password to stdin
        stdin.channel.send(
            f"{container_registry_password}\n".encode("utf-8")
        )
        stdin.channel.shutdown_write()
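        # Passing the password via stdin keeps it out of the remote command
        # line (and thus out of the process list and shell history).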

        # Log stdout
        for line in stdout.readlines():
            logger.info(line)

    # Get username from connector
    assert isinstance(connector, HyperAIServiceConnector)
    username = connector.config.username

    # Set up pipeline-runs directory if it doesn't exist
    nonscheduled_directory_name = self._escape_shell_command(
        f"/home/{username}/pipeline-runs"
    )
    directory_name = (
        nonscheduled_directory_name
        if not deployment.schedule
        else self._escape_shell_command(
            f"/home/{username}/scheduled-pipeline-runs"
        )
    )
    stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
        f"mkdir -p {directory_name}"
    )

    # Get pipeline run id and create directory for it
    orchestrator_run_id = self.get_orchestrator_run_id()
    directory_name = self._escape_shell_command(
        f"{directory_name}/{orchestrator_run_id}"
    )
    stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
        f"mkdir -p {directory_name}"
    )

    # Remove run directories of non-scheduled pipelines whose status change
    # time is more than 7 days old (`-ctime +7`)
    if self.config.automatic_cleanup_pipeline_files:
        logger.info(
            "Cleaning up old pipeline files on HyperAI instance. This may take a while."
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"find {nonscheduled_directory_name} -type d -ctime +7 -exec rm -rf {{}} +"
        )

    # Create temporary file and write Docker Compose file to it
    with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
        # Write Docker Compose file to temporary file
        with f.file as f_:
            f_.write(compose_definition_yaml)

        # Scp Docker Compose file to HyperAI instance
        self._scp_to_hyperai_instance(
            paramiko_client,
            f,
            directory_name,
            file_name="docker-compose.yml",
            description="Docker Compose file",
        )

    # Create temporary file and write script to it
    with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
        # Define bash line and command line
        bash_line = "#!/bin/bash\n"
        command_line = f'cd {directory_name} && echo {ENV_ZENML_HYPERAI_RUN_ID}="{deployment_id}_$(date +\%s)" > .env && docker compose up -d'

        # Write script to temporary file
        with f.file as f_:
            f_.write(bash_line)
            f_.write(command_line)

        # Scp script to HyperAI instance
        self._scp_to_hyperai_instance(
            paramiko_client,
            f,
            directory_name,
            file_name="run_pipeline.sh",
            description="startup script",
        )

    # Run or schedule Docker Compose file depending on settings
    if not deployment.schedule:
        logger.info(
            "Starting ZenML pipeline on HyperAI instance. Depending on the size of your container image, this may take a while..."
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"cd {directory_name} && docker compose up -d"
        )

        # Log errors in case of failure
        for line in stderr.readlines():
            logger.info(line)
    elif deployment.schedule and deployment.schedule.cron_expression:
        # Get cron expression for scheduled pipeline
        cron_expression = deployment.schedule.cron_expression
        if not cron_expression:
            raise RuntimeError(
                "A cron expression is required for scheduled pipelines."
            )
        expected_cron_pattern = r"^(?:(?:[0-9]|[1-5][0-9]|60)(?:,(?:[0-9]|[1-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)(?:[ \t]+(?:(?:[0-9]|[0-5][0-9]|60)(?:,(?:[0-9]|[0-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)){4}$"
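        # The pattern accepts five whitespace-separated fields, each either a
        # number (0-60), a comma-separated list of numbers, `*`, or `*/step`;
        # ranges (e.g. `1-5`) and names (e.g. `MON`) are rejected.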
        if not re.match(expected_cron_pattern, cron_expression):
            raise RuntimeError(
                f"The cron expression '{cron_expression}' is not in a valid format."
            )

        # Log about scheduling
        logger.info(f"Requested cron expression: {cron_expression}")
        logger.info("Scheduling ZenML pipeline on HyperAI instance...")

        # Create cron job for scheduled pipeline on HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"(crontab -l ; echo '{cron_expression} bash {directory_name}/run_pipeline.sh') | crontab -"
        )

        logger.info(
            f"Pipeline scheduled successfully in crontab with cron expression: {cron_expression}"
        )
    elif deployment.schedule and deployment.schedule.run_once_start_time:
        # Get start time for scheduled pipeline
        start_time = deployment.schedule.run_once_start_time

        # Log about scheduling
        logger.info(f"Requested start time: {start_time}")
        logger.info("Scheduling ZenML pipeline on HyperAI instance...")

        # Check if `at` is installed on HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            "which at"
        )
        if not stdout.readlines():
            raise RuntimeError(
                "The `at` command is not installed on the HyperAI instance. Please install it to use start times for scheduled pipelines."
            )

        # Convert start time into YYYYMMDDHHMM.SS format
        start_time_str = start_time.strftime("%Y%m%d%H%M.%S")

        # Create a one-off `at` job for the scheduled pipeline on the HyperAI instance
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"echo 'bash {directory_name}/run_pipeline.sh' | at -t {start_time_str}"
        )

        logger.info(
            f"Pipeline scheduled successfully to run once at: {start_time}"
        )
    else:
        raise RuntimeError(
            "A cron expression or start time is required for scheduled pipelines."
        )
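
For orientation, here is a minimal sketch of the Compose definition this method assembles, shown for a hypothetical two-step pipeline. The step names (`load_data`, `train`), the image tag, and the deployment ID are made up, and the volume and GPU sections derived from the settings are omitted:

import yaml

# Hypothetical values; in practice these come from the pipeline deployment.
deployment_id = "1a2b3c"
image = "zenml-pipeline:latest"

compose_definition = {
    "version": "3",
    "services": {
        f"{deployment_id}-load_data": {
            "image": image,
            "container_name": f"{deployment_id}-load_data",
            "network_mode": "host",
            "environment": {"ZENML_HYPERAI_ORCHESTRATOR_RUN_ID": deployment_id},
        },
        f"{deployment_id}-train": {
            "image": image,
            "container_name": f"{deployment_id}-train",
            "network_mode": "host",
            "environment": {"ZENML_HYPERAI_ORCHESTRATOR_RUN_ID": deployment_id},
            # "train" starts only after "load_data" completed successfully.
            "depends_on": {
                f"{deployment_id}-load_data": {
                    "condition": "service_completed_successfully"
                }
            },
        },
    },
}

print(yaml.dump(compose_definition))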
Functions

service_connectors

HyperAI Service Connector.

Classes
HyperAIServiceConnector(**kwargs: Any)

Bases: ServiceConnector

HyperAI service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
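
As the orchestrator's `prepare_or_run_pipeline` above shows, calling `connect()` on a HyperAI service connector is expected to yield a ready-to-use `paramiko.SSHClient`. Below is a minimal sketch of driving a connector by hand; it assumes a connector named `hyperai-connector` has already been registered, and that `get_service_connector_client` is the appropriate `Client` call in your ZenML version:

from zenml.client import Client

# Fetch an instantiated connector client for a registered connector
# (the connector name is an assumption, for illustration only).
connector = Client().get_service_connector_client(
    name_id_or_prefix="hyperai-connector"
)

# `connect()` is expected to return a paramiko.SSHClient (see the
# orchestrator's type checks above).
ssh_client = connector.connect()
_, stdout, _ = ssh_client.exec_command("docker --version")
print(stdout.read().decode())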
Modules
hyperai_service_connector

HyperAI Service Connector.

The HyperAI Service Connector allows authenticating to HyperAI (hyperai.ai) GPU-equipped instances.

Classes
HyperAIAuthenticationMethods

Bases: StrEnum

HyperAI Authentication methods.
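
Since `HyperAIAuthenticationMethods` is a `StrEnum`, the supported authentication method identifiers can be enumerated directly; a quick way to list them:

from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
    HyperAIAuthenticationMethods,
)

# Print the identifier of each supported SSH authentication method.
for method in HyperAIAuthenticationMethods:
    print(method.value)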

HyperAIConfiguration

Bases: HyperAICredentials

HyperAI client configuration.

HyperAICredentials

Bases: AuthenticationConfig

HyperAI client authentication credentials.
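
Neither stanza enumerates its fields on this page. As a hedged sketch of what constructing a configuration might look like (the field names `base64_ssh_key`, `hostnames`, and `username` are assumptions and should be verified against the class definitions):

from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
    HyperAIConfiguration,
)

# All field names below are assumptions for illustration; verify them
# against the HyperAICredentials / HyperAIConfiguration source.
config = HyperAIConfiguration(
    base64_ssh_key="<base64-encoded private SSH key>",
    hostnames=["192.0.2.10"],
    username="zenml",
)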

HyperAIServiceConnector(**kwargs: Any)

Bases: ServiceConnector

HyperAI service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Functions