Lightning
zenml.integrations.lightning
special
Initialization of the Lightning integration for ZenML.
LightningIntegration (Integration)
Definition of Lightning Integration for ZenML.
Source code in zenml/integrations/lightning/__init__.py
class LightningIntegration(Integration):
    """Integration definition that registers Lightning with ZenML."""

    NAME = LIGHTNING
    REQUIREMENTS = ["lightning-sdk>=0.1.17"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped by this integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # The flavor module is imported inside the method, not at module
        # import time.
        from zenml.integrations.lightning.flavors import (
            LightningOrchestratorFlavor,
        )

        integration_flavors: List[Type[Flavor]] = [
            LightningOrchestratorFlavor
        ]
        return integration_flavors
flavors()
classmethod
Declare the stack component flavors for the Lightning integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/lightning/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped by this integration.

    Returns:
        List of stack component flavors for this integration.
    """
    # The flavor module is imported inside the method, not at module
    # import time.
    from zenml.integrations.lightning.flavors import (
        LightningOrchestratorFlavor,
    )

    integration_flavors: List[Type[Flavor]] = [LightningOrchestratorFlavor]
    return integration_flavors
flavors
special
Lightning integration flavors.
lightning_orchestrator_flavor
Lightning orchestrator base config and settings.
LightningOrchestratorConfig (BaseOrchestratorConfig, LightningOrchestratorSettings)
Lightning orchestrator base config.
Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorConfig(
    BaseOrchestratorConfig, LightningOrchestratorSettings
):
    """Configuration of the Lightning orchestrator.

    Combines the base orchestrator config with the Lightning settings, so
    every setting can also be given as a config value.
    """

    @property
    def is_local(self) -> bool:
        """Tell whether this stack component runs locally.

        Returns:
            Always `False` for this config.
        """
        return False

    @property
    def is_schedulable(self) -> bool:
        """Tell whether the orchestrator can run pipelines on a schedule.

        Returns:
            Always `False` for this config.
        """
        return False

    @property
    def is_synchronous(self) -> bool:
        """Tell whether the orchestrator runs synchronously.

        Returns:
            The value of the `synchronous` setting.
        """
        runs_synchronously = self.synchronous
        return runs_synchronously
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
is_schedulable: bool
property
readonly
Whether the orchestrator is schedulable or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator is schedulable or not. |
is_synchronous: bool
property
readonly
Whether the orchestrator runs synchronous or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator runs synchronous or not. |
LightningOrchestratorFlavor (BaseOrchestratorFlavor)
Lightning orchestrator flavor.
Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorFlavor(BaseOrchestratorFlavor):
    """Lightning orchestrator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return LIGHTNING_ORCHESTRATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        # Adjacent literals are concatenated into the same single URL.
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/"
            "orchestrator/lightning.png"
        )

    @property
    def config_class(self) -> Type[LightningOrchestratorConfig]:
        """Returns the `LightningOrchestratorConfig` config class.

        Returns:
            The config class.
        """
        return LightningOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LightningOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # The orchestrator module is imported only when the property is
        # read, not when this flavor module is imported.
        from zenml.integrations.lightning.orchestrators import (
            LightningOrchestrator,
        )

        return LightningOrchestrator
config_class: Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorConfig]
property
readonly
Returns the LightningOrchestratorConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[LightningOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LightningOrchestrator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
LightningOrchestratorSettings (BaseSettings)
Lightning orchestrator base settings.
Attributes:
Name | Type | Description |
---|---|---|
main_studio_name |
Optional[str] |
Main studio name. |
machine_type |
Optional[str] |
Machine type. |
user_id |
Optional[str] |
User id. |
api_key |
Optional[str] |
api_key. |
username |
Optional[str] |
Username. |
teamspace |
Optional[str] |
Teamspace. |
organization |
Optional[str] |
Organization. |
custom_commands |
Optional[List[str]] |
Custom commands to run. |
synchronous |
bool |
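If `True`, the client running a pipeline using this orchestrator waits until all steps finish running. If `False`, the client returns immediately and the pipeline is executed asynchronously. Defaults to `True`. This setting only has an effect when specified on the pipeline and will be ignored if specified on steps. |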
Source code in zenml/integrations/lightning/flavors/lightning_orchestrator_flavor.py
class LightningOrchestratorSettings(BaseSettings):
    """Lightning orchestrator base settings.

    Attributes:
        main_studio_name: Name of the studio used as the main studio. When
            it is not set, the orchestrator creates a studio named after
            the run id and deletes it once the run is over.
        machine_type: Machine type, passed to `Machine(...)` when a studio
            is started or switched.
        user_id: Lightning user id, exported as `LIGHTNING_USER_ID`.
            Required together with `api_key`.
        api_key: Lightning API key, exported as `LIGHTNING_API_KEY`.
            Required together with `user_id`.
        username: Username, exported as `LIGHTNING_USERNAME`. Either this
            or `organization` must be set.
        teamspace: Teamspace, exported as `LIGHTNING_TEAMSPACE` when set.
        organization: Organization, exported as `LIGHTNING_ORG` when
            `username` is not set.
        custom_commands: Custom commands to run inside the extracted code
            directory after the requirements were installed.
        synchronous: If `True`, the client running a pipeline using this
            orchestrator waits until all steps finish running. If `False`,
            the client returns immediately and the pipeline is executed
            asynchronously. Defaults to `True`. This setting only
            has an effect when specified on the pipeline and will be ignored if
            specified on steps.
    """

    # Resources
    main_studio_name: Optional[str] = None
    machine_type: Optional[str] = None
    # Credentials are declared through `SecretField`.
    user_id: Optional[str] = SecretField(default=None)
    api_key: Optional[str] = SecretField(default=None)
    username: Optional[str] = None
    teamspace: Optional[str] = None
    organization: Optional[str] = None
    custom_commands: Optional[List[str]] = None
    synchronous: bool = True
orchestrators
special
Initialization of the Lightning ZenML orchestrator.
lightning_orchestrator
Implementation of the Lightning orchestrator.
LightningOrchestrator (WheeledOrchestrator)
Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
class LightningOrchestrator(WheeledOrchestrator):
"""Base class for Orchestrator responsible for running pipelines remotely in a VM.
This orchestrator does not support running on a schedule.
"""
@property
def validator(self) -> Optional[StackValidator]:
    """Validates the stack.

    NOTE(review): the custom validation function below currently accepts
    every stack. The code that rejected local stack components is commented
    out, so no container registry, image builder or remote-only requirement
    is enforced here.

    Returns:
        A `StackValidator` instance.
    """

    def _validate_remote_components(
        stack: "Stack",
    ) -> Tuple[bool, str]:
        # NOTE(review): indentation was reconstructed; the final `return`
        # is assumed to follow the loop - confirm against the original file.
        for component in stack.components.values():
            if not component.config.is_local:
                continue

            # Rejection of local components is disabled (kept for
            # reference):
            # return False, (
            #     f"The Lightning orchestrator runs pipelines remotely, "
            #     f"but the '{component.name}' {component.type.value} is "
            #     "a local stack component and will not be available in "
            #     "the Lightning step.\nPlease ensure that you always "
            #     "use non-local stack components with the Lightning "
            #     "orchestrator."
            # )

        return True, ""

    return StackValidator(
        custom_validation_function=_validate_remote_components,
    )
def _set_lightning_env_vars(
    self,
    deployment: "PipelineDeploymentResponse",
) -> None:
    """Export the Lightning credentials as process environment variables.

    Args:
        deployment: The pipeline deployment to prepare or run.

    Raises:
        ValueError: If `user_id` or `api_key` is missing, or if neither
            `username` nor `organization` is set.
    """
    lightning_settings = cast(
        LightningOrchestratorSettings, self.get_settings(deployment)
    )

    if not (lightning_settings.user_id and lightning_settings.api_key):
        raise ValueError(
            "Lightning orchestrator requires `user_id` and `api_key` both to be set in the settings."
        )
    os.environ["LIGHTNING_USER_ID"] = lightning_settings.user_id
    os.environ["LIGHTNING_API_KEY"] = lightning_settings.api_key

    # The username takes precedence; the organization is only used when no
    # username is configured.
    if not lightning_settings.username and not lightning_settings.organization:
        raise ValueError(
            "Lightning orchestrator requires either `username` or `organization` to be set in the settings."
        )
    if lightning_settings.username:
        os.environ["LIGHTNING_USERNAME"] = lightning_settings.username
    else:
        os.environ["LIGHTNING_ORG"] = lightning_settings.organization

    if lightning_settings.teamspace:
        os.environ["LIGHTNING_TEAMSPACE"] = lightning_settings.teamspace
@property
def config(self) -> LightningOrchestratorConfig:
    """Give typed access to the configuration of this orchestrator.

    Returns:
        The configuration.
    """
    # `cast` only informs the type checker; the stored object is returned
    # unchanged.
    typed_config = cast(LightningOrchestratorConfig, self._config)
    return typed_config
@property
def settings_class(self) -> Type[LightningOrchestratorSettings]:
    """Class that holds the settings of the Lightning orchestrator.

    Returns:
        The settings class.
    """
    lightning_settings_class = LightningOrchestratorSettings
    return lightning_settings_class
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID`.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
            This happens when this method gets called while the
            orchestrator is not running a pipeline.
    """
    try:
        return os.environ[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID]
    except KeyError as missing_variable:
        # Chain the lookup failure so the traceback shows its origin.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}."
        ) from missing_variable
@property
def root_directory(self) -> str:
    """Directory that holds all files concerning this orchestrator.

    Returns:
        Path to the root directory.
    """
    # <global config directory>/lightning/<orchestrator id>
    path_segments = (
        io_utils.get_global_config_directory(),
        "lightning",
        str(self.id),
    )
    return os.path.join(*path_segments)
@property
def pipeline_directory(self) -> str:
    """Returns path to a directory in which the pipeline files are stored.

    The directory is the `pipelines` sub-directory of `root_directory`.

    Returns:
        Path to the pipeline directory.
    """
    return os.path.join(self.root_directory, "pipelines")
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator.

    Fetches the connector of this component and lets it configure the
    local client.

    Raises:
        RuntimeError: If no connector is available for this orchestrator.
    """
    connector = self.get_connector()
    # An explicit check instead of `assert`: assertions are stripped when
    # Python runs with `-O`, which would turn a missing connector into an
    # opaque `AttributeError`.
    if connector is None:
        raise RuntimeError(
            "Unable to set up credentials: no connector is configured "
            "for the Lightning orchestrator."
        )
    connector.configure_local_client()
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Archives the pipeline code and runs the pipeline on Lightning.

    How it works:
    -------------
    The source root is packed into a `.tar.gz` archive and the run
    environment is written to a `.studiorc` file made of `export` lines.
    Both files are uploaded to a Lightning studio, where the requirements
    gathered from the docker settings are installed. Then either

    * (synchronous) the steps are executed from this process through
      `_upload_and_run_pipeline`, or
    * (asynchronous) the orchestrator entrypoint is started in the
      background of a studio and this method returns right away.

    The two local temporary files are removed before the method returns,
    also when an error occurs.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        ValueError: If the deployment has a schedule. This orchestrator
            does not support running on a schedule.
    """
    # Imported locally so this block does not depend on the file's import
    # section.
    import shlex

    settings = cast(
        LightningOrchestratorSettings, self.get_settings(deployment)
    )

    # The previous checks raised for schedules with and without a cron
    # expression and referred to a `schedule_timezone` setting that does
    # not exist. Reject schedules with one accurate message instead.
    if deployment.schedule:
        raise ValueError(
            "The Lightning orchestrator does not support running "
            "pipelines on a schedule. Please remove the schedule from "
            "the pipeline or use a different orchestrator."
        )

    deployment_id = deployment.id
    orchestrator_run_name = get_orchestrator_run_name(
        deployment.pipeline_configuration.name
    )

    # Remote locations inside the studio.
    filename = f"{orchestrator_run_name}.tar.gz"
    studio_home = "/teamspace/studios/this_studio"
    code_dir_name = filename.rsplit(".", 2)[0]
    code_dir = f"{studio_home}/zenml_codes/{code_dir_name}"
    remote_archive = f"{studio_home}/zenml_codes/{filename}"

    # Construct the env variables for the pipeline
    env_vars = environment.copy()
    orchestrator_run_id = str(uuid4())
    env_vars[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
    env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = (
        LIGHTNING_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH
    )
    env_vars[ENV_ZENML_WHEEL_PACKAGE_NAME] = self.package_name

    # One `export` per line. Values are shell-quoted so that quotes or
    # other special characters cannot break the file.
    env_exports = "\n".join(
        f"export {key}={shlex.quote(str(value))}"
        for key, value in env_vars.items()
    )

    # Gather the requirements
    pipeline_requirements = gather_requirements(
        deployment.pipeline_configuration.docker_settings
    )
    pipeline_requirements_to_string = " ".join(
        f'"{req}"' for req in pipeline_requirements
    )

    def _construct_lightning_steps(
        deployment: "PipelineDeploymentResponse",
    ) -> Dict[str, Dict[str, Any]]:
        """Construct the steps for the pipeline.

        Args:
            deployment: The pipeline deployment to prepare or run.

        Returns:
            The steps for the pipeline, keyed by step name. Each entry
            holds the commands to run, the quoted requirements and the
            machine type (`None` when the step settings equal the
            pipeline settings).
        """
        steps = {}
        for step_name, step in deployment.step_configurations.items():
            entrypoint = (
                StepEntrypointConfiguration.get_entrypoint_command()
                + StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment_id,
                )
            )
            step_settings = cast(
                LightningOrchestratorSettings, self.get_settings(step)
            )
            step_requirements = gather_requirements(
                step.config.docker_settings
            )
            steps[step_name] = {
                "commands": [" ".join(entrypoint)],
                "requirements": " ".join(
                    f'"{req}"' for req in step_requirements
                ),
                "machine": step_settings.machine_type
                if step_settings != settings
                else None,
            }
        return steps

    code_path: Optional[str] = None
    env_file_path: Optional[str] = None
    try:
        code_archive = code_utils.CodeArchive(
            root=source_utils.get_source_root()
        )
        logger.info("Archiving pipeline code...")
        with tempfile.NamedTemporaryFile(
            mode="w+b", delete=False, suffix=".tar.gz"
        ) as code_file:
            # Remember the path first so the file is cleaned up even if
            # writing the archive fails.
            code_path = code_file.name
            code_archive.write_archive(code_file)

        # Write the environment variables to a temporary file
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".studiorc"
        ) as temp_file:
            env_file_path = temp_file.name
            temp_file.write(env_exports)

        if settings.synchronous:
            self._upload_and_run_pipeline(
                deployment,
                orchestrator_run_id,
                pipeline_requirements_to_string,
                settings,
                _construct_lightning_steps(deployment),
                code_path,
                filename,
                env_file_path,
            )
        else:
            entrypoint_command = LightningOrchestratorEntrypointConfiguration.get_entrypoint_command()
            entrypoint_arguments = LightningOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
                run_name=orchestrator_run_name,
                deployment_id=deployment.id,
            )
            entrypoint_string = " ".join(
                entrypoint_command + entrypoint_arguments
            )

            logger.info("Setting up Lightning AI client")
            self._set_lightning_env_vars(deployment)

            studio_name = sanitize_studio_name(
                "zenml_async_orchestrator_studio"
            )
            logger.info(f"Creating main studio: {studio_name}")
            studio = Studio(name=studio_name)
            studio.start()

            logger.info(
                "Uploading wheel package and installing dependencies on main studio"
            )
            studio.run(f"mkdir -p {code_dir}")
            studio.upload_file(code_path, remote_path=remote_archive)
            # NOTE(review): fixed pause kept from the original;
            # presumably it gives the upload time to become visible in
            # the studio - confirm.
            time.sleep(10)
            studio.run(f"tar -xvzf {remote_archive} -C {code_dir}")
            studio.upload_file(
                env_file_path,
                remote_path=f"{studio_home}/.lightning_studio/.studiorc",
            )
            studio.run("pip install uv")
            logger.info(
                f"Installing requirements: {pipeline_requirements_to_string}"
            )
            studio.run(f"uv pip install {pipeline_requirements_to_string}")
            studio.run("pip install zenml")

            for custom_command in settings.custom_commands or []:
                studio.run(f"cd {code_dir} && {custom_command}")

            logger.info("Running pipeline in async mode")
            # The orchestrator entrypoint is detached with `nohup ... &`;
            # its output goes to a log file in the studio.
            studio.run(
                f"nohup bash -c 'cd {code_dir} && {entrypoint_string}' > log_{code_dir_name}.txt 2>&1 &"
            )

            # Point the hint at the configured teamspace and the actual
            # studio name instead of a hard-coded teamspace.
            teamspace = settings.teamspace or "<your-teamspace>"
            logger.info(
                f"The pipeline is running in async mode, you can keep checking the logs by running the following command: `lightning download -s {teamspace}/{studio_name} -p {studio_home}/log_{code_dir_name}.txt && cat log_{code_dir_name}.txt`"
            )
    finally:
        # Both temp files were created with `delete=False`; remove them on
        # every exit path.
        for temp_path in (code_path, env_file_path):
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)
def _upload_and_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    orchestrator_run_id: str,
    requirements: str,
    settings: LightningOrchestratorSettings,
    steps_commands: Dict[str, Dict[str, Any]],
    code_path: str,
    filename: str,
    env_file_path: str,
) -> None:
    """Prepare the main studio and execute every step of the pipeline.

    The code archive and the environment file are uploaded to the main
    studio, requirements and custom commands are run there, and then each
    step is executed either in the main studio or, when the step has its
    own machine, in a studio of its own. A studio created by this method
    is deleted afterwards; a studio named by `main_studio_name` is kept.

    Args:
        deployment: The pipeline deployment to prepare or run.
        orchestrator_run_id: The orchestrator run id.
        requirements: The requirements for the pipeline.
        settings: The orchestrator settings.
        steps_commands: The commands for the steps.
        code_path: The local path to the code archive.
        filename: The name of the code archive.
        env_file_path: The path to the environment file.

    Raises:
        Exception: If an error occurs while running the pipeline.
    """
    logger.info("Setting up Lightning AI client")
    self._set_lightning_env_vars(deployment)

    requested_machine = settings.machine_type
    if not settings.main_studio_name:
        studio_name = sanitize_studio_name(
            f"zenml_{orchestrator_run_id}_pipeline"
        )
        logger.info(f"Creating main studio: {studio_name}")
        studio = Studio(name=studio_name)
        if requested_machine:
            studio.start(Machine(requested_machine))
        else:
            studio.start()
    else:
        studio_name = settings.main_studio_name
        studio = Studio(name=studio_name)
        if studio.machine != requested_machine and requested_machine:
            studio.switch_machine(Machine(requested_machine))

    # Remote locations inside the studio.
    studio_home = "/teamspace/studios/this_studio"
    code_dir = f"{studio_home}/zenml_codes/{filename.rsplit('.', 2)[0]}"
    remote_archive = f"{studio_home}/zenml_codes/{filename}"

    try:
        studio.run(f"mkdir -p {code_dir}")
        studio.upload_file(code_path, remote_path=remote_archive)
        studio.run(f"tar -xvzf {remote_archive} -C {code_dir}")
        logger.info(
            "Uploading wheel package and installing dependencies on main studio"
        )
        studio.upload_file(
            env_file_path,
            remote_path=f"{studio_home}/.lightning_studio/.studiorc",
        )
        studio.run("pip install uv")
        studio.run(f"uv pip install {requirements}")
        studio.run("pip install zenml")

        for custom_command in settings.custom_commands or []:
            command_output = studio.run(
                f"cd {code_dir} && {custom_command}"
            )
            logger.info(f"Custom command output: {command_output}")

        for step_name, details in steps_commands.items():
            if not details["machine"]:
                logger.info(f"Executing step: {step_name} in main studio")
                self._run_step_in_main_studio(studio, details, filename)
                continue

            # A step with its own machine gets its own studio.
            logger.info(f"Executing step: {step_name} in new studio")
            self._run_step_in_new_studio(
                orchestrator_run_id,
                step_name,
                details,
                code_path,
                filename,
                env_file_path,
                settings.custom_commands,
            )
    except Exception as error:
        logger.error(f"Error running pipeline: {error}")
        raise error
    finally:
        # Only a studio that exists and was created by this method is
        # deleted.
        if studio.status != studio.status.NotCreated:
            if settings.main_studio_name is None:
                logger.info("Deleting main studio")
                studio.delete()
def _run_step_in_new_studio(
    self,
    orchestrator_run_id: str,
    step_name: str,
    details: Dict[str, Any],
    code_path: str,
    filename: str,
    env_file_path: str,
    custom_commands: Optional[List[str]] = None,
) -> None:
    """Run a step in a new studio.

    A studio is started on the machine requested by the step, prepared in
    the same way as the main studio (code archive, environment file,
    requirements, custom commands) and deleted again once the step
    commands have run or an error occurred.

    Args:
        orchestrator_run_id: The orchestrator run id.
        step_name: The name of the step.
        details: The details of the step: its `commands`, its quoted
            `requirements` and its `machine`.
        code_path: The local path to the code archive.
        filename: The name of the code archive.
        env_file_path: The path to the environment file.
        custom_commands: Custom commands to run.
    """
    studio_home = "/teamspace/studios/this_studio"
    code_dir = f"{studio_home}/zenml_codes/{filename.rsplit('.', 2)[0]}"
    remote_archive = f"{studio_home}/zenml_codes/{filename}"

    studio_name = sanitize_studio_name(
        f"zenml_{orchestrator_run_id}_{step_name}"
    )
    logger.info(f"Creating new studio for step {step_name}: {studio_name}")
    studio = Studio(name=studio_name)
    studio.start(Machine(details["machine"]))

    try:
        studio.run(f"mkdir -p {code_dir}")
        # Upload to the exact path the archive is extracted from below,
        # matching what the main studio does.
        studio.upload_file(code_path, remote_path=remote_archive)
        studio.run(f"tar -xvzf {remote_archive} -C {code_dir}")
        studio.upload_file(
            env_file_path,
            remote_path=f"{studio_home}/.lightning_studio/.studiorc",
        )
        studio.run("pip install uv")
        studio.run(f"uv pip install {details['requirements']}")
        studio.run("pip install zenml")

        for command in custom_commands or []:
            output = studio.run(f"cd {code_dir} && {command}")
            logger.info(f"Custom command output: {output}")

        for command in details["commands"]:
            output = studio.run(f"cd {code_dir} && {command}")
            logger.info(f"Step {step_name} output: {output}")
    finally:
        # The studio exists only for this step; delete it even if one of
        # the commands failed so it is not left behind.
        studio.delete()
def _run_step_in_main_studio(
    self, studio: Studio, details: Dict[str, Any], filename: str
) -> None:
    """Execute the commands of a step inside the main studio.

    Every command is run from the directory the code archive was
    extracted into and its output is logged.

    Args:
        studio: The studio to run the step in.
        details: The details of the step.
        filename: The name of the code archive.
    """
    code_dir = (
        "/teamspace/studios/this_studio/zenml_codes/"
        + filename.rsplit(".", 2)[0]
    )
    for step_command in details["commands"]:
        step_output = studio.run(f"cd {code_dir} && {step_command}")
        logger.info(f"Step output: {step_output}")
config: LightningOrchestratorConfig
property
readonly
Returns the LightningOrchestratorConfig
config.
Returns:
Type | Description |
---|---|
LightningOrchestratorConfig |
The configuration. |
pipeline_directory: str
property
readonly
Returns path to a directory in which the Lightning pipeline files are stored.
Returns:
Type | Description |
---|---|
str |
Path to the pipeline directory. |
root_directory: str
property
readonly
Path to the root directory for all files concerning this orchestrator.
Returns:
Type | Description |
---|---|
str |
Path to the root directory. |
settings_class: Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorSettings]
property
readonly
Settings class for the Lightning orchestrator.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.lightning.flavors.lightning_orchestrator_flavor.LightningOrchestratorSettings] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A `StackValidator` instance. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the run id cannot be read from the environment. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID`.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
            This happens when this method gets called while the
            orchestrator is not running a pipeline.
    """
    try:
        return os.environ[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID]
    except KeyError as missing_variable:
        # Chain the lookup failure so the traceback shows its origin.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}."
        ) from missing_variable
prepare_or_run_pipeline(self, deployment, stack, environment)
Creates a wheel and uploads the pipeline to Lightning.
This functions as an intermediary representation of the pipeline which is then deployed to Lightning.
How it works:
Before this method is called the prepare_pipeline_deployment()
method builds a docker image that contains the code for the
pipeline, all steps the context around these files.
Based on this docker image a callable is created which builds
task for each step (_construct_lightning_pipeline
).
To do this the entrypoint of the docker image is configured to
run the correct step within the docker image. The dependencies
between these task are then also configured onto each
task by pointing at the downstream steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
environment |
Dict[str, str] |
Environment variables to set in the orchestration environment. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the deployment has a schedule: the method raises both when `cron_expression` is missing and when it is set. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Creates a wheel and uploads the pipeline to Lightning.
This functions as an intermediary representation of the pipeline which
is then deployed to the kubeflow pipelines instance.
How it works:
-------------
Before this method is called the `prepare_pipeline_deployment()`
method builds a docker image that contains the code for the
pipeline, all steps the context around these files.
Based on this docker image a callable is created which builds
task for each step (`_construct_lightning_pipeline`).
To do this the entrypoint of the docker image is configured to
run the correct step within the docker image. The dependencies
between these task are then also configured onto each
task by pointing at the downstream steps.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
ValueError: If the schedule is not set or if the cron expression
is not set.
"""
settings = cast(
LightningOrchestratorSettings, self.get_settings(deployment)
)
if deployment.schedule:
if (
deployment.schedule.catchup
or deployment.schedule.interval_second
):
logger.warning(
"Lightning orchestrator only uses schedules with the "
"`cron_expression` property, with optional `start_time` and/or `end_time`. "
"All other properties are ignored."
)
if deployment.schedule.cron_expression is None:
raise ValueError(
"Property `cron_expression` must be set when passing "
"schedule to a Lightning orchestrator."
)
if deployment.schedule.cron_expression:
raise ValueError(
"Property `schedule_timezone` must be set when passing "
"`cron_expression` to a Lightning orchestrator."
"Lightning orchestrator requires a Java Timezone ID to run the pipeline on schedule."
"Please refer to https://docs.oracle.com/middleware/1221/wcs/tag-ref/MISC/TimeZones.html for more information."
)
# Get deployment id
deployment_id = deployment.id
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
# Copy the repository to a temporary directory and add a setup.py file
# repository_temp_dir = (
# self.copy_repository_to_temp_dir_and_add_setup_py()
# )
# Create a wheel for the package in the temporary directory
# wheel_path = self.create_wheel(temp_dir=repository_temp_dir)
code_archive = code_utils.CodeArchive(
root=source_utils.get_source_root()
)
logger.info("Archiving pipeline code...")
with tempfile.NamedTemporaryFile(
mode="w+b", delete=False, suffix=".tar.gz"
) as code_file:
code_archive.write_archive(code_file)
code_path = code_file.name
filename = f"{orchestrator_run_name}.tar.gz"
# Construct the env variables for the pipeline
env_vars = environment.copy()
orchestrator_run_id = str(uuid4())
env_vars[ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID] = orchestrator_run_id
# Set up some variables for configuration
env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = (
LIGHTNING_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH
)
env_vars[ENV_ZENML_WHEEL_PACKAGE_NAME] = self.package_name
# Create a line-by-line export of environment variables
env_exports = "\n".join(
[f"export {key}='{value}'" for key, value in env_vars.items()]
)
# Write the environment variables to a temporary file
with tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".studiorc"
) as temp_file:
temp_file.write(env_exports)
env_file_path = temp_file.name
# Gather the requirements
pipeline_docker_settings = (
deployment.pipeline_configuration.docker_settings
)
pipeline_requirements = gather_requirements(pipeline_docker_settings)
pipeline_requirements_to_string = " ".join(
f'"{req}"' for req in pipeline_requirements
)
def _construct_lightning_steps(
    deployment: "PipelineDeploymentResponse",
) -> Dict[str, Dict[str, Any]]:
    """Build the per-step run specification for the pipeline.

    For every step of the deployment this collects the shell command that
    launches the step entrypoint, the step's requirements as a single
    space-separated string of double-quoted specifiers, and the machine
    type to use.

    Args:
        deployment: The pipeline deployment to prepare or run.

    Returns:
        A mapping from step name to a dict with the keys `commands`,
        `requirements` and `machine`. `machine` is None when the step
        settings compare equal to the pipeline-level settings.
    """
    lightning_steps: Dict[str, Dict[str, Any]] = {}
    for name, step_config in deployment.step_configurations.items():
        # Command and arguments used to launch the step entrypoint.
        command_parts = (
            StepEntrypointConfiguration.get_entrypoint_command()
            + StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=name,
                deployment_id=deployment_id,
            )
        )
        settings_for_step = cast(
            LightningOrchestratorSettings, self.get_settings(step_config)
        )
        # Requirements declared through the step's docker settings.
        quoted_requirements = [
            f'"{req}"'
            for req in gather_requirements(step_config.config.docker_settings)
        ]
        # Only steps whose settings differ from the pipeline settings carry
        # their own machine type.
        if settings_for_step != settings:
            machine = settings_for_step.machine_type
        else:
            machine = None
        lightning_steps[name] = {
            "commands": [" ".join(command_parts)],
            "requirements": " ".join(quoted_requirements),
            "machine": machine,
        }
    return lightning_steps
if not settings.synchronous:
entrypoint_command = LightningOrchestratorEntrypointConfiguration.get_entrypoint_command()
entrypoint_arguments = LightningOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)
entrypoint = entrypoint_command + entrypoint_arguments
entrypoint_string = " ".join(entrypoint)
logger.info("Setting up Lightning AI client")
self._set_lightning_env_vars(deployment)
studio_name = sanitize_studio_name(
"zenml_async_orchestrator_studio"
)
logger.info(f"Creating main studio: {studio_name}")
studio = Studio(name=studio_name)
studio.start()
logger.info(
"Uploading wheel package and installing dependencies on main studio"
)
studio.run(
f"mkdir -p /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
)
studio.upload_file(
code_path,
remote_path=f"/teamspace/studios/this_studio/zenml_codes/{filename}",
)
time.sleep(10)
studio.run(
f"tar -xvzf /teamspace/studios/this_studio/zenml_codes/{filename} -C /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"
)
studio.upload_file(
env_file_path,
remote_path="/teamspace/studios/this_studio/.lightning_studio/.studiorc",
)
studio.run("pip install uv")
logger.info(
f"Installing requirements: {pipeline_requirements_to_string}"
)
studio.run(f"uv pip install {pipeline_requirements_to_string}")
studio.run("pip install zenml")
for custom_command in settings.custom_commands or []:
studio.run(
f"cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {custom_command}"
)
# studio.run(f"pip install {wheel_path.rsplit('/', 1)[-1]}")
logger.info("Running pipeline in async mode")
studio.run(
f"nohup bash -c 'cd /teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]} && {entrypoint_string}' > log_{filename.rsplit('.', 2)[0]}.txt 2>&1 &"
)
logger.info(
f"The pipeline is running in async mode, you can keep checking the logs by running the following command: `lightning download -s vision-model/zenml-async-orchestrator-studio -p /teamspace/studios/this_studio/log_{filename.rsplit('.', 2)[0]}.txt && cat log_{filename.rsplit('.', 2)[0]}.txt`"
)
else:
self._upload_and_run_pipeline(
deployment,
orchestrator_run_id,
pipeline_requirements_to_string,
settings,
_construct_lightning_steps(deployment),
code_path,
filename,
env_file_path,
)
os.unlink(env_file_path)
setup_credentials(self)
Set up credentials for the orchestrator.
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator.py
def setup_credentials(self) -> None:
    """Configure the local client through the orchestrator's connector.

    The connector is obtained from `self.get_connector()`; it must not be
    None, otherwise the assertion below fails.
    """
    linked_connector = self.get_connector()
    assert linked_connector is not None
    linked_connector.configure_local_client()
lightning_orchestrator_entrypoint
Entrypoint of the Lightning master/orchestrator STUDIO.
main()
Entrypoint of the Lightning master/orchestrator STUDIO.
This is the entrypoint of the Lightning master/orchestrator STUDIO. It is responsible for provisioning the main STUDIO and running the pipeline steps, using a separate STUDIO for any step whose machine type differs from the pipeline's machine type.
Exceptions:
Type | Description |
---|---|
TypeError |
If the active stack's orchestrator is not an instance of LightningOrchestrator. |
ValueError |
If the environment variable holding the orchestrator run ID (`ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID`) is not set. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the Lightning master/orchestrator STUDIO.

    Provisions a main studio, uploads and extracts the pipeline code on it,
    installs the pipeline requirements and runs the pipeline-level custom
    commands. The steps of the deployment are then executed through a
    `ThreadedDagRunner` built from each step's upstream steps. A step whose
    machine type differs from the pipeline-level machine type runs in its
    own studio, which is deleted once the step finishes (successfully or
    not); every other step runs on the main studio.

    Raises:
        TypeError: If the active stack's orchestrator is not an instance of
            LightningOrchestrator.
        ValueError: If the environment variable holding the orchestrator
            run ID is not set.
    """
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Lightning orchestrator STUDIO started.")

    # Parse / extract args.
    args = parse_args()

    orchestrator_run_id = os.environ.get(
        ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID
    )
    if not orchestrator_run_id:
        raise ValueError(
            f"Environment variable '{ENV_ZENML_LIGHTNING_ORCHESTRATOR_RUN_ID}' is not set."
        )

    logger.info(f"Orchestrator run id: {orchestrator_run_id}")

    deployment = Client().get_deployment(args.deployment_id)
    filename = f"{args.run_name}.tar.gz"

    # Paths shared by every studio. The same path is used as the local
    # source and as the remote destination of each upload.
    # NOTE(review): this presumably relies on the client having placed the
    # archive and the .studiorc at these paths on the studio that runs this
    # entrypoint - confirm against the submitting side.
    studiorc_path = (
        "/teamspace/studios/this_studio/.lightning_studio/.studiorc"
    )
    archive_path = f"/teamspace/studios/this_studio/zenml_codes/{filename}"
    # Directory the archive is extracted into: the archive name without
    # its `.tar.gz` suffix.
    code_dir = f"/teamspace/studios/this_studio/zenml_codes/{filename.rsplit('.', 2)[0]}"

    pipeline_dag = {
        step_name: step.spec.upstream_steps
        for step_name, step in deployment.step_configurations.items()
    }
    entrypoint_command = StepEntrypointConfiguration.get_entrypoint_command()

    active_stack = Client().active_stack

    orchestrator = active_stack.orchestrator
    if not isinstance(orchestrator, LightningOrchestrator):
        raise TypeError(
            "The active stack's orchestrator is not an instance of LightningOrchestrator."
        )

    # Set up credentials
    orchestrator._set_lightning_env_vars(deployment)

    pipeline_settings = cast(
        LightningOrchestratorSettings, orchestrator.get_settings(deployment)
    )

    # Gather the requirements
    pipeline_docker_settings = (
        deployment.pipeline_configuration.docker_settings
    )
    pipeline_requirements = gather_requirements(pipeline_docker_settings)
    pipeline_requirements_to_string = " ".join(
        f'"{req}"' for req in pipeline_requirements
    )

    # Map every step to the studio it runs on: the main studio by default,
    # a dedicated studio when the step asks for a different machine type.
    unique_resource_configs: Dict[str, str] = {}
    main_studio_name = sanitize_studio_name(
        f"zenml_{orchestrator_run_id}_pipeline"
    )
    for step_name, step in deployment.step_configurations.items():
        step_settings = cast(
            LightningOrchestratorSettings,
            orchestrator.get_settings(step),
        )
        unique_resource_configs[step_name] = main_studio_name
        if pipeline_settings.machine_type != step_settings.machine_type:
            # NOTE(review): unlike the main studio name, this name is not
            # passed through sanitize_studio_name - confirm that is intended.
            unique_resource_configs[step_name] = (
                f"zenml-{orchestrator_run_id}_{step_name}"
            )

    logger.info(f"Creating main studio: {main_studio_name}")
    main_studio = Studio(name=main_studio_name)
    if pipeline_settings.machine_type:
        main_studio.start(Machine(pipeline_settings.machine_type))
    else:
        main_studio.start()

    logger.info("Main studio started.")
    logger.info("Uploading code to main studio the code path: %s", filename)

    main_studio.upload_file(
        studiorc_path,
        remote_path=studiorc_path,
    )
    output = main_studio.run(f"mkdir -p {code_dir}")
    logger.info(output)
    main_studio.upload_file(
        archive_path,
        remote_path=archive_path,
    )
    logger.info("Extracting code... ")
    output = main_studio.run(f"tar -xvzf {archive_path} -C {code_dir}")
    logger.info(f"Code extraction output: {output}")
    logger.info("Installing requirements... ")

    output = main_studio.run("pip install uv")
    logger.info(output)
    output = main_studio.run(
        f"uv pip install {pipeline_requirements_to_string}"
    )
    logger.info(output)
    output = main_studio.run("pip install zenml")
    logger.info(output)

    for command in pipeline_settings.custom_commands or []:
        output = main_studio.run(f"cd {code_dir} && {command}")
        logger.info(f"Custom command output: {output}")

    run = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=args.deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )[0]

    logger.info("Fetching pipeline run: %s", run.id)

    def run_step_on_lightning_studio(step_name: str) -> None:
        """Run a pipeline step on the main studio or on a dedicated one.

        Args:
            step_name: Name of the step.

        Raises:
            Exception: If an error occurs while running the step on a
                dedicated studio; the error is logged and re-raised.
        """
        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name,
            deployment_id=args.deployment_id,
        )
        run_command = " ".join(entrypoint_command + step_args)

        step = deployment.step_configurations[step_name]
        step_studio_name = unique_resource_configs[step_name]
        if step_studio_name != main_studio_name:
            logger.info(
                f"Creating separate studio for step: {step_studio_name}"
            )
            # Get step settings
            step_settings = cast(
                LightningOrchestratorSettings,
                orchestrator.get_settings(step),
            )
            # Gather the requirements
            step_docker_settings = step.config.docker_settings
            step_requirements = gather_requirements(step_docker_settings)
            step_requirements_to_string = " ".join(
                f'"{req}"' for req in step_requirements
            )
            studio = Studio(name=step_studio_name)
            try:
                # Same guard as for the main studio: only request a specific
                # machine when the step settings define one.
                if step_settings.machine_type:
                    studio.start(Machine(step_settings.machine_type))
                else:
                    studio.start()
                output = studio.run(f"mkdir -p {code_dir}")
                logger.info(output)
                studio.upload_file(
                    archive_path,
                    remote_path=archive_path,
                )
                output = studio.run(f"tar -xvzf {archive_path} -C {code_dir}")
                logger.info(output)
                studio.upload_file(
                    studiorc_path,
                    remote_path=studiorc_path,
                )
                output = studio.run("pip install uv")
                logger.info(output)
                output = studio.run(
                    f"uv pip install {step_requirements_to_string}"
                )
                logger.info(output)
                output = studio.run("pip install zenml")
                logger.info(output)
                for command in step_settings.custom_commands or []:
                    output = studio.run(f"cd {code_dir} && {command}")
                    logger.info(f"Custom command output: {output}")
                output = studio.run(f"cd {code_dir} && {run_command}")
                logger.info(output)
            except Exception as e:
                logger.error(
                    f"Error running step {step_name} on studio {step_studio_name}: {e}"
                )
                raise
            finally:
                # Delete the dedicated studio exactly once, on success and
                # on failure alike.
                studio.delete()
        else:
            output = main_studio.run(f"cd {code_dir} && {run_command}")
            logger.info(output)

        # Pop the resource configuration for this step
        unique_resource_configs.pop(step_name)

        if main_studio_name in unique_resource_configs.values():
            # If there are more steps using this configuration, skip deprovisioning the cluster
            logger.info(
                f"Resource configuration for studio '{main_studio_name}' "
                "is used by subsequent steps. Skipping the deprovisioning of "
                "the studio."
            )
        else:
            # If there are no more steps using this configuration, down the cluster
            logger.info(
                f"Resource configuration for cluster '{main_studio_name}' "
                "is not used by subsequent steps. deprovisioning the cluster."
            )
            main_studio.delete()
        logger.info(f"Running step `{step_name}` on a Studio is completed.")

    ThreadedDagRunner(
        dag=pipeline_dag, run_fn=run_step_on_lightning_studio
    ).run()

    logger.info("Orchestration STUDIO provisioned.")
parse_args()
Parse entrypoint arguments.
Returns:
Type | Description |
---|---|
Namespace |
Parsed args. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the orchestrator entrypoint.

    Both `--run_name` and `--deployment_id` are required string options.

    Returns:
        Parsed args.
    """
    arg_parser = argparse.ArgumentParser()
    for option in ("--run_name", "--deployment_id"):
        arg_parser.add_argument(option, type=str, required=True)
    namespace = arg_parser.parse_args()
    return namespace
lightning_orchestrator_entrypoint_configuration
Entrypoint configuration for the Lightning master/orchestrator VM.
LightningOrchestratorEntrypointConfiguration
Entrypoint configuration for the Lightning master/orchestrator VM.
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
class LightningOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the Lightning master/orchestrator VM.

    Describes how the orchestrator entrypoint module is invoked: the options
    it expects, the command that launches it and the argument list for a
    given run.
    """

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all the options required for running this entrypoint.

        Returns:
            The set of option names: the run name and deployment ID options.
        """
        return {RUN_NAME_OPTION, DEPLOYMENT_ID_OPTION}

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        Returns:
            The `python -m <module>` command as a list of tokens.
        """
        entrypoint_module = "zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint"
        return ["python", "-m", entrypoint_module]

    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.

        Returns:
            Alternating `--<option>` / value tokens, run name first and the
            stringified deployment ID second.
        """
        option_values = (
            (RUN_NAME_OPTION, run_name),
            (DEPLOYMENT_ID_OPTION, str(deployment_id)),
        )
        arguments: List[str] = []
        for option, value in option_values:
            arguments.extend((f"--{option}", value))
        return arguments
get_entrypoint_arguments(run_name, deployment_id)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
str |
Name of the ZenML run. |
required |
deployment_id |
UUID |
ID of the deployment. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of entrypoint arguments. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.

    Returns:
        Alternating `--<option>` / value tokens, run name first and the
        stringified deployment ID second.
    """
    option_values = (
        (RUN_NAME_OPTION, run_name),
        (DEPLOYMENT_ID_OPTION, str(deployment_id)),
    )
    arguments: List[str] = []
    for option, value in option_values:
        arguments.extend((f"--{option}", value))
    return arguments
get_entrypoint_command()
classmethod
Returns a command that runs the entrypoint module.
Returns:
Type | Description |
---|---|
List[str] |
Entrypoint command. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.lightning.orchestrators.lightning_orchestrator_entrypoint",
]
return command
get_entrypoint_options()
classmethod
Gets all the options required for running this entrypoint.
Returns:
Type | Description |
---|---|
Set[str] |
Entrypoint options. |
Source code in zenml/integrations/lightning/orchestrators/lightning_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        The set of option names: the run name and deployment ID options.
    """
    return {RUN_NAME_OPTION, DEPLOYMENT_ID_OPTION}
utils
Utility functions for the Lightning orchestrator.
gather_requirements(docker_settings)
Gather the requirements files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docker_settings |
DockerSettings |
Docker settings. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of requirements. |
Source code in zenml/integrations/lightning/orchestrators/utils.py
def gather_requirements(docker_settings: "DockerSettings") -> List[str]:
    """Gather the individual requirements declared for the given settings.

    The requirements files are collected through
    `PipelineDockerImageBuilder.gather_requirements_files` for the active
    stack. Their contents are split into lines, each line is stripped of
    surrounding whitespace, and empty lines and duplicates are dropped.

    Args:
        docker_settings: Docker settings.

    Returns:
        Sorted list of unique, non-empty requirement lines.
    """
    docker_image_builder = PipelineDockerImageBuilder()
    requirements_files = docker_image_builder.gather_requirements_files(
        docker_settings=docker_settings,
        stack=Client().active_stack,
        log=False,
    )

    # Index 1 of each entry is read as the text of a requirements file.
    # NOTE(review): presumably entries are (name, content, ...) tuples -
    # confirm against `gather_requirements_files`.
    # Stripping every line (not just the whole file) keeps CRLF endings and
    # whitespace-only lines from leaking into the result.
    requirements = {
        line.strip()
        for requirements_file in requirements_files
        for line in requirements_file[1].splitlines()
    }
    # Remove the empty item left behind by blank lines.
    requirements.discard("")

    return sorted(requirements)
sanitize_studio_name(studio_name)
Sanitize studio names so they contain only lowercase letters, digits and single hyphens, without leading hyphens.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
studio_name |
str |
Arbitrary input studio_name. |
required |
Returns:
Type | Description |
---|---|
str |
Sanitized studio name. |
Source code in zenml/integrations/lightning/orchestrators/utils.py
def sanitize_studio_name(studio_name: str) -> str:
    """Sanitize a studio name.

    The name is lowercased, every character other than `a-z`, `0-9` and `-`
    is replaced by a hyphen, leading hyphens are removed and runs of hyphens
    are collapsed into a single one. Trailing hyphens are left in place.

    Args:
        studio_name: Arbitrary input studio_name.

    Returns:
        Sanitized studio name.
    """
    # Applied in order: replace disallowed characters, drop leading hyphens,
    # collapse repeated hyphens.
    substitutions = (
        (r"[^a-z0-9-]", "-"),
        (r"^[-]+", ""),
        (r"[-]+", "-"),
    )
    sanitized = studio_name.lower()
    for pattern, replacement in substitutions:
        sanitized = re.sub(pattern, replacement, sanitized)
    return sanitized