Lightning

zenml.integrations.lightning special

Initialization of the Lightning integration for ZenML.

LightningIntegration (Integration)

Definition of Lightning Integration for ZenML.

Source code in zenml/integrations/lightning/__init__.py
class LightningIntegration(Integration):
    """Definition of Lightning Integration for ZenML."""

    NAME = LIGHTNING
    REQUIREMENTS = ["lightning-sdk>=0.1.17"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Lightning integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.lightning.flavors import (
            LightningOrchestratorFlavor,
        )

        return [
            LightningOrchestratorFlavor,
        ]

flavors() classmethod

Declare the stack component flavors for the Lightning integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/lightning/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Lightning integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.lightning.flavors import (
        LightningOrchestratorFlavor,
    )

    return [
        LightningOrchestratorFlavor,
    ]
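
As a quick usage sketch (assuming the Lightning integration requirements are installed and that the flavor name constant resolves to "lightning"), the registered flavors can be inspected from Python:

from zenml.integrations.lightning import LightningIntegration

# List the stack component flavors contributed by this integration.
for flavor_class in LightningIntegration.flavors():
    flavor = flavor_class()
    print(flavor.name)          # expected: "lightning"
    print(flavor.config_class)  # the LightningOrchestratorConfig class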

flavors special

Lightning integration flavors.

lightning_orchestrator_flavor

Lightning orchestrator base config and settings.

LightningOrchestratorConfig (BaseOrchestratorConfig, LightningOrchestratorSettings)

Lightning orchestrator base config.

Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorConfig(
    BaseOrchestratorConfig, LightningOrchestratorSettings
):
    """Lightning orchestrator base config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False

    @property
    def is_synchronous(self) -> bool:
        """Whether the orchestrator runs synchronous or not.

        Returns:
            Whether the orchestrator runs synchronous or not.
        """
        return self.synchronous

    @property
    def is_schedulable(self) -> bool:
        """Whether the orchestrator is schedulable or not.

        Returns:
            Whether the orchestrator is schedulable or not.
        """
        return False
is_local: bool property readonly

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_schedulable: bool property readonly

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property readonly

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.
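
As a short illustration of these derived properties, the sketch below constructs the config directly; in practice ZenML builds it when the orchestrator component is registered, so direct construction is only for demonstration:

from zenml.integrations.lightning.flavors.lightning_orchestrator_flavor import (
    LightningOrchestratorConfig,
)

config = LightningOrchestratorConfig(synchronous=False)
print(config.is_local)        # False: the orchestrator always runs remotely
print(config.is_synchronous)  # False: mirrors the `synchronous` setting
print(config.is_schedulable)  # False: scheduling is not supported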

LightningOrchestratorFlavor (BaseOrchestratorFlavor)

Lightning orchestrator flavor.

Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorFlavor(BaseOrchestratorFlavor):
    """Lightning orchestrator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return LIGHTNING_ORCHESTRATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/lightning.png"

    @property
    def config_class(self) -> Type[LightningOrchestratorConfig]:
        """Returns `KubeflowOrchestratorConfig` config class.

        Returns:
                The config class.
        """
        return LightningOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LightningOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.lightning.orchestrators import (
            LightningOrchestrator,
        )

        return LightningOrchestrator
config_class: Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorConfig] property readonly

Returns LightningOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[LightningOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[LightningOrchestrator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

LightningOrchestratorSettings (BaseSettings)

Lightning orchestrator base settings.

Attributes:

Name Type Description
main_studio_name Optional[str]

Main studio name.

machine_type Optional[str]

Machine type.

user_id Optional[str]

User id.

api_key Optional[str]

Lightning AI API key.

username Optional[str]

Username.

teamspace Optional[str]

Teamspace.

organization Optional[str]

Organization.

custom_commands Optional[List[str]]

Custom commands to run.

synchronous bool

If True, the client running a pipeline using this orchestrator waits until all steps finish running. If False, the client returns immediately and the pipeline is executed asynchronously. Defaults to True. This setting only has an effect when specified on the pipeline and will be ignored if specified on steps.

Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorSettings(BaseSettings):
    """Lightning orchestrator base settings.

    Attributes:
        main_studio_name: Main studio name.
        machine_type: Machine type.
        user_id: User id.
        api_key: Lightning AI API key.
        username: Username.
        teamspace: Teamspace.
        organization: Organization.
        custom_commands: Custom commands to run.
        synchronous: If `True`, the client running a pipeline using this
            orchestrator waits until all steps finish running. If `False`,
            the client returns immediately and the pipeline is executed
            asynchronously. Defaults to `True`. This setting only
            has an effect when specified on the pipeline and will be ignored if
            specified on steps.
    """

    # Resources
    main_studio_name: Optional[str] = None
    machine_type: Optional[str] = None
    user_id: Optional[str] = SecretField(default=None)
    api_key: Optional[str] = SecretField(default=None)
    username: Optional[str] = None
    teamspace: Optional[str] = None
    organization: Optional[str] = None
    custom_commands: Optional[List[str]] = None
    synchronous: bool = True
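
As a usage sketch, these settings are typically attached to a pipeline through ZenML's settings mechanism. The settings key "orchestrator.lightning" follows ZenML's <component_type>.<flavor> convention, and every concrete value below is a placeholder:

from zenml import pipeline
from zenml.integrations.lightning.flavors.lightning_orchestrator_flavor import (
    LightningOrchestratorSettings,
)

lightning_settings = LightningOrchestratorSettings(
    user_id="<LIGHTNING_USER_ID>",
    api_key="<LIGHTNING_API_KEY>",
    username="<LIGHTNING_USERNAME>",
    teamspace="<LIGHTNING_TEAMSPACE>",
    machine_type="CPU",  # assumed to be a valid lightning_sdk machine type
    custom_commands=["pip install -r extra-requirements.txt"],
    synchronous=True,
)

@pipeline(settings={"orchestrator.lightning": lightning_settings})
def my_pipeline() -> None:
    ...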

orchestrators special

Initialization of the Lightning ZenML orchestrator.

lightning_orchestrator

Implementation of the Lightning orchestrator.

LightningOrchestrator (WheeledOrchestrator)

Orchestrator responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
class LightningOrchestrator(WheeledOrchestrator):
    """Base class for Orchestrator responsible for running pipelines remotely in a VM.

    This orchestrator does not support running on a schedule.
    """

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry,
        image builder and only remote components.

        Returns:
            A `StackValidator` instance.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            for component in stack.components.values():
                if not component.config.is_local:
                    continue

                # return False, (
                #    f"The Lightning orchestrator runs pipelines remotely, "
                #    f"but the '{component.name}' {component.type.value} is "
                #    "a local stack component and will not be available in "
                #    "the Lightning step.\nPlease ensure that you always "
                #    "use non-local stack components with the Lightning "
                #    "orchestrator."
                # )

            return True, ""

        return StackValidator(
            custom_validation_function=_validate_remote_components,
        )

    def _set_lightning_env_vars(
        self,
        deployment: "PipelineDeploymentResponse",
    ) -> None:
        """Set up the Lightning client using environment variables.

        Args:
            deployment: The pipeline deployment to prepare or run.

        Raises:
            ValueError: If `user_id` and `api_key` are not set, or if neither
                `username` nor `organization` is set.
        """
        settings = cast(
            LightningOrchestratorSettings, self.get_settings(deployment)
        )
        if not settings.user_id or not settings.api_key:
            raise ValueError(
                "Lightning orchestrator requires `user_id` and `api_key` both to be set in the settings."
            )
        os.environ["LIGHTNING_USER_ID"] = settings.user_id
        os.environ["LIGHTNING_API_KEY"] = settings.api_key
        if settings.username:
            os.environ["LIGHTNING_USERNAME"] = settings.username
        elif settings.organization:
            os.environ["LIGHTNING_ORG"] = settings.organization
        else:
            raise ValueError(
                "Lightning orchestrator requires either `username` or `organization` to be set in the settings."
            )
        if settings.teamspace:
            os.environ["LIGHTNING_TEAMSPACE"] = settings.teamspace

    @property
    def config(self) -> LightningOrchestratorConfig:
        """Returns the `LightningOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(LightningOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Type[LightningOrchestratorSettings]:
        """Settings class for the Lightning orchestrator.

        Returns:
            The settings class.
        """
        return LightningOrchestratorSettings

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If no run id exists, i.e. if the run id cannot be
                read from the environment. This happens when this method gets
                called while the orchestrator is not running a pipeline.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}."
            )

    @property
    def root_directory(self) -> str:
        """Path to the root directory for all files concerning this orchestrator.

        Returns:
            Path to the root directory.
        """
        return os.path.join(
            io_utils.get_global_config_directory(),
            "lightning",
            str(self.id),
        )

    @property
    def pipeline_directory(self) -> str:
        """Returns path to a directory in which the kubeflow pipeline files are stored.

        Returns:
            Path to the pipeline directory.
        """
        return os.path.join(self.root_directory, "pipelines")

    def setup_credentials(self) -> None:
        """Set up credentials for the orchestrator."""
        connector = self.get_connector()
        assert connector is not None
        connector.configure_local_client()

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Creates a wheel and uploads the pipeline to Lightning.

        This functions as an intermediary representation of the pipeline which
        is then run on Lightning AI Studios.

        How it works:
        -------------
        Before this method is called the `prepare_pipeline_deployment()`
        method builds a docker image that contains the code for the
        pipeline, all steps and the context around these files.

        Based on this docker image a callable is created which builds
        a task for each step (`_construct_lightning_pipeline`).
        To do this the entrypoint of the docker image is configured to
        run the correct step within the docker image. The dependencies
        between these tasks are then also configured onto each
        task by pointing at the downstream steps.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.

        Raises:
            ValueError: If the schedule is not set or if the cron expression
                is not set.
        """
        settings = cast(
            LightningOrchestratorSettings, self.get_settings(deployment)
        )
        if deployment.schedule:
            if (
                deployment.schedule.catchup
                or deployment.schedule.interval_second
            ):
                logger.warning(
                    "Lightning orchestrator only uses schedules with the "
                    "`cron_expression` property, with optional `start_time` and/or `end_time`. "
                    "All other properties are ignored."
                )
            if deployment.schedule.cron_expression is None:
                raise ValueError(
                    "Property `cron_expression` must be set when passing "
                    "schedule to a Lightning orchestrator."
                )
            if deployment.schedule.cron_expression:
                raise ValueError(
                    "Property `schedule_timezone` must be set when passing "
                    "`cron_expression` to a Lightning orchestrator."
                    "Lightning orchestrator requires a Java Timezone ID to run the pipeline on schedule."
                    "Please refer to https://docs.oracle.com/middleware/1221/wcs/tag-ref/MISC/TimeZones.html for more information."
                )

        # Get deployment id
        deployment_id = deployment.id

        pipeline_name = deployment.pipeline_configuration.name
        orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

        # Copy the repository to a temporary directory and add a setup.py file
        # repository_temp_dir = (
        #    self.copy_repository_to_temp_dir_and_add_setup_py()
        # )

        # Create a wheel for the package in the temporary directory
        # wheel_path = self.create_wheel(temp_dir=repository_temp_dir)
        code_archive = code_utils.CodeArchive(
            root=source_utils.get_source_root()
        )
        logger.info("Archiving pipeline code...")
        with tempfile.NamedTemporaryFile(
            mode="w+b", delete=False, suffix=".tar.gz"
        ) as code_file:
            code_archive.write_archive(code_file)
            code_path = code_file.name
        filename = f"{orchestrator_run_name}.tar.gz"
        # Construct the env variables for the pipeline
        env_vars = environment.copy()
        orchestrator_run_id = str(uuid4())
        env_vars[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
        # Set up some variables for configuration
        env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = (
            LIGHTNING_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH
        )
        env_vars[ENV_ZENML_WHEEL_PACKAGE_NAME] = self.package_name

        # Create a line-by-line export of environment variables
        env_exports = "\n".join(
            [f"export {key}='{value}'" for key, value in env_vars.items()]
        )

        # Write the environment variables to a temporary file
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".studiorc"
        ) as temp_file:
            temp_file.write(env_exports)
            env_file_path = temp_file.name

        # Gather the requirements
        pipeline_docker_settings = (
            deployment.pipeline_configuration.docker_settings
        )
        pipeline_requirements = gather_requirements(pipeline_docker_settings)
        pipeline_requirements_to_string = " ".join(
            f'"{req}"' for req in pipeline_requirements
        )

        def _construct_lightning_steps(
            deployment: "PipelineDeploymentResponse",
        ) -> Dict[str, Dict[str, Any]]:
            """Construct the steps for the pipeline.

            Args:
                deployment: The pipeline deployment to prepare or run.

            Returns:
                The steps for the pipeline.
            """
            steps = {}
            for step_name, step in deployment.step_configurations.items():
                # The arguments are passed to configure the entrypoint of the
                # docker container when the step is called.
                entrypoint_command = (
                    StepEntrypointConfiguration.get_entrypoint_command()
                )
                entrypoint_arguments = (
                    StepEntrypointConfiguration.get_entrypoint_arguments(
                        step_name=step_name,
                        deployment_id=deployment_id,
                    )
                )
                entrypoint = entrypoint_command + entrypoint_arguments
                entrypoint_string = " ".join(entrypoint)

                step_settings = cast(
                    LightningOrchestratorSettings, self.get_settings(step)
                )

                # Gather the requirements
                step_docker_settings = step.config.docker_settings
                step_requirements = gather_requirements(step_docker_settings)
                step_requirements_to_string = " ".join(
                    f'"{req}"' for req in step_requirements
                )

                # Construct the command to run the step
                run_command = f"{entrypoint_string}"
                commands = [run_command]
                steps[step_name] = {
                    "commands": commands,
                    "requirements": step_requirements_to_string,
                    "machine": step_settings.machine_type
                    if step_settings != settings
                    else None,
                }
            return steps

        if not settings.synchronous:
            entrypoint_command = LightningOrchestratorEntrypointConfiguration.get_entrypoint_command()
            entrypoint_arguments = LightningOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
                run_name=orchestrator_run_name,
                deployment_id=deployment.id,
            )
            entrypoint = entrypoint_command + entrypoint_arguments
            entrypoint_string = " ".join(entrypoint)
            logger.info("Setting up Lightning AI client")
            self._set_lightning_env_vars(deployment)

            studio_name = sanitize_studio_name(
                "zenml_async_orchestrator_studio"
            )
            logger.info(f"Creating main studio: {studio_name}")
            studio = Studio(name=studio_name)
            studio.start()

            logger.info(
                "Uploading wheel package and installing dependencies on main studio"
            )
            studio.run(
                f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
            )
            studio.upload_file(
                code_path,
                remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
            )
            time.sleep(10)
            studio.run(
                f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
            )
            studio.upload_file(env_file_path)
            time.sleep(6)
            studio.run(
                f"cp {env_file_path.split('/')[-1]} ./.lightning_studio/.studiorc"
            )
            studio.run(f"rm {env_file_path.split('/')[-1]}")

            studio.run("pip install uv")
            logger.info(
                f"Installing requirements: {pipeline_requirements_to_string}"
            )
            studio.run(f"uv pip install {pipeline_requirements_to_string}")
            studio.run("pip install zenml")

            for custom_command in settings.custom_commands or []:
                studio.run(
                    f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {custom_command}"
                )
            # studio.run(f"pip install {wheel_path.rsplit('/', 1)[-1]}")
            logger.info("Running pipeline in async mode")
            studio.run(
                f"nohup bash -c 'cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {entrypoint_string}' > log_{filename.rsplit('.', 2)[0]}.txt 2>&1 &"
            )
            logger.info(
                f"The pipeline is running in async mode, you can keep checking the logs by running the following command: `lightning download -s vision-model/zenml-async-orchestrator-studio -p /teamspace/studios/this_studio/log_{filename.rsplit('.', 2)[0]}.txt && cat log_{filename.rsplit('.', 2)[0]}.txt`"
            )
        else:
            self._upload_and_run_pipeline(
                deployment,
                orchestrator_run_id,
                pipeline_requirements_to_string,
                settings,
                _construct_lightning_steps(deployment),
                code_path,
                filename,
                env_file_path,
            )
        os.unlink(env_file_path)

    def _upload_and_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        orchestrator_run_id: str,
        requirements: str,
        settings: LightningOrchestratorSettings,
        steps_commands: Dict[str, Dict[str, Any]],
        code_path: str,
        filename: str,
        env_file_path: str,
    ) -> None:
        """Upload and run the pipeline on Lightning Studio.

        Args:
            deployment: The pipeline deployment to prepare or run.
            orchestrator_run_id: The orchestrator run id.
            requirements: The requirements for the pipeline.
            settings: The orchestrator settings.
            steps_commands: The commands for the steps.
            code_path: The path to the code archive.
            filename: The name of the code archive.
            env_file_path: The path to the environment file.

        Raises:
            Exception: If an error occurs while running the pipeline.
        """
        logger.info("Setting up Lightning AI client")
        self._set_lightning_env_vars(deployment)

        if settings.main_studio_name:
            studio_name = settings.main_studio_name
            studio = Studio(name=studio_name)
            if (
                studio.machine != settings.machine_type
                and settings.machine_type
            ):
                studio.switch_machine(Machine(settings.machine_type))
        else:
            studio_name = sanitize_studio_name(
                f"zenml_{orchestrator_run_id}_pipeline"
            )
            logger.info(f"Creating main studio: {studio_name}")
            studio = Studio(name=studio_name)
            if settings.machine_type:
                studio.start(Machine(settings.machine_type))
            else:
                studio.start()
        try:
            studio.run(
                f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
            )
            studio.upload_file(
                code_path,
                remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
            )
            time.sleep(6)
            studio.run(
                f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
            )
            logger.info(
                "Uploading wheel package and installing dependencies on main studio"
            )
            # for some reason uploading file directly to /teamspace/studios/this_studio/.lightning_studio/.studiorc
            # doesn't work, while other file names can be uploaded just fine.
            # below is a workaround to copy the file to the correct location.
            # we first upload it under a different name and then copy it to the
            # right location.
            studio.upload_file(env_file_path)
            time.sleep(6)
            studio.run(
                f"cp {env_file_path.split('/')[-1]} ./.lightning_studio/.studiorc"
            )
            studio.run(f"rm {env_file_path.split('/')[-1]}")

            studio.run("pip install uv")
            studio.run(f"uv pip install {requirements}")
            studio.run("pip install zenml")
            # studio.run(f"pip install {wheel_path.rsplit('/', 1)[-1]}")
            for command in settings.custom_commands or []:
                output = studio.run(
                    f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
                )
                logger.info(f"Custom command output: {output}")

            for step_name, details in steps_commands.items():
                if details["machine"]:
                    logger.info(f"Executing step: {step_name} in new studio")
                    self._run_step_in_new_studio(
                        orchestrator_run_id,
                        step_name,
                        details,
                        code_path,
                        filename,
                        env_file_path,
                        settings.custom_commands,
                    )
                else:
                    logger.info(f"Executing step: {step_name} in main studio")
                    self._run_step_in_main_studio(studio, details, filename)
        except Exception as e:
            logger.error(f"Error running pipeline: {e}")
            raise e
        finally:
            if (
                studio.status != studio.status.NotCreated
                and settings.main_studio_name is None
            ):
                logger.info("Deleting main studio")
                studio.delete()

    def _run_step_in_new_studio(
        self,
        orchestrator_run_id: str,
        step_name: str,
        details: Dict[str, Any],
        code_path: str,
        filename: str,
        env_file_path: str,
        custom_commands: Optional[List[str]] = None,
    ) -> None:
        """Run a step in a new studio.

        Args:
            orchestrator_run_id: The orchestrator run id.
            step_name: The name of the step.
            details: The details of the step.
            code_path: The path to the code archive.
            filename: The name of the code archive.
            env_file_path: The path to the environment file.
            custom_commands: Custom commands to run.
        """
        studio_name = sanitize_studio_name(
            f"zenml_{orchestrator_run_id}_{step_name}"
        )
        logger.info(f"Creating new studio for step {step_name}: {studio_name}")
        studio = Studio(name=studio_name)
        studio.start(Machine(details["machine"]))
        studio.run(
            f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
        )
        studio.upload_file(
            code_path,
            remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
        )
        time.sleep(6)
        studio.run(
            f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
        )
        studio.upload_file(env_file_path)
        time.sleep(6)
        studio.run(
            f"cp {env_file_path.split('/')[-1]} ./.lightning_studio/.studiorc"
        )
        studio.run(f"rm {env_file_path.split('/')[-1]}")

        studio.run("pip install uv")
        studio.run(f"uv pip install {details['requirements']}")
        studio.run("pip install zenml")
        # studio.run(f"pip install {wheel_path.rsplit('/', 1)[-1]}")
        for command in custom_commands or []:
            output = studio.run(
                f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
            )
            logger.info(f"Custom command output: {output}")
        for command in details["commands"]:
            output = studio.run(
                f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
            )
            logger.info(f"Step {step_name} output: {output}")
        studio.delete()

    def _run_step_in_main_studio(
        self, studio: Studio, details: Dict[str, Any], filename: str
    ) -> None:
        """Run a step in the main studio.

        Args:
            studio: The studio to run the step in.
            details: The details of the step.
            filename: The name of the code archive.
        """
        for command in details["commands"]:
            output = studio.run(
                f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
            )
            logger.info(f"Step output: {output}")
config: LightningOrchestratorConfig property readonly

Returns the LightningOrchestratorConfig config.

Returns:

Type Description
LightningOrchestratorConfig

The configuration.

pipeline_directory: str property readonly

Returns path to a directory in which the pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property readonly

Path to the root directory for all files concerning this orchestrator.

Returns:

Type Description
str

Path to the root directory.

settings_class: Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorSettings] property readonly

Settings class for the Lightning orchestrator.

Returns:

Type Description
Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorSettings]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If no run id exists, i.e. if the run id cannot be read from the environment. This happens when this method gets called while the orchestrator is not running a pipeline.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists, i.e. if the run id cannot be
            read from the environment. This happens when this method gets
            called while the orchestrator is not running a pipeline.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}."
        )
prepare_or_run_pipeline(self, deployment, stack, environment)

Creates a wheel and uploads the pipeline to Lightning.

This functions as an intermediary representation of the pipeline which is then run on Lightning AI Studios.

How it works:

Before this method is called the prepare_pipeline_deployment() method builds a docker image that contains the code for the pipeline, all steps and the context around these files.

Based on this docker image a callable is created which builds a task for each step (_construct_lightning_pipeline). To do this the entrypoint of the docker image is configured to run the correct step within the docker image. The dependencies between these tasks are then also configured onto each task by pointing at the downstream steps.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Exceptions:

Type Description
ValueError

If the schedule is not set or if the cron expression is not set.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Creates a wheel and uploads the pipeline to Lightning.

    This functions as an intermediary representation of the pipeline which
    is then run on Lightning AI Studios.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()`
    method builds a docker image that contains the code for the
    pipeline, all steps and the context around these files.

    Based on this docker image a callable is created which builds
    a task for each step (`_construct_lightning_pipeline`).
    To do this the entrypoint of the docker image is configured to
    run the correct step within the docker image. The dependencies
    between these tasks are then also configured onto each
    task by pointing at the downstream steps.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        ValueError: If the schedule is not set or if the cron expression
            is not set.
    """
    settings = cast(
        LightningOrchestratorSettings, self.get_settings(deployment)
    )
    if deployment.schedule:
        if (
            deployment.schedule.catchup
            or deployment.schedule.interval_second
        ):
            logger.warning(
                "Lightning orchestrator only uses schedules with the "
                "`cron_expression` property, with optional `start_time` and/or `end_time`. "
                "All other properties are ignored."
            )
        if deployment.schedule.cron_expression is None:
            raise ValueError(
                "Property `cron_expression` must be set when passing "
                "schedule to a Lightning orchestrator."
            )
        if deployment.schedule.cron_expression:
            raise ValueError(
                "Property `schedule_timezone` must be set when passing "
                "`cron_expression` to a Lightning orchestrator."
                "Lightning orchestrator requires a Java Timezone ID to run the pipeline on schedule."
                "Please refer to https://docs.oracle.com/middleware/1221/wcs/tag-ref/MISC/TimeZones.html for more information."
            )

    # Get deployment id
    deployment_id = deployment.id

    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

    # Copy the repository to a temporary directory and add a setup.py file
    # repository_temp_dir = (
    #    self.copy_repository_to_temp_dir_and_add_setup_py()
    # )

    # Create a wheel for the package in the temporary directory
    # wheel_path = self.create_wheel(temp_dir=repository_temp_dir)
    code_archive = code_utils.CodeArchive(
        root=source_utils.get_source_root()
    )
    logger.info("Archiving pipeline code...")
    with tempfile.NamedTemporaryFile(
        mode="w+b", delete=False, suffix=".tar.gz"
    ) as code_file:
        code_archive.write_archive(code_file)
        code_path = code_file.name
    filename = f"{orchestrator_run_name}.tar.gz"
    # Construct the env variables for the pipeline
    env_vars = environment.copy()
    orchestrator_run_id = str(uuid4())
    env_vars[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
    # Set up some variables for configuration
    env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = (
        LIGHTNING_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH
    )
    env_vars[ENV_ZENML_WHEEL_PACKAGE_NAME] = self.package_name

    # Create a line-by-line export of environment variables
    env_exports = "\n".join(
        [f"export {key}='{value}'" for key, value in env_vars.items()]
    )

    # Write the environment variables to a temporary file
    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, suffix=".studiorc"
    ) as temp_file:
        temp_file.write(env_exports)
        env_file_path = temp_file.name

    # Gather the requirements
    pipeline_docker_settings = (
        deployment.pipeline_configuration.docker_settings
    )
    pipeline_requirements = gather_requirements(pipeline_docker_settings)
    pipeline_requirements_to_string = " ".join(
        f'"{req}"' for req in pipeline_requirements
    )

    def _construct_lightning_steps(
        deployment: "PipelineDeploymentResponse",
    ) -> Dict[str, Dict[str, Any]]:
        """Construct the steps for the pipeline.

        Args:
            deployment: The pipeline deployment to prepare or run.

        Returns:
            The steps for the pipeline.
        """
        steps = {}
        for step_name, step in deployment.step_configurations.items():
            # The arguments are passed to configure the entrypoint of the
            # docker container when the step is called.
            entrypoint_command = (
                StepEntrypointConfiguration.get_entrypoint_command()
            )
            entrypoint_arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment_id,
                )
            )
            entrypoint = entrypoint_command + entrypoint_arguments
            entrypoint_string = " ".join(entrypoint)

            step_settings = cast(
                LightningOrchestratorSettings, self.get_settings(step)
            )

            # Gather the requirements
            step_docker_settings = step.config.docker_settings
            step_requirements = gather_requirements(step_docker_settings)
            step_requirements_to_string = " ".join(
                f'"{req}"' for req in step_requirements
            )

            # Construct the command to run the step
            run_command = f"{entrypoint_string}"
            commands = [run_command]
            steps[step_name] = {
                "commands": commands,
                "requirements": step_requirements_to_string,
                "machine": step_settings.machine_type
                if step_settings != settings
                else None,
            }
        return steps

    if not settings.synchronous:
        entrypoint_command = LightningOrchestratorEntrypointConfiguration.get_entrypoint_command()
        entrypoint_arguments = LightningOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
        )
        entrypoint = entrypoint_command + entrypoint_arguments
        entrypoint_string = " ".join(entrypoint)
        logger.info("Setting up Lightning AI client")
        self._set_lightning_env_vars(deployment)

        studio_name = sanitize_studio_name(
            "zenml_async_orchestrator_studio"
        )
        logger.info(f"Creating main studio: {studio_name}")
        studio = Studio(name=studio_name)
        studio.start()

        logger.info(
            "Uploading wheel package and installing dependencies on main studio"
        )
        studio.run(
            f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
        )
        studio.upload_file(
            code_path,
            remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
        )
        time.sleep(10)
        studio.run(
            f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
        )
        studio.upload_file(env_file_path)
        time.sleep(6)
        studio.run(
            f"cp {env_file_path.split('/')[-1]} ./.lightning_studio/.studiorc"
        )
        studio.run(f"rm {env_file_path.split('/')[-1]}")

        studio.run("pip install uv")
        logger.info(
            f"Installing requirements: {pipeline_requirements_to_string}"
        )
        studio.run(f"uv pip install {pipeline_requirements_to_string}")
        studio.run("pip install zenml")

        for custom_command in settings.custom_commands or []:
            studio.run(
                f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {custom_command}"
            )
        # studio.run(f"pip install {wheel_path.rsplit('/', 1)[-1]}")
        logger.info("Running pipeline in async mode")
        studio.run(
            f"nohup bash -c 'cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {entrypoint_string}' > log_{filename.rsplit('.', 2)[0]}.txt 2>&1 &"
        )
        logger.info(
            f"The pipeline is running in async mode, you can keep checking the logs by running the following command: `lightning download -s vision-model/zenml-async-orchestrator-studio -p /teamspace/studios/this_studio/log_{filename.rsplit('.', 2)[0]}.txt && cat log_{filename.rsplit('.', 2)[0]}.txt`"
        )
    else:
        self._upload_and_run_pipeline(
            deployment,
            orchestrator_run_id,
            pipeline_requirements_to_string,
            settings,
            _construct_lightning_steps(deployment),
            code_path,
            filename,
            env_file_path,
        )
    os.unlink(env_file_path)
setup_credentials(self)

Set up credentials for the orchestrator.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()

lightning_orchestrator_entrypoint

Entrypoint of the Lightning master/orchestrator STUDIO.

main()

Entrypoint of the Lightning master/orchestrator STUDIO.

This is the entrypoint of the Lightning master/orchestrator STUDIO. It is responsible for provisioning the STUDIO and running the pipeline steps in separate Studios.

Exceptions:

Type Description
TypeError

If the active stack's orchestrator is not an instance of LightningOrchestrator.

ValueError

If the active stack's container registry is None.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the Lightning master/orchestrator STUDIO.

    This is the entrypoint of the Lightning master/orchestrator STUDIO. It is
    responsible for provisioning the STUDIO and running the pipeline steps in
    separate Studios.

    Raises:
        TypeError: If the active stack's orchestrator is not an instance of
            LightningOrchestrator.
        ValueError: If the active stack's container registry is None.
    """
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Lightning orchestrator STUDIO started.")

    # Parse / extract args.
    args = parse_args()

    orchestrator_run_id = os.environ.get(
        ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID
    )
    if not orchestrator_run_id:
        raise ValueError(
            f"Environment variable '{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}' is not set."
        )

    logger.info(f"Orchestrator run id: {orchestrator_run_id}")

    deployment = Client().get_deployment(args.deployment_id)
    filename = f"{args.run_name}.tar.gz"

    pipeline_dag = {
        step_name: step.spec.upstream_steps
        for step_name, step in deployment.step_configurations.items()
    }
    entrypoint_command = StepEntrypointConfiguration.get_entrypoint_command()

    active_stack = Client().active_stack

    orchestrator = active_stack.orchestrator
    if not isinstance(orchestrator, LightningOrchestrator):
        raise TypeError(
            "The active stack's orchestrator is not an instance of LightningOrchestrator."
        )

    # Set up credentials
    orchestrator._set_lightning_env_vars(deployment)

    pipeline_settings = cast(
        LightningOrchestratorSettings, orchestrator.get_settings(deployment)
    )

    # Gather the requirements
    pipeline_docker_settings = (
        deployment.pipeline_configuration.docker_settings
    )
    pipeline_requirements = gather_requirements(pipeline_docker_settings)
    pipeline_requirements_to_string = " ".join(
        f'"{req}"' for req in pipeline_requirements
    )

    unique_resource_configs: Dict[str, str] = {}
    main_studio_name = sanitize_studio_name(
        f"zenml_{orchestrator_run_id}_pipeline"
    )
    for step_name, step in deployment.step_configurations.items():
        step_settings = cast(
            LightningOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        unique_resource_configs[step_name] = main_studio_name
        if pipeline_settings.machine_type != step_settings.machine_type:
            unique_resource_configs[step_name] = (
                f"zenml-{orchestrator_run_id}_{step_name}"
            )

    logger.info(f"Creating main studio: {main_studio_name}")
    main_studio = Studio(name=main_studio_name)
    if pipeline_settings.machine_type:
        main_studio.start(Machine(pipeline_settings.machine_type))
    else:
        main_studio.start()

    logger.info("Main studio started.")
    logger.info("Uploading code to main studio the code path: %s", filename)
    main_studio.upload_file(
        "/teamspace/studios/this_studio/.lightning_studio/.studiorc",
        remote_path="/teamspace/studios/this_studio/.lightning_studio/.studiorc",
    )
    output = main_studio.run(
        f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
    )
    logger.info(output)
    main_studio.upload_file(
        f"/teamspace/studios/this_studio/zenml_codes/{filename}",
        remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
    )
    logger.info("Extracting code... ")
    output = main_studio.run(
        f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
    )
    logger.info(f"Code extraction output: {output}")
    logger.info("Installing requirements... ")

    output = main_studio.run("pip install uv")
    logger.info(output)
    output = main_studio.run(
        f"uv pip install {pipeline_requirements_to_string}"
    )
    logger.info(output)
    output = main_studio.run("pip install zenml")
    logger.info(output)

    for command in pipeline_settings.custom_commands or []:
        output = main_studio.run(
            f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
        )
        logger.info(f"Custom command output: {output}")

    run = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=args.deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )[0]

    logger.info("Fetching pipeline run: %s", run.id)

    def run_step_on_lightning_studio(step_name: str) -> None:
        """Run a pipeline step in a separate Lightning STUDIO.

        Args:
            step_name: Name of the step.

        Raises:
            Exception: If an error occurs while running the step on the STUDIO.
        """
        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name,
            deployment_id=args.deployment_id,
        )

        entrypoint = entrypoint_command + step_args
        entrypoint_string = " ".join(entrypoint)
        run_command = f"{entrypoint_string}"

        step = deployment.step_configurations[step_name]
        if unique_resource_configs[step_name] != main_studio_name:
            logger.info(
                f"Creating separate studio for step: {unique_resource_configs[step_name]}"
            )
            # Get step settings
            step_settings = cast(
                LightningOrchestratorSettings,
                orchestrator.get_settings(step),
            )
            # Gather the requirements
            step_docker_settings = step.config.docker_settings
            step_requirements = gather_requirements(step_docker_settings)
            step_requirements_to_string = " ".join(
                f'"{req}"' for req in step_requirements
            )
            run_command = f"{entrypoint_string}"

            logger.info(
                f"Creating separate studio for step: {unique_resource_configs[step_name]}"
            )
            studio = Studio(name=unique_resource_configs[step_name])
            try:
                studio.start(Machine(step_settings.machine_type))
                output = studio.run(
                    f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
                )
                logger.info(output)
                studio.upload_file(
                    f"/teamspace/studios/this_studio/zenml_codes/{filename}",
                    remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
                )
                output = studio.run(
                    f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
                )
                logger.info(output)
                studio.upload_file(
                    "/teamspace/studios/this_studio/.lightning_studio/.studiorc",
                    remote_path="/teamspace/studios/this_studio/.lightning_studio/.studiorc",
                )
                output = studio.run("pip install uv")
                logger.info(output)
                output = studio.run(
                    f"uv pip install {step_requirements_to_string}"
                )
                logger.info(output)
                output = studio.run("pip install zenml")
                logger.info(output)
                for command in step_settings.custom_commands or []:
                    output = studio.run(
                        f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {command}"
                    )
                    logger.info(f"Custom command output: {output}")
                output = studio.run(
                    f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {run_command}"
                )
                logger.info(output)
            except Exception as e:
                logger.error(
                    f"Error running step {step_name} on studio {unique_resource_configs[step_name]}: {e}"
                )
                raise e
            finally:
                studio.delete()
        else:
            output = main_studio.run(
                f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {run_command}"
            )
            logger.info(output)

            # Pop the resource configuration for this step
        unique_resource_configs.pop(step_name)

        if main_studio_name in unique_resource_configs.values():
            # If there are more steps using this configuration, skip deprovisioning the studio
            logger.info(
                f"Resource configuration for studio '{main_studio_name}' "
                "is used by subsequent steps. Skipping the deprovisioning of "
                "the studio."
            )
        else:
            # If there are no more steps using this configuration, delete the studio
            logger.info(
                f"Resource configuration for studio '{main_studio_name}' "
                "is not used by subsequent steps. Deprovisioning the studio."
            )
            main_studio.delete()
        logger.info(f"Running step `{step_name}` on a Studio is completed.")

    ThreadedDagRunner(
        dag=pipeline_dag, run_fn=run_step_on_lightning_studio
    ).run()

    logger.info("Orchestration STUDIO provisioned.")
parse_args()

Parse entrypoint arguments.

Returns:

Type Description
Namespace

Parsed args.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    return parser.parse_args()

lightning_orchestrator_entrypoint_configuration

Entrypoint configuration for the Lightning master/orchestrator VM.

LightningOrchestratorEntrypointConfiguration

Entrypoint configuration for the Lightning master/orchestrator VM.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
class LightningOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the Lightning master/orchestrator VM."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all the options required for running this entrypoint.

        Returns:
            Entrypoint options.
        """
        options = {
            RUN_NAME_OPTION,
            DEPLOYMENT_ID_OPTION,
        }
        return options

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        Returns:
            Entrypoint command.
        """
        command = [
            "python",
            "-m",
            "zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint",
        ]
        return command

    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.

        Returns:
            List of entrypoint arguments.
        """
        args = [
            f"--{RUN_NAME_OPTION}",
            run_name,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]

        return args
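
As an illustration of how the command and arguments fit together, the sketch below assembles the full entrypoint invocation. The run name and deployment ID are hypothetical, and the option constants are assumed to resolve to "run_name" and "deployment_id", matching the `parse_args()` function shown earlier:

from uuid import UUID

from zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint_configuration import (
    LightningOrchestratorEntrypointConfiguration,
)

command = LightningOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = LightningOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
    run_name="my_pipeline_run_42",  # hypothetical orchestrator run name
    deployment_id=UUID("12345678-1234-5678-1234-567812345678"),  # hypothetical ID
)

# Roughly: python -m zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint \
#     --run_name my_pipeline_run_42 --deployment_id 12345678-1234-5678-1234-567812345678
print(" ".join(command + args))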
get_entrypoint_arguments(run_name, deployment_id) classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
run_name str

Name of the ZenML run.

required
deployment_id UUID

ID of the deployment.

required

Returns:

Type Description
List[str]

List of entrypoint arguments.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]

    return args
get_entrypoint_command() classmethod

Returns a command that runs the entrypoint module.

Returns:

Type Description
List[str]

Entrypoint command.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() classmethod

Gets all the options required for running this entrypoint.

Returns:

Type Description
Set[str]

Entrypoint options.

Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
    }
    return options

utils

Utility functions for the Lightning orchestrator.

gather_requirements(docker_settings)

Gather the requirements files.

Parameters:

Name Type Description Default
docker_settings DockerSettings

Docker settings.

required

Returns:

Type Description
List[str]

List of requirements.

Source code in zenml/integrations/lightning/orchestrators/utils.py
def gather_requirements(docker_settings: "DockerSettings") -> List[str]:
    """Gather the requirements files.

    Args:
        docker_settings: Docker settings.

    Returns:
        List of requirements.
    """
    docker_image_builder = PipelineDockerImageBuilder()
    requirements_files = docker_image_builder.gather_requirements_files(
        docker_settings=docker_settings,
        stack=Client().active_stack,
        log=False,
    )

    # Extract and clean the requirements
    requirements = list(
        itertools.chain.from_iterable(
            r[1].strip().split("\n") for r in requirements_files
        )
    )

    # Remove empty items and duplicates
    requirements = sorted(set(filter(None, requirements)))

    return requirements
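
A quick usage sketch, assuming an initialized ZenML client with an active stack (the helper resolves the stack via Client()); the requirement values are placeholders:

from zenml.config import DockerSettings
from zenml.integrations.lightning.orchestrators.utils import gather_requirements

# Placeholder Docker settings; `requirements` may also point at a requirements file.
docker_settings = DockerSettings(requirements=["pandas", "scikit-learn"])
print(gather_requirements(docker_settings))
# -> a sorted, de-duplicated list of requirement strings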
sanitize_studio_name(studio_name)

Sanitize studio names so they conform to a Kubernetes-style naming convention.

Parameters:

Name Type Description Default
studio_name str

Arbitrary input studio_name.

required

Returns:

Type Description
str

Sanitized studio name.

Source code in zenml/integrations/lightning/orchestrators/utils.py
def sanitize_studio_name(studio_name: str) -> str:
    """Sanitize studio_names so they conform to Kubernetes studio naming convention.

    Args:
        studio_name: Arbitrary input studio_name.

    Returns:
        Sanitized studio name.
    """
    studio_name = re.sub(r"[^a-z0-9-]", "-", studio_name.lower())
    studio_name = re.sub(r"^[-]+", "", studio_name)
    return re.sub(r"[-]+", "-", studio_name)
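
For example, applying the substitutions above (a doctest-style sketch):

from zenml.integrations.lightning.orchestrators.utils import sanitize_studio_name

print(sanitize_studio_name("zenml_abc123_pipeline"))  # -> "zenml-abc123-pipeline"
print(sanitize_studio_name("--ZenML Run 01"))         # -> "zenml-run-01"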