Skip to content

Hyperai

zenml.integrations.hyperai special

Initialization of the HyperAI integration.

HyperAIIntegration (Integration)

Definition of HyperAI integration for ZenML.

Source code in zenml/integrations/hyperai/__init__.py
class HyperAIIntegration(Integration):
    """ZenML integration for running pipelines on HyperAI instances."""

    NAME = HYPERAI
    REQUIREMENTS = ["paramiko>=3.4.0"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration.

        The service connectors module is imported purely for its import-time
        side effects; no name from it is used here.
        """
        import zenml.integrations.hyperai.service_connectors  # noqa: F401

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the HyperAI integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.hyperai.flavors import HyperAIOrchestratorFlavor

        orchestrator_flavors: List[Type[Flavor]] = [HyperAIOrchestratorFlavor]
        return orchestrator_flavors

activate() classmethod

Activates the integration.

Source code in zenml/integrations/hyperai/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration.

    The service connectors module is imported purely for its import-time
    side effects; no name from it is used here.
    """
    import zenml.integrations.hyperai.service_connectors  # noqa: F401

flavors() classmethod

Declare the stack component flavors for the HyperAI integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/hyperai/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the HyperAI integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.hyperai.flavors import HyperAIOrchestratorFlavor

    orchestrator_flavors: List[Type[Flavor]] = [HyperAIOrchestratorFlavor]
    return orchestrator_flavors

flavors special

HyperAI integration flavors.

hyperai_orchestrator_flavor

Implementation of the ZenML HyperAI orchestrator.

HyperAIOrchestratorConfig (BaseOrchestratorConfig, HyperAIOrchestratorSettings)

Configuration for the HyperAI orchestrator.

Attributes:

Name Type Description
container_registry_autologin bool

If True, the orchestrator will attempt to automatically log in to the container registry specified in the stack configuration on the HyperAI instance. This is useful if the container registry requires authentication and the HyperAI instance has not been manually logged in to the container registry. Defaults to False.

automatic_cleanup_pipeline_files bool

If True, the orchestrator will automatically clean up old pipeline run directories of non-scheduled runs on the HyperAI instance. A directory is removed once it is at least 8 days old (it is selected with `find -ctime +7`, which ignores fractional days). Defaults to True.

gpu_enabled_in_container bool

If True, the orchestrator will enable GPU support in the Docker container that runs on the HyperAI instance. Defaults to True.

Source code in zenml/integrations/hyperai/flavors/hyperai_orchestrator_flavor.py
class HyperAIOrchestratorConfig(
    BaseOrchestratorConfig, HyperAIOrchestratorSettings
):
    """Configuration for the HyperAI orchestrator.

    Attributes:
        container_registry_autologin: If True, the orchestrator will attempt to
            automatically log in to the container registry specified in the stack
            configuration on the HyperAI instance. This is useful if the container
            registry requires authentication and the HyperAI instance has not been
            manually logged in to the container registry. Defaults to `False`.
        automatic_cleanup_pipeline_files: If True, the orchestrator will
            automatically clean up old pipeline run directories of
            non-scheduled runs on the HyperAI instance. A directory is removed
            once it is at least 8 days old (it is selected with
            `find -ctime +7`, which ignores fractional days). Defaults to
            `True`.
        gpu_enabled_in_container: If True, the orchestrator will enable GPU
            support in the Docker container that runs on the HyperAI instance.
            Defaults to `True`.
    """

    container_registry_autologin: bool = False
    automatic_cleanup_pipeline_files: bool = True
    gpu_enabled_in_container: bool = True

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        # Pipelines always execute on the remote HyperAI instance.
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

HyperAIOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the HyperAI orchestrator.

Source code in zenml/integrations/hyperai/flavors/hyperai_orchestrator_flavor.py
class HyperAIOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor that registers the HyperAI orchestrator with ZenML."""

    @property
    def name(self) -> str:
        """Identifier under which this orchestrator flavor is registered.

        Returns:
            Name of the orchestrator flavor.
        """
        flavor_name = "hyperai"
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Only service connectors that can provide the HyperAI resource type are
        considered compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=HYPERAI_RESOURCE_TYPE
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            A flavor docs url.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            A flavor SDK docs url.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        logo_bucket = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com"
        return f"{logo_bucket}/connectors/hyperai/hyperai.png"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class used by this orchestrator flavor.

        Returns:
            The config class.
        """
        return HyperAIOrchestratorConfig

    @property
    def implementation_class(self) -> Type["HyperAIOrchestrator"]:
        """Orchestrator class that implements this flavor.

        The import is deferred until the class is actually requested.

        Returns:
            Implementation class for this flavor.
        """
        from zenml.integrations.hyperai.orchestrators import HyperAIOrchestrator

        return HyperAIOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type Description
Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[HyperAIOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[HyperAIOrchestrator]

Implementation class for this flavor.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

HyperAIOrchestratorSettings (BaseSettings)

HyperAI orchestrator settings.

Attributes:

Name Type Description
mounts_from_to Dict[str, str]

A dictionary mapping from paths on the HyperAI instance to paths within the Docker container. This allows users to mount directories from the HyperAI instance into the Docker container that runs on it.

Source code in zenml/integrations/hyperai/flavors/hyperai_orchestrator_flavor.py
class HyperAIOrchestratorSettings(BaseSettings):
    """Per-pipeline/per-step settings of the HyperAI orchestrator.

    Attributes:
        mounts_from_to: Maps a path on the HyperAI instance (key) to a path
            inside the step's Docker container (value), so that directories of
            the instance can be mounted into the container running on it.
    """

    # Host path on the HyperAI instance -> path inside the container.
    mounts_from_to: Dict[str, str] = {}

orchestrators special

HyperAI orchestrator.

hyperai_orchestrator

Implementation of the ZenML HyperAI orchestrator.

HyperAIOrchestrator (ContainerizedOrchestrator)

Orchestrator responsible for running pipelines on HyperAI instances.

Source code in zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
class HyperAIOrchestrator(ContainerizedOrchestrator):
    """Orchestrator responsible for running pipelines on HyperAI instances.

    Every pipeline step becomes one service of a Docker Compose file. That
    file, together with a small startup script, is uploaded to the HyperAI
    instance over SFTP. It is then either started right away with
    `docker compose up -d`, or registered with `crontab` / `at` for
    scheduled pipelines.
    """

    @property
    def config(self) -> HyperAIOrchestratorConfig:
        """Returns the `HyperAIOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(HyperAIOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the HyperAI orchestrator.

        Returns:
            The settings class.
        """
        return HyperAIOrchestratorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Ensures there is a container registry and an image builder in the stack.

        Returns:
            A `StackValidator` instance.
        """
        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            }
        )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_HYPERAI_RUN_ID]
        except KeyError:
            # The KeyError adds nothing beyond this message.
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_HYPERAI_RUN_ID}."
            ) from None

    def _validate_mount_path(self, path: str) -> str:
        """Validates if a given string is in a valid path format.

        Accepted are absolute POSIX paths (starting with `/`, no NUL byte) and
        absolute Windows paths (drive letter, colon, then single-backslash
        separated components).

        Args:
            path: The path to be validated.

        Returns:
            The path in a valid format.

        Raises:
            RuntimeError: If the path is not in a valid format.
        """
        # Both alternatives are free of nested quantifiers, so matching is
        # linear in len(path). The previous pattern's `([^\0]*)*` could
        # backtrack exponentially, and its Windows branch required double
        # backslashes between components.
        windows_path = r'[a-zA-Z]:\\(?:[^\\/:*?"<>|]+(?:\\[^\\/:*?"<>|]+)*\\?)?'
        posix_path = r"/[^\0]*"

        if re.fullmatch(f"(?:{windows_path})|(?:{posix_path})", path):
            return path
        raise RuntimeError(
            f"Path '{path}' is not in a valid format, so a mount cannot be established."
        )

    def _escape_shell_command(self, command: str) -> str:
        """Quotes a single shell word so it is safe to embed in a command.

        Args:
            command: The string to quote.

        Returns:
            The quoted string (unchanged if it only contains safe characters).
        """
        return quote(command)

    def _scp_to_hyperai_instance(
        self,
        paramiko_client: paramiko.SSHClient,
        f: IO[str],
        directory_name: str,
        file_name: str,
        description: str,
    ) -> None:
        """Copies a local file to a HyperAI instance using SFTP.

        Args:
            paramiko_client: The SSH client to use for the transfer.
            f: The file to transfer; only its `name` (local path) is used.
            directory_name: The (unquoted) directory on the HyperAI instance to
                transfer the file to.
            file_name: The name of the file being transferred.
            description: A description of the file being transferred.

        Raises:
            RuntimeError: If the file cannot be written to the HyperAI instance.
        """
        remote_path = f"{directory_name}/{file_name}"
        try:
            sftp_client = paramiko_client.open_sftp()
            try:
                sftp_client.put(f.name, remote_path)
            finally:
                # Release the SFTP channel even when the transfer fails.
                sftp_client.close()
        except (FileNotFoundError, PermissionError) as e:
            # A missing target directory and a denied write surface as these
            # OSError subclasses.
            raise RuntimeError(
                f"Failed to write {description} to HyperAI instance. Does the user have permissions to write?"
            ) from e

    def _write_to_hyperai_instance(
        self,
        paramiko_client: paramiko.SSHClient,
        content: str,
        directory_name: str,
        file_name: str,
        description: str,
    ) -> None:
        """Writes text content to a file on a HyperAI instance using SFTP.

        The content is streamed from memory as UTF-8. No local temporary file
        is involved, so there are no platform-specific temp-file quirks and no
        newline translation (the startup script must keep `\\n` endings).

        Args:
            paramiko_client: The SSH client to use for the transfer.
            content: The text to write.
            directory_name: The (unquoted) directory on the HyperAI instance to
                write the file to.
            file_name: The name of the file to create.
            description: A description of the file being written.

        Raises:
            RuntimeError: If the file cannot be written to the HyperAI instance.
        """
        import io

        remote_path = f"{directory_name}/{file_name}"
        try:
            sftp_client = paramiko_client.open_sftp()
            try:
                sftp_client.putfo(
                    io.BytesIO(content.encode("utf-8")), remote_path
                )
            finally:
                sftp_client.close()
        except (FileNotFoundError, PermissionError) as e:
            raise RuntimeError(
                f"Failed to write {description} to HyperAI instance. Does the user have permissions to write?"
            ) from e

    @staticmethod
    def _run_remote_command(
        paramiko_client: paramiko.SSHClient,
        command: str,
        stdin_data: Optional[str] = None,
    ) -> "tuple[int, list[str], list[str]]":
        """Runs a shell command on the HyperAI instance and waits for it.

        Waiting matters. `exec_command` returns right away, so without it a
        later step (e.g. an upload into a directory created by `mkdir`) could
        race the remote command.

        Args:
            paramiko_client: The SSH client to run the command with.
            command: The shell command; callers must quote untrusted parts.
            stdin_data: Optional text sent to the command's stdin, which is
                closed afterwards.

        Returns:
            A `(exit_status, stdout_lines, stderr_lines)` tuple.
        """
        stdin, stdout, stderr = paramiko_client.exec_command(command)  # nosec
        if stdin_data is not None:
            stdin.channel.sendall(stdin_data.encode("utf-8"))
            stdin.channel.shutdown_write()
        stdout_lines = stdout.readlines()
        stderr_lines = stderr.readlines()
        exit_status = stdout.channel.recv_exit_status()
        return exit_status, stdout_lines, stderr_lines

    @staticmethod
    def _validate_schedule(deployment: "PipelineDeploymentResponse") -> None:
        """Checks that a scheduled deployment can be scheduled on HyperAI.

        Args:
            deployment: The pipeline deployment to check.

        Raises:
            RuntimeError: If the cron expression has an unsupported format, or
                if neither a cron expression nor a start time is set.
        """
        schedule = deployment.schedule
        if not schedule:
            return

        cron_expression = schedule.cron_expression
        if cron_expression:
            # Five blank-separated fields of comma-separated numbers or
            # `*`/`*/n`. Because nothing else is accepted, the expression is
            # also safe to embed in the crontab shell command. `fullmatch`
            # (unlike `match` with `$`) rejects a trailing newline.
            expected_cron_pattern = r"^(?:(?:[0-9]|[1-5][0-9]|60)(?:,(?:[0-9]|[1-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)(?:[ \t]+(?:(?:[0-9]|[0-5][0-9]|60)(?:,(?:[0-9]|[0-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)){4}$"
            if not re.fullmatch(expected_cron_pattern, cron_expression):
                raise RuntimeError(
                    f"The cron expression '{cron_expression}' is not in a valid format."
                )
        elif not schedule.run_once_start_time:
            raise RuntimeError(
                "A cron expression or start time is required for scheduled pipelines."
            )

    def _build_compose_definition(
        self,
        deployment: "PipelineDeploymentResponse",
        environment: Dict[str, str],
    ) -> Dict[str, Any]:
        """Builds the Docker Compose definition with one service per step.

        Args:
            deployment: The pipeline deployment to translate.
            environment: Environment variables for every step container.

        Returns:
            The Docker Compose definition as a dictionary.
        """
        compose_definition: Dict[str, Any] = {"version": "3", "services": {}}
        deployment_id = deployment.id

        for step_name, step in deployment.step_configurations.items():
            step_settings = cast(
                HyperAIOrchestratorSettings, self.get_settings(step)
            )

            # Container (and service) names combine deployment id and step name.
            container_name = f"{deployment_id}-{step_name}"

            service: Dict[str, Any] = {
                "image": self.get_image(
                    deployment=deployment, step_name=step_name
                ),
                "container_name": container_name,
                "network_mode": "host",
                "entrypoint": StepEntrypointConfiguration.get_entrypoint_command(),
                "command": StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name, deployment_id=deployment_id
                ),
                "volumes": [
                    f"{self._validate_mount_path(mount_from)}:"
                    f"{self._validate_mount_path(mount_to)}"
                    for mount_from, mount_to in step_settings.mounts_from_to.items()
                ],
                # A separate copy per service, so the YAML contains no
                # anchors/aliases for a shared dict.
                "environment": dict(environment),
            }

            if self.config.gpu_enabled_in_container:
                service["deploy"] = {
                    "resources": {
                        "reservations": {
                            "devices": [
                                {"driver": "nvidia", "capabilities": ["gpu"]}
                            ]
                        }
                    }
                }

            if deployment.schedule:
                # The startup script writes the per-run id into this `.env`.
                service["env_file"] = [".env"]

            upstream_steps = step.spec.upstream_steps
            if upstream_steps:
                service["depends_on"] = {
                    f"{deployment_id}-{upstream_step_name}": {
                        "condition": "service_completed_successfully"
                    }
                    for upstream_step_name in upstream_steps
                }

            compose_definition["services"][container_name] = service

        return compose_definition

    def _login_to_container_registry(
        self, paramiko_client: paramiko.SSHClient, stack: "Stack"
    ) -> None:
        """Logs the HyperAI instance in to the stack's container registry.

        The password is passed on stdin (`--password-stdin`), so it never
        appears in the remote command line.

        Args:
            paramiko_client: The SSH client connected to the instance.
            stack: The stack whose container registry to log in to.

        Raises:
            RuntimeError: If the stack has no container registry or the
                registry has no credentials.
        """
        logger.info(
            "Attempting to automatically log in to container registry used by stack."
        )

        container_registry = stack.container_registry
        if not container_registry:
            raise RuntimeError("Unable to find container registry in stack.")

        credentials = container_registry.credentials
        if credentials is None:
            raise RuntimeError(
                "The container registry in the active stack has no "
                "credentials or service connector configured, but the "
                "HyperAI orchestrator is set to autologin to the container "
                "registry. Please configure the container registry with "
                "credentials or turn off the `container_registry_autologin` "
                "setting in the HyperAI orchestrator configuration."
            )

        container_registry_url = container_registry.config.uri
        container_registry_username, container_registry_password = credentials

        exit_status, stdout_lines, stderr_lines = self._run_remote_command(
            paramiko_client,
            f"docker login -u {self._escape_shell_command(container_registry_username)} "
            f"--password-stdin {self._escape_shell_command(container_registry_url)}",
            stdin_data=f"{container_registry_password}\n",
        )
        for line in stdout_lines:
            logger.info(line.rstrip())
        if exit_status != 0:
            # Best effort: the instance may already be logged in, so only warn.
            logger.warning(
                "Logging in to the container registry on the HyperAI "
                "instance failed: %s",
                "".join(stderr_lines).strip(),
            )

    def _schedule_pipeline(
        self,
        paramiko_client: paramiko.SSHClient,
        deployment: "PipelineDeploymentResponse",
        run_directory: str,
    ) -> None:
        """Registers the uploaded startup script with `crontab` or `at`.

        Args:
            paramiko_client: The SSH client connected to the instance.
            deployment: The scheduled pipeline deployment.
            run_directory: The (unquoted) directory containing
                `run_pipeline.sh`.

        Raises:
            RuntimeError: If the schedule has neither a cron expression nor a
                start time, `at` is missing, or registering the job fails.
        """
        schedule = deployment.schedule
        script_path = self._escape_shell_command(
            f"{run_directory}/run_pipeline.sh"
        )

        if schedule and schedule.cron_expression:
            cron_expression = schedule.cron_expression
            logger.info(f"Requested cron expression: {cron_expression}")
            logger.info("Scheduling ZenML pipeline on HyperAI instance...")

            # Append the job to the existing crontab. The whole cron line is
            # quoted once, so no nested quoting is needed.
            cron_line = f"{cron_expression} bash {script_path}"
            exit_status, _, stderr_lines = self._run_remote_command(
                paramiko_client,
                f"(crontab -l ; echo {self._escape_shell_command(cron_line)}) | crontab -",
            )
            if exit_status != 0:
                raise RuntimeError(
                    "Failed to install the crontab entry on the HyperAI "
                    f"instance: {''.join(stderr_lines).strip()}"
                )

            logger.info(
                f"Pipeline scheduled successfully in crontab with cron expression: {cron_expression}"
            )
        elif schedule and schedule.run_once_start_time:
            start_time = schedule.run_once_start_time
            logger.info(f"Requested start time: {start_time}")
            logger.info("Scheduling ZenML pipeline on HyperAI instance...")

            exit_status, stdout_lines, _ = self._run_remote_command(
                paramiko_client, "which at"
            )
            if exit_status != 0 or not stdout_lines:
                raise RuntimeError(
                    "The `at` command is not installed on the HyperAI instance. Please install it to use start times for scheduled pipelines."
                )

            # `at -t` expects [[CC]YY]MMDDhhmm[.ss].
            # NOTE(review): `at` interprets this in the instance's local
            # timezone; confirm `start_time` is expressed accordingly.
            start_time_str = start_time.strftime("%Y%m%d%H%M.%S")
            at_job = f"bash {script_path}"
            exit_status, _, stderr_lines = self._run_remote_command(
                paramiko_client,
                f"echo {self._escape_shell_command(at_job)} | at -t {start_time_str}",
            )
            if exit_status != 0:
                raise RuntimeError(
                    "Failed to schedule the pipeline with `at` on the HyperAI "
                    f"instance: {''.join(stderr_lines).strip()}"
                )

            logger.info(
                f"Pipeline scheduled successfully to run once at: {start_time}"
            )
        else:
            raise RuntimeError(
                "A cron expression or start time is required for scheduled pipelines."
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Runs or schedules all pipeline steps as Docker containers on HyperAI.

        Assumes that:
        - A HyperAI (hyperai.ai) instance is running on the configured IP address.
        - The HyperAI instance has been configured to allow SSH connections from the
            machine running the pipeline.
        - Docker and Docker Compose are installed on the HyperAI instance.
        - A key pair has been generated and the public key has been added to the
            HyperAI instance's `authorized_keys` file.
        - The private key is available in a HyperAI service connector linked to this
            orchestrator.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment. The dict is copied, not modified.

        Raises:
            RuntimeError: If the service connector, container registry or
                schedule is misconfigured, if a directory or file cannot be
                created on the HyperAI instance, or if starting or scheduling
                the pipeline there fails (including a non-zero exit status of
                `docker compose up`).
        """
        from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
            HyperAIServiceConnector,
        )

        # Reject unusable schedules before anything is written to the instance.
        self._validate_schedule(deployment)

        deployment_id = deployment.id

        # `get_orchestrator_run_id` reads the run id back from the process
        # environment below.
        os.environ[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

        # Work on a copy so the caller's dict is left untouched.
        environment = dict(environment)
        if deployment.schedule:
            # Scheduled runs get a fresh id per execution from the `.env` file
            # written by `run_pipeline.sh`. An `environment` entry would take
            # precedence over that `env_file` value in Docker Compose.
            environment.pop(ENV_ZENML_HYPERAI_RUN_ID, None)
        else:
            environment[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

        logger.info("Preparing pipeline steps for deployment.")
        compose_definition = self._build_compose_definition(
            deployment, environment
        )

        logger.info("Finalizing Docker Compose definition.")
        compose_definition_yaml: str = yaml.dump(compose_definition)

        connector = self.get_connector()
        if not connector:
            raise RuntimeError(
                "You must link a HyperAI service connector to the orchestrator."
            )
        if not isinstance(connector, HyperAIServiceConnector):
            raise RuntimeError(
                "Expected the linked service connector to be a "
                f"`HyperAIServiceConnector`, but got type `{type(connector)}`."
            )

        logger.info(
            "Connecting to HyperAI instance and placing Docker Compose file."
        )
        paramiko_client = connector.connect()
        if paramiko_client is None:
            raise RuntimeError(
                "Expected to receive a `paramiko.SSHClient` object from the "
                "linked connector, but got `None`. This likely originates from "
                "a misconfigured service connector, typically caused by a wrong "
                "SSH key type being selected. Please check your "
                "`hyperai_orchestrator` configuration and make sure that the "
                "`ssh_key_type` of its connected service connector is set to the "
                "correct value."
            )
        if not isinstance(paramiko_client, paramiko.SSHClient):
            raise RuntimeError(
                f"Expected to receive a `paramiko.SSHClient` object from the "
                f"linked connector, but got type `{type(paramiko_client)}`."
            )

        try:
            if self.config.container_registry_autologin:
                self._login_to_container_registry(paramiko_client, stack)

            # Remote paths are kept unquoted (SFTP takes them literally) and
            # quoted only when they are embedded in a shell command.
            username = connector.config.username
            nonscheduled_directory = f"/home/{username}/pipeline-runs"
            base_directory = (
                f"/home/{username}/scheduled-pipeline-runs"
                if deployment.schedule
                else nonscheduled_directory
            )
            run_directory = f"{base_directory}/{self.get_orchestrator_run_id()}"
            quoted_run_directory = self._escape_shell_command(run_directory)

            # `mkdir -p` creates the base directory as well.
            exit_status, _, stderr_lines = self._run_remote_command(
                paramiko_client, f"mkdir -p {quoted_run_directory}"
            )
            if exit_status != 0:
                raise RuntimeError(
                    f"Failed to create directory '{run_directory}' on the "
                    f"HyperAI instance: {''.join(stderr_lines).strip()}"
                )

            if self.config.automatic_cleanup_pipeline_files:
                logger.info(
                    "Cleaning up old pipeline files on HyperAI instance. This may take a while."
                )
                # Only direct children are removed. Without -mindepth 1 the
                # base directory itself could match and be deleted.
                exit_status, _, stderr_lines = self._run_remote_command(
                    paramiko_client,
                    f"find {self._escape_shell_command(nonscheduled_directory)} "
                    "-mindepth 1 -maxdepth 1 -type d -ctime +7 -exec rm -rf {} +",
                )
                if exit_status != 0:
                    # Best effort, e.g. the directory does not exist yet.
                    logger.debug(
                        "Cleanup of old pipeline files reported: %s",
                        "".join(stderr_lines).strip(),
                    )

            self._write_to_hyperai_instance(
                paramiko_client,
                compose_definition_yaml,
                run_directory,
                file_name="docker-compose.yml",
                description="Docker Compose file",
            )

            # The script refreshes the run id in `.env` on every execution.
            # `\\%` yields the `\%` the script always contained; bash turns it
            # back into a plain `%` for `date`.
            run_script = (
                "#!/bin/bash\n"
                f"cd {quoted_run_directory} && echo "
                f'{ENV_ZENML_HYPERAI_RUN_ID}="{deployment_id}_$(date +\\%s)" '
                "> .env && docker compose up -d\n"
            )
            self._write_to_hyperai_instance(
                paramiko_client,
                run_script,
                run_directory,
                file_name="run_pipeline.sh",
                description="startup script",
            )

            if deployment.schedule:
                self._schedule_pipeline(
                    paramiko_client, deployment, run_directory
                )
            else:
                logger.info(
                    "Starting ZenML pipeline on HyperAI instance. Depending on the size of your container image, this may take a while..."
                )
                exit_status, _, stderr_lines = self._run_remote_command(
                    paramiko_client,
                    f"cd {quoted_run_directory} && docker compose up -d",
                )
                # Docker Compose reports its progress on stderr.
                for line in stderr_lines:
                    logger.info(line.rstrip())
                if exit_status != 0:
                    raise RuntimeError(
                        "Running the pipeline on the HyperAI instance failed: "
                        f"`docker compose up` exited with status {exit_status}. "
                        "See the log output above for details."
                    )
        finally:
            # Every remote command above has completed, so closing is safe.
            paramiko_client.close()
config: HyperAIOrchestratorConfig property readonly

Returns the HyperAIOrchestratorConfig config.

Returns:

Type Description
HyperAIOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the HyperAI orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Ensures there is a container registry and an image builder in the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_HYPERAI_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_HYPERAI_RUN_ID]
    except KeyError:
        # The KeyError adds nothing beyond this message, so drop its context.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_HYPERAI_RUN_ID}."
        ) from None
prepare_or_run_pipeline(self, deployment, stack, environment)

Runs or schedules all pipeline steps as Docker containers on a HyperAI instance, using Docker Compose.

Assumes that:

- A HyperAI (hyperai.ai) instance is running on the configured IP address.
- The HyperAI instance has been configured to allow SSH connections from the machine running the pipeline.
- Docker and Docker Compose are installed on the HyperAI instance.
- A key pair has been generated and the public key has been added to the HyperAI instance's authorized_keys file.
- The private key is available in a HyperAI service connector linked to this orchestrator.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
RuntimeError

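If the service connector, container registry or schedule is misconfigured, or if the pipeline files cannot be written to the HyperAI instance.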

Source code in zenml/integrations/hyperai/orchestrators/hyperai_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Runs all pipeline steps as Docker Compose services on a HyperAI instance.

    Every step becomes one Compose service. A step only starts after all of
    its upstream steps completed successfully; steps without a dependency
    relation between them may be started concurrently by Docker Compose.

    Assumes that:
    - A HyperAI (hyperai.ai) instance is running on the configured IP address.
    - The HyperAI instance has been configured to allow SSH connections from the
        machine running the pipeline.
    - Docker and Docker Compose are installed on the HyperAI instance.
    - A key pair has been generated and the public key has been added to the
        HyperAI instance's `authorized_keys` file.
    - The private key is available in a HyperAI service connector linked to this
        orchestrator.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. The dictionary itself is not modified.

    Raises:
        RuntimeError: If no usable HyperAI service connector is linked, if
            container registry autologin is enabled but the stack has no
            container registry or credentials, if a required remote command
            (directory creation, scheduling) fails, or if the schedule is
            invalid.
    """
    from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (
        HyperAIServiceConnector,
    )

    def _wait_for_remote_command(
        stdout: Any, stderr: Any, description: str
    ) -> None:
        """Block until a remote command exits and raise if it failed.

        `exec_command` returns as soon as the command is started, so without
        waiting, a following step (or closing the connection) races it.
        """
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            error_output = "".join(stderr.readlines()).strip()
            raise RuntimeError(
                f"Failed to {description} on HyperAI instance "
                f"(exit status {exit_status}): {error_output}"
            )

    # Basic Docker Compose definition
    compose_definition: Dict[str, Any] = {"version": "3", "services": {}}

    deployment_id = deployment.id

    # Expose the run id to this process; read by `get_orchestrator_run_id`.
    os.environ[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

    # Work on a copy so that the caller's dictionary is left untouched.
    service_environment = dict(environment)
    if deployment.schedule:
        # Scheduled runs take their run id from the `.env` file written by
        # `run_pipeline.sh` on every trigger. Compose's `environment` would
        # take precedence over `env_file`, so no fixed value may be set here.
        service_environment.pop(ENV_ZENML_HYPERAI_RUN_ID, None)
    else:
        service_environment[ENV_ZENML_HYPERAI_RUN_ID] = str(deployment_id)

    # Add each step as a service to the Docker Compose definition
    logger.info("Preparing pipeline steps for deployment.")
    for step_name, step in deployment.step_configurations.items():
        image = self.get_image(deployment=deployment, step_name=step_name)
        step_settings = cast(
            HyperAIOrchestratorSettings, self.get_settings(step)
        )

        # Define container name as combination between deployment id and step name
        container_name = f"{deployment_id}-{step_name}"

        service: Dict[str, Any] = {
            "image": image,
            "container_name": container_name,
            "network_mode": "host",
            "entrypoint": StepEntrypointConfiguration.get_entrypoint_command(),
            "command": StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            ),
            "volumes": [
                "{}:{}".format(
                    self._validate_mount_path(mount_from),
                    self._validate_mount_path(mount_to),
                )
                for mount_from, mount_to in step_settings.mounts_from_to.items()
            ],
        }

        # Depending on GPU setting, add GPU support to service definition
        if self.config.gpu_enabled_in_container:
            service["deploy"] = {
                "resources": {
                    "reservations": {
                        "devices": [
                            {"driver": "nvidia", "capabilities": ["gpu"]}
                        ]
                    }
                }
            }

        # Scheduled pipelines read their run id override from `.env`.
        if deployment.schedule:
            service["env_file"] = [".env"]

        # A separate copy per service: a single shared dict would make
        # `yaml.dump` emit YAML anchors/aliases instead of plain mappings.
        service["environment"] = dict(service_environment)

        # Add dependency on upstream steps if applicable
        upstream_steps = step.spec.upstream_steps
        if upstream_steps:
            service["depends_on"] = {
                f"{deployment_id}-{upstream_step_name}": {
                    "condition": "service_completed_successfully"
                }
                for upstream_step_name in upstream_steps
            }

        compose_definition["services"][container_name] = service

    # Convert into yaml
    logger.info("Finalizing Docker Compose definition.")
    compose_definition_yaml: str = yaml.dump(compose_definition)

    # Connect to configured HyperAI instance
    logger.info(
        "Connecting to HyperAI instance and placing Docker Compose file."
    )
    connector = self.get_connector()
    if not connector:
        raise RuntimeError(
            "You must link a HyperAI service connector to the orchestrator."
        )
    # Checked explicitly (not via `assert`) and before connecting, so that no
    # SSH connection is opened for an unusable connector.
    if not isinstance(connector, HyperAIServiceConnector):
        raise RuntimeError(
            "The service connector linked to the HyperAI orchestrator must be "
            f"a HyperAI service connector, but got `{type(connector)}`."
        )
    username = connector.config.username

    paramiko_client = connector.connect()
    if paramiko_client is None:
        raise RuntimeError(
            "Expected to receive a `paramiko.SSHClient` object from the "
            "linked connector, but got `None`. This likely originates from "
            "a misconfigured service connector, typically caused by a wrong "
            "SSH key type being selected. Please check your "
            "`hyperai_orchestrator` configuration and make sure that the "
            "`ssh_key_type` of its connected service connector is set to the "
            "correct value."
        )
    elif not isinstance(paramiko_client, paramiko.SSHClient):
        raise RuntimeError(
            f"Expected to receive a `paramiko.SSHClient` object from the "
            f"linked connector, but got type `{type(paramiko_client)}`."
        )

    try:
        # Get container registry autologin setting
        if self.config.container_registry_autologin:
            logger.info(
                "Attempting to automatically log in to container registry used by stack."
            )

            container_registry = stack.container_registry
            if not container_registry:
                raise RuntimeError(
                    "Unable to find container registry in stack."
                )

            credentials = container_registry.credentials
            if credentials is None:
                raise RuntimeError(
                    "The container registry in the active stack has no "
                    "credentials or service connector configured, but the "
                    "HyperAI orchestrator is set to autologin to the container "
                    "registry. Please configure the container registry with "
                    "credentials or turn off the `container_registry_autologin` "
                    "setting in the HyperAI orchestrator configuration."
                )

            (
                raw_registry_username,
                container_registry_password,
            ) = credentials

            # Escape inputs that end up in the shell command line
            container_registry_username = self._escape_shell_command(
                raw_registry_username
            )
            container_registry_url = self._escape_shell_command(
                container_registry.config.uri
            )

            # Log in to container registry using --password-stdin so the
            # password never appears on the remote command line.
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                f"docker login -u {container_registry_username} "
                f"--password-stdin {container_registry_url}"
            )
            stdin.channel.send(
                f"{container_registry_password}\n".encode("utf-8")
            )
            stdin.channel.shutdown_write()

            for line in stdout.readlines():
                logger.info(line)
            # Reported, not raised: images may still be pullable or cached.
            if stdout.channel.recv_exit_status() != 0:
                logger.warning(
                    "Logging in to the container registry on the HyperAI "
                    "instance failed: %s",
                    "".join(stderr.readlines()).strip(),
                )

        # Set up pipeline-runs directory if it doesn't exist
        nonscheduled_directory_name = self._escape_shell_command(
            f"/home/{username}/pipeline-runs"
        )
        directory_name = (
            nonscheduled_directory_name
            if not deployment.schedule
            else self._escape_shell_command(
                f"/home/{username}/scheduled-pipeline-runs"
            )
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"mkdir -p {directory_name}"
        )
        _wait_for_remote_command(
            stdout, stderr, "create the pipeline runs directory"
        )

        # Get pipeline run id and create directory for it
        orchestrator_run_id = self.get_orchestrator_run_id()
        directory_name = self._escape_shell_command(
            f"{directory_name}/{orchestrator_run_id}"
        )
        stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
            f"mkdir -p {directory_name}"
        )
        _wait_for_remote_command(
            stdout, stderr, "create the pipeline run directory"
        )

        # Remove run folders of non-scheduled pipelines that are 7 days old
        # or older. `-mindepth 1 -maxdepth 1` restricts deletion to the run
        # directories themselves, so the parent directory can never match.
        # The `[ -d ... ]` guard skips cleanup if no non-scheduled run exists.
        if self.config.automatic_cleanup_pipeline_files:
            logger.info(
                "Cleaning up old pipeline files on HyperAI instance. This may take a while."
            )
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                f"if [ -d {nonscheduled_directory_name} ]; then "
                f"find {nonscheduled_directory_name} -mindepth 1 -maxdepth 1 "
                "-type d -ctime +7 -exec rm -rf {} +; fi"
            )
            # Best effort: wait so the connection is not closed mid-cleanup,
            # but only warn on failure.
            if stdout.channel.recv_exit_status() != 0:
                logger.warning(
                    "Cleaning up old pipeline files on the HyperAI instance "
                    "failed: %s",
                    "".join(stderr.readlines()).strip(),
                )

        # Create temporary file and write Docker Compose file to it
        with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
            with f.file as f_:
                f_.write(compose_definition_yaml)

            self._scp_to_hyperai_instance(
                paramiko_client,
                f,
                directory_name,
                file_name="docker-compose.yml",
                description="Docker Compose file",
            )

        # Create temporary file and write script to it
        with tempfile.NamedTemporaryFile(mode="w", delete=True) as f:
            bash_line = "#!/bin/bash\n"
            # The script writes a fresh run id (deployment id + epoch seconds)
            # to `.env` on every invocation, then starts the services.
            # `\\%` yields a literal `\%` in the script (same as before,
            # without Python's invalid-escape warning).
            command_line = f'cd {directory_name} && echo {ENV_ZENML_HYPERAI_RUN_ID}="{deployment_id}_$(date +\\%s)" > .env && docker compose up -d'

            with f.file as f_:
                f_.write(bash_line)
                f_.write(command_line)

            self._scp_to_hyperai_instance(
                paramiko_client,
                f,
                directory_name,
                file_name="run_pipeline.sh",
                description="startup script",
            )

        # Run or schedule Docker Compose file depending on settings
        if not deployment.schedule:
            logger.info(
                "Starting ZenML pipeline on HyperAI instance. Depending on the size of your container image, this may take a while..."
            )
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                f"cd {directory_name} && docker compose up -d"
            )

            # Docker Compose reports progress and errors on stderr; reading
            # it to EOF also waits for the command to finish.
            for line in stderr.readlines():
                logger.info(line)
        elif deployment.schedule.cron_expression:
            cron_expression = deployment.schedule.cron_expression
            # Besides rejecting malformed expressions, the pattern keeps
            # quotes and shell metacharacters out of the command below.
            expected_cron_pattern = r"^(?:(?:[0-9]|[1-5][0-9]|60)(?:,(?:[0-9]|[1-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)(?:[ \t]+(?:(?:[0-9]|[0-5][0-9]|60)(?:,(?:[0-9]|[0-5][0-9]|60))*|[*](?:\/[1-9][0-9]*)?)){4}$"
            if not re.match(expected_cron_pattern, cron_expression):
                raise RuntimeError(
                    f"The cron expression '{cron_expression}' is not in a valid format."
                )

            logger.info(f"Requested cron expression: {cron_expression}")
            logger.info("Scheduling ZenML pipeline on HyperAI instance...")

            # Append the job to the existing crontab of the remote user
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                f"(crontab -l ; echo '{cron_expression} bash {directory_name}/run_pipeline.sh') | crontab -"
            )
            _wait_for_remote_command(stdout, stderr, "install the crontab entry")

            logger.info(
                f"Pipeline scheduled successfully in crontab with cron expression: {cron_expression}"
            )
        elif deployment.schedule.run_once_start_time:
            start_time = deployment.schedule.run_once_start_time

            logger.info(f"Requested start time: {start_time}")
            logger.info("Scheduling ZenML pipeline on HyperAI instance...")

            # Check if `at` is installed on HyperAI instance
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                "which at"
            )
            if not stdout.readlines():
                raise RuntimeError(
                    "The `at` command is not installed on the HyperAI instance. Please install it to use start times for scheduled pipelines."
                )

            # Convert start time into `at -t` format (YYYYMMDDHHMM.SS).
            # NOTE(review): strftime drops any timezone information; confirm
            # that start_time is expressed in the HyperAI instance's local time.
            start_time_str = start_time.strftime("%Y%m%d%H%M.%S")

            # Queue a one-off `at` job for the scheduled pipeline
            stdin, stdout, stderr = paramiko_client.exec_command(  # nosec
                f"echo 'bash {directory_name}/run_pipeline.sh' | at -t {start_time_str}"
            )
            _wait_for_remote_command(stdout, stderr, "schedule the `at` job")

            logger.info(
                f"Pipeline scheduled successfully to run once at: {start_time}"
            )
        else:
            raise RuntimeError(
                "A cron expression or start time is required for scheduled pipelines."
            )
    finally:
        # All remote commands above have been waited for, so closing the
        # connection does not interrupt any of them.
        paramiko_client.close()

service_connectors special

HyperAI Service Connector.

hyperai_service_connector

HyperAI Service Connector.

The HyperAI Service Connector allows authenticating to GPU-equipped HyperAI (hyperai.ai) instances.

HyperAIAuthenticationMethods (StrEnum)

HyperAI Authentication methods.

Source code in zenml/integrations/hyperai/service_connectors/hyperai_service_connector.py
class HyperAIAuthenticationMethods(StrEnum):
    """HyperAI Authentication methods.

    Each method names the type of the private SSH key stored in the
    connector. A passphrase protecting the key is optional for all of them.
    """

    # RSA private key.
    RSA_KEY_OPTIONAL_PASSPHRASE = "rsa-key"
    # DSA (DSS) private key.
    DSA_KEY_OPTIONAL_PASSPHRASE = "dsa-key"
    # ECDSA private key.
    ECDSA_KEY_OPTIONAL_PASSPHRASE = "ecdsa-key"
    # Ed25519 private key.
    ED25519_KEY_OPTIONAL_PASSPHRASE = "ed25519-key"
HyperAIConfiguration (HyperAICredentials)

HyperAI client configuration.

Source code in zenml/integrations/hyperai/service_connectors/hyperai_service_connector.py
class HyperAIConfiguration(HyperAICredentials):
    """HyperAI client configuration.

    Adds the target hosts and the login user to the SSH key credentials
    inherited from `HyperAICredentials`.
    """

    # Hosts this connector may connect to; required.
    hostnames: List[str] = Field(
        ...,
        title="Hostnames of the supported HyperAI instances.",
    )

    # Remote account used for the SSH login; required.
    username: str = Field(
        ...,
        title="Username to use to connect to the HyperAI instance.",
    )
HyperAICredentials (AuthenticationConfig)

HyperAI client authentication credentials.

Source code in zenml/integrations/hyperai/service_connectors/hyperai_service_connector.py
class HyperAICredentials(AuthenticationConfig):
    """HyperAI client authentication credentials.

    Holds the secret material needed to open an SSH session: a private key
    and, optionally, the passphrase protecting it.
    """

    # Private SSH key, base64-encoded; required.
    base64_ssh_key: PlainSerializedSecretStr = Field(
        ...,
        title="SSH key (base64)",
    )
    # Passphrase for the private key; None when the key is unencrypted.
    ssh_passphrase: Optional[PlainSerializedSecretStr] = Field(
        None,
        title="SSH key passphrase",
    )
HyperAIServiceConnector (ServiceConnector)

HyperAI service connector.

Source code in zenml/integrations/hyperai/service_connectors/hyperai_service_connector.py
class HyperAIServiceConnector(ServiceConnector):
    """HyperAI service connector.

    Authenticates to HyperAI instances over SSH with a base64-encoded private
    key (optionally passphrase-protected) and hands out connected paramiko
    SSH clients.
    """

    config: HyperAIConfiguration

    @classmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the service connector specification.

        Returns:
            The service connector specification.
        """
        return HYPERAI_SERVICE_CONNECTOR_TYPE_SPEC

    def _paramiko_key_type_given_auth_method(self) -> Type[paramiko.PKey]:
        """Get the Paramiko key type given the authentication method.

        Returns:
            The Paramiko key type.

        Raises:
            ValueError: If the authentication method is invalid or its key
                type is not available in the installed paramiko version.
        """
        try:
            auth_method = HyperAIAuthenticationMethods(self.auth_method)
        except ValueError:
            # Enum lookup of an unknown value raises ValueError, not KeyError.
            raise ValueError(
                f"Invalid authentication method: {self.auth_method}"
            ) from None

        # Key classes are resolved by name at call time: `paramiko.DSSKey`
        # does not exist in newer paramiko releases (DSA support was removed
        # in 4.0), and referencing it eagerly would break every method.
        key_class_names = {
            HyperAIAuthenticationMethods.RSA_KEY_OPTIONAL_PASSPHRASE: "RSAKey",
            HyperAIAuthenticationMethods.DSA_KEY_OPTIONAL_PASSPHRASE: "DSSKey",
            HyperAIAuthenticationMethods.ECDSA_KEY_OPTIONAL_PASSPHRASE: "ECDSAKey",
            HyperAIAuthenticationMethods.ED25519_KEY_OPTIONAL_PASSPHRASE: "Ed25519Key",
        }
        class_name = key_class_names.get(auth_method)
        if class_name is None:
            raise ValueError(
                f"Invalid authentication method: {self.auth_method}"
            )

        key_class = getattr(paramiko, class_name, None)
        if key_class is None:
            raise ValueError(
                f"The authentication method `{self.auth_method}` is not "
                f"supported by the installed paramiko version (missing "
                f"`paramiko.{class_name}`)."
            )
        return key_class

    def _create_paramiko_client(
        self, hostname: str
    ) -> paramiko.client.SSHClient:
        """Create a Paramiko SSH client based on the configuration.

        Args:
            hostname: The hostname of the HyperAI instance.

        Returns:
            A Paramiko SSH client.

        Raises:
            AuthorizationException: If the client cannot be created. The
                underlying error is attached as the exception's cause.
        """
        if self.config.ssh_passphrase is None:
            ssh_passphrase = None
        else:
            ssh_passphrase = self.config.ssh_passphrase.get_secret_value()

        paramiko_client: Optional[paramiko.client.SSHClient] = None
        error: Optional[Exception] = None

        # Connect to the HyperAI instance
        try:
            # Convert the SSH key from base64 to string
            base64_key_value = self.config.base64_ssh_key.get_secret_value()
            ssh_key = base64.b64decode(base64_key_value).decode("utf-8")

            with io.StringIO(ssh_key) as f:
                paramiko_key = self._paramiko_key_type_given_auth_method().from_private_key(
                    f, password=ssh_passphrase
                )

            # Trim whitespace from the IP address
            hostname = hostname.strip()

            paramiko_client = paramiko.client.SSHClient()
            # Unknown host keys are accepted without verification.
            paramiko_client.set_missing_host_key_policy(
                paramiko.AutoAddPolicy()  # nosec
            )
            paramiko_client.connect(
                hostname=hostname,
                username=self.config.username,
                pkey=paramiko_key,
                timeout=30,
            )

            return paramiko_client

        # The specific paramiko errors are subclasses of SSHException, so
        # they must be handled before it.
        except paramiko.ssh_exception.BadHostKeyException as e:
            logger.error("Bad host key: %s", e)
            error = e
        except paramiko.ssh_exception.AuthenticationException as e:
            logger.error("Authentication failed: %s", e)
            error = e
        except paramiko.ssh_exception.SSHException as e:
            logger.error(
                "SSH error: %s. A common cause for this error is selection of the wrong key type in your service connector.",
                e,
            )
            error = e
        except Exception as e:
            logger.error(
                "Unknown error while connecting to HyperAI instance: %s. Please check your network connection, IP address, and authentication details.",
                e,
            )
            error = e

        # Release the half-initialized client so its transport is not leaked.
        if paramiko_client is not None:
            paramiko_client.close()

        raise AuthorizationException(
            "Could not create SSH client for HyperAI instance."
        ) from error

    def _authorize_client(self, hostname: str) -> None:
        """Verify that the client can authenticate with the HyperAI instance.

        Args:
            hostname: The hostname of the HyperAI instance.
        """
        logger.info("Verifying connection to HyperAI instance...")

        paramiko_client = self._create_paramiko_client(hostname)
        paramiko_client.close()

    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Connect to a HyperAI instance. Returns an authenticated SSH client.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Returns:
            An authenticated Paramiko SSH client.
        """
        logger.info("Connecting to HyperAI instance...")
        assert self.resource_id is not None

        paramiko_client = self._create_paramiko_client(self.resource_id)
        return paramiko_client

    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """There is no local client for the HyperAI connector, so it does nothing.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Raises:
            NotImplementedError: If there is no local client for the HyperAI
                connector.
        """
        raise NotImplementedError(
            "There is no local client for the HyperAI connector."
        )

    @classmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        **kwargs: Any,
    ) -> "HyperAIServiceConnector":
        """Auto-configure the connector.

        Not supported by the HyperAI connector.

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure.
            resource_id: The ID of the resource to configure. The
                implementation may choose to either require or ignore this
                parameter if it does not support or detect a resource type that
                supports multiple instances.
            kwargs: Additional implementation specific keyword arguments to use.

        Raises:
            NotImplementedError: If the connector auto-configuration fails or
                is not supported.
        """
        raise NotImplementedError(
            "Auto-configuration is not supported by the HyperAI connector."
        )

    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify that a connection can be established to the HyperAI instance.

        Args:
            resource_type: The type of resource to verify. Must be set to the
                Docker resource type.
            resource_id: The HyperAI instance to verify.

        Returns:
            The hostnames to which a connection could be established.

        Raises:
            ValueError: If the resource ID is not in the list of configured
                hostnames.
        """
        if resource_id:
            if resource_id not in self.config.hostnames:
                raise ValueError(
                    f"The supplied hostname '{resource_id}' is not in the list "
                    f"of configured hostnames: {self.config.hostnames}. Please "
                    f"check your configuration."
                )
            hostnames = [resource_id]
        else:
            hostnames = self.config.hostnames

        resources = []
        for hostname in hostnames:
            self._authorize_client(hostname)
            resources.append(hostname)

        return resources