Runai

zenml.integrations.runai

Run:AI integration for ZenML.

Attributes

RUNAI = 'runai' module-attribute

RUNAI_STEP_OPERATOR_FLAVOR = 'runai' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

display_name: Optional[str] property

The display name of the flavor.

By default, converts the technical name to a human-readable format. For example, "vm_kubernetes" becomes "VM Kubernetes". Flavors can override this to provide custom display names.

Returns:

Type Description
Optional[str]

The display name of the flavor.

docs_url: Optional[str] property

A URL pointing to the docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A URL to the image used to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for this flavor.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
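The `CustomFlavorImportError` above tells the user where the flavor module is expected on disk. That path derivation can be sketched as a standalone helper (illustrative only; the argument values in the test are invented):

```python
import os


def expected_flavor_path(source: str, source_root: str) -> str:
    """Derive where a custom flavor module should live on disk from its
    import source string, mirroring the error-message construction above.

    E.g. a source of "my_flavors.custom.MyFlavor" under root "/repo"
    maps to "/repo/my_flavors/custom.py".
    """
    # Drop the trailing class name, keeping only the module path.
    flavor_module, _ = source.rsplit(".", maxsplit=1)
    # Turn the dotted module path into a file path under the source root.
    return (
        os.path.join(source_root, flavor_module.replace(".", os.path.sep))
        + ".py"
    )
```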
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
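The URL scheme above can be reproduced as a small pure function. This is a minimal sketch of the same branching, with the version-check dependency replaced by an explicit `is_latest` flag; the concrete argument values below are assumptions for illustration:

```python
def default_docs_url(
    component_type_plural: str,
    flavor_name: str,
    version: str,
    is_latest: bool,
) -> str:
    """Build the default docs URL for a flavor (illustrative sketch)."""
    # Underscores in component types and flavor names become hyphens.
    component = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")
    if is_latest:
        base = "https://docs.zenml.io"
    else:
        # Older versions are served from the legacy documentation site.
        base = (
            "https://zenml-io.gitbook.io/zenml-legacy-documentation"
            f"/v/{version}"
        )
    return f"{base}/stack-components/{component}/{name}"
```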
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        # Get the config class name to point to the specific class
        config_class_name = self.config_class.__name__

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        display_name=self.display_name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for requirement in cls.get_requirements():
        parsed_requirement = Requirement(requirement)

        if not requirement_installed(parsed_requirement):
            logger.debug(
                "Requirement '%s' for integration '%s' is not installed "
                "or installed with the wrong version.",
                requirement,
                cls.NAME,
            )
            return False

        dependencies = get_dependencies(parsed_requirement)

        for dependency in dependencies:
            if not requirement_installed(dependency):
                logger.debug(
                    "Requirement '%s' for integration '%s' is not "
                    "installed or installed with the wrong version.",
                    dependency,
                    cls.NAME,
                )
                return False

    logger.debug(
        f"Integration '{cls.NAME}' is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
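The core of such a check is asking whether a distribution can be found at all. A rough standalone sketch using only the standard library (the real method additionally validates version specifiers and walks transitive dependencies, which this sketch omits):

```python
from importlib import metadata


def distribution_installed(distribution_name: str) -> bool:
    """Return True if the named distribution is importable via package
    metadata. This is a simplification: it does not check version
    specifiers or dependencies the way the integration check does."""
    try:
        metadata.version(distribution_name)
        return True
    except metadata.PackageNotFoundError:
        return False
```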
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
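The loop above is a prefix filter over the requirement strings. The same logic written as a comprehension (a sketch; the requirement strings in the test are invented examples, not the integration's actual requirements):

```python
from typing import List


def uninstall_requirements(
    requirements: List[str], ignored_prefixes: List[str]
) -> List[str]:
    """Keep only requirements that do not start with any ignored prefix,
    mirroring the filtering above."""
    return [
        req
        for req in requirements
        if not any(req.startswith(prefix) for prefix in ignored_prefixes)
    ]
```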

RunAIIntegration

Bases: Integration

Definition of Run:AI integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Run:AI integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/runai/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Run:AI integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.runai.flavors import (
        RunAIStepOperatorFlavor,
    )

    return [RunAIStepOperatorFlavor]

Modules

client

Run:AI client utilities.

Classes
RunAIAuthenticationError

Bases: RunAIClientError

Raised when authentication with Run:AI fails.

RunAIClient(client_id: str, client_secret: str, runai_base_url: str)

Wrapper around the runapy SDK providing typed responses.

This client encapsulates all Run:AI API interactions and provides typed dataclasses instead of raw dictionaries.

Initialize the Run:AI client.

Parameters:

Name Type Description Default
client_id str

Run:AI client ID for authentication.

required
client_secret str

Run:AI client secret for authentication.

required
runai_base_url str

Run:AI control plane base URL.

required

Raises:

Type Description
RunAIConnectionError

If connecting to Run:AI fails.

RunAIAuthenticationError

If client configuration fails.

Source code in src/zenml/integrations/runai/client/runai_client.py
def __init__(
    self, client_id: str, client_secret: str, runai_base_url: str
) -> None:
    """Initialize the Run:AI client.

    Args:
        client_id: Run:AI client ID for authentication.
        client_secret: Run:AI client secret for authentication.
        runai_base_url: Run:AI control plane base URL.

    Raises:
        RunAIConnectionError: If connecting to Run:AI fails.
        RunAIAuthenticationError: If client configuration fails.
    """
    try:
        config = Configuration(
            client_id=client_id,
            client_secret=client_secret,
            runai_base_url=runai_base_url,
        )
        self._raw_client = self._create_raw_client(config)
    except Exception as exc:
        if self._is_connection_error(exc):
            raise RunAIConnectionError(
                f"Failed to connect to Run:AI API ({type(exc).__name__}): "
                f"{exc}. Verify your runai_base_url and network "
                "connectivity."
            ) from exc
        raise RunAIAuthenticationError(
            f"Failed to initialize Run:AI client ({type(exc).__name__}): {exc}. "
            "Verify your client_id, client_secret, and runai_base_url are correct."
        ) from exc
Attributes
raw_client: RunapyClient property

Access the underlying runapy client for advanced operations.

Returns:

Type Description
RunaiClient

The raw runapy client.

Functions
create_training_workload(request: TrainingCreationRequest) -> WorkloadSubmissionResult

Submit a training workload to Run:AI.

Parameters:

Name Type Description Default
request TrainingCreationRequest

TrainingCreationRequest from runai.models.

required

Returns:

Type Description
WorkloadSubmissionResult

WorkloadSubmissionResult with the workload ID.

Source code in src/zenml/integrations/runai/client/runai_client.py
def create_training_workload(
    self, request: TrainingCreationRequest
) -> WorkloadSubmissionResult:
    """Submit a training workload to Run:AI.

    Args:
        request: TrainingCreationRequest from runai.models.

    Returns:
        WorkloadSubmissionResult with the workload ID.
    """
    try:
        response = self._raw_client.workloads.trainings.create_training1(
            training_creation_request=request
        )
        workload_id = self._extract_workload_id(response)
        return WorkloadSubmissionResult(
            workload_id=workload_id or request.name,
            workload_name=request.name,
        )
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to submit Run:AI workload "
            f"({type(exc).__name__}): {exc}",
        )
delete_training_workload(workload_id: str) -> None

Delete a training workload.

Parameters:

Name Type Description Default
workload_id str

The workload ID to delete.

required
Source code in src/zenml/integrations/runai/client/runai_client.py
def delete_training_workload(self, workload_id: str) -> None:
    """Delete a training workload.

    Args:
        workload_id: The workload ID to delete.
    """
    try:
        self._raw_client.workloads.trainings.delete_training(workload_id)
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to delete Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
get_cluster_by_id(cluster_id: str) -> RunAICluster

Get a Run:AI cluster by ID.

Parameters:

Name Type Description Default
cluster_id str

The cluster ID to find.

required

Returns:

Type Description
RunAICluster

The matching RunAICluster.

Raises:

Type Description
RunAIClusterNotFoundError

If no cluster matches the ID.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_cluster_by_id(self, cluster_id: str) -> RunAICluster:
    """Get a Run:AI cluster by ID.

    Args:
        cluster_id: The cluster ID to find.

    Returns:
        The matching RunAICluster.

    Raises:
        RunAIClusterNotFoundError: If no cluster matches the ID.
    """
    clusters = self.get_clusters()
    for cluster in clusters:
        if cluster.id == cluster_id:
            return cluster
    available = [c.id for c in clusters]
    raise RunAIClusterNotFoundError(cluster_id, available)
get_cluster_by_name(name: str) -> RunAICluster

Get a Run:AI cluster by exact name match.

Parameters:

Name Type Description Default
name str

The cluster name to find.

required

Returns:

Type Description
RunAICluster

The matching RunAICluster.

Raises:

Type Description
RunAIClusterNotFoundError

If no cluster matches the name.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_cluster_by_name(self, name: str) -> RunAICluster:
    """Get a Run:AI cluster by exact name match.

    Args:
        name: The cluster name to find.

    Returns:
        The matching RunAICluster.

    Raises:
        RunAIClusterNotFoundError: If no cluster matches the name.
    """
    clusters = self.get_clusters()

    for cluster in clusters:
        if cluster.name == name:
            return cluster

    available = [c.name for c in clusters]
    raise RunAIClusterNotFoundError(name, available)
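The exact-name lookup with a helpful "available names" error is a reusable pattern. A self-contained sketch with stand-in types (the `Cluster` dataclass and error class here are simplified stand-ins for `RunAICluster` and `RunAIClusterNotFoundError`, and the cluster data in the test is invented):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Cluster:
    """Simplified stand-in for RunAICluster."""
    id: str
    name: str


class ClusterNotFoundError(Exception):
    """Simplified stand-in for RunAIClusterNotFoundError."""


def find_cluster(clusters: List[Cluster], name: str) -> Cluster:
    """Exact-name lookup; the error lists the available names so the
    user can spot typos or missing permissions."""
    for cluster in clusters:
        if cluster.name == name:
            return cluster
    available = [c.name for c in clusters]
    raise ClusterNotFoundError(
        f"Cluster '{name}' not found in Run:AI. "
        f"Available clusters: {available}"
    )
```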
get_clusters() -> List[RunAICluster]

Get all Run:AI clusters.

Returns:

Type Description
List[RunAICluster]

List of RunAICluster objects.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_clusters(self) -> List[RunAICluster]:
    """Get all Run:AI clusters.

    Returns:
        List of RunAICluster objects.
    """
    try:
        response = self._raw_client.organizations.clusters.get_clusters()
        clusters_data = response.data if response.data else []

        return [
            RunAICluster(
                id=c.get("uuid", c.get("id", "")),
                name=c.get("name", ""),
            )
            for c in clusters_data
        ]
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to fetch Run:AI clusters "
            f"({type(exc).__name__}): {exc}",
        )
get_first_cluster() -> RunAICluster

Get the first available Run:AI cluster.

Returns:

Type Description
RunAICluster

The first RunAICluster.

Raises:

Type Description
RunAIClientError

If no clusters are available.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_first_cluster(self) -> RunAICluster:
    """Get the first available Run:AI cluster.

    Returns:
        The first RunAICluster.

    Raises:
        RunAIClientError: If no clusters are available.
    """
    clusters = self.get_clusters()
    if not clusters:
        raise RunAIClientError("No Run:AI clusters available")
    return clusters[0]
get_project_by_name(name: str) -> RunAIProject

Get a Run:AI project by exact name match.

Parameters:

Name Type Description Default
name str

The project name to find.

required

Returns:

Type Description
RunAIProject

The matching RunAIProject.

Raises:

Type Description
RunAIProjectNotFoundError

If no project matches the name.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_project_by_name(self, name: str) -> RunAIProject:
    """Get a Run:AI project by exact name match.

    Args:
        name: The project name to find.

    Returns:
        The matching RunAIProject.

    Raises:
        RunAIProjectNotFoundError: If no project matches the name.
    """
    projects = self.get_projects(search=name)

    for project in projects:
        if project.name == name:
            return project

    available = [p.name for p in projects]
    raise RunAIProjectNotFoundError(name, available)
get_projects(search: Optional[str] = None) -> List[RunAIProject]

Get Run:AI projects, optionally filtered by name.

Parameters:

Name Type Description Default
search Optional[str]

Optional search string to filter projects.

None

Returns:

Type Description
List[RunAIProject]

List of RunAIProject objects.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_projects(self, search: Optional[str] = None) -> List[RunAIProject]:
    """Get Run:AI projects, optionally filtered by name.

    Args:
        search: Optional search string to filter projects.

    Returns:
        List of RunAIProject objects.
    """
    try:
        response = self._raw_client.organizations.projects.get_projects(
            search=search
        )
        projects_data = (
            response.data.get("projects", []) if response.data else []
        )

        return [
            RunAIProject(
                id=str(p.get("id")),
                name=p.get("name", ""),
                cluster_id=p.get("clusterId"),
            )
            for p in projects_data
        ]
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to fetch Run:AI projects "
            f"({type(exc).__name__}): {exc}",
        )
get_training_workload(workload_id: str) -> RunAITrainingWorkload

Get full training workload details.

Parameters:

Name Type Description Default
workload_id str

The workload ID to query.

required

Returns:

Type Description
RunAITrainingWorkload

The workload details as a typed model.

Raises:

Type Description
RunAIClientError

If the query fails or response is invalid.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_training_workload(self, workload_id: str) -> RunAITrainingWorkload:
    """Get full training workload details.

    Args:
        workload_id: The workload ID to query.

    Returns:
        The workload details as a typed model.

    Raises:
        RunAIClientError: If the query fails or response is invalid.
    """
    try:
        response = self._raw_client.workloads.trainings.get_training(
            workload_id
        )
        if not response.data:
            raise RunAIClientError(
                f"Empty response when querying workload {workload_id}. "
                "The workload may not exist or the API returned no data."
            )
        if not isinstance(response.data, dict):
            raise RunAIClientError(
                f"Unexpected response format for workload {workload_id}. "
                f"Expected dict, got {type(response.data).__name__}."
            )
        return RunAITrainingWorkload.model_validate(response.data)
    except RunAIClientError:
        raise
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to query Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
get_training_workload_status(workload_id: str) -> Optional[str]

Get the status of a training workload.

Parameters:

Name Type Description Default
workload_id str

The workload ID to query.

required

Returns:

Type Description
Optional[str]

The workload status string, or None if the response is missing a status field.

Raises:

Type Description
RunAIWorkloadNotFoundError

If the workload was not found (404).

RunAIClientError

If the API call fails for other reasons or the response is malformed.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_training_workload_status(self, workload_id: str) -> Optional[str]:
    """Get the status of a training workload.

    Args:
        workload_id: The workload ID to query.

    Returns:
        The workload status string, or None if the response is missing a
        status field.

    Raises:
        RunAIWorkloadNotFoundError: If the workload was not found (404).
        RunAIClientError: If the API call fails for other reasons or the
            response is malformed.
    """
    try:
        response = self._raw_client.workloads.trainings.get_training(
            workload_id
        )
        if not response.data:
            raise RunAIClientError(
                f"Empty response when querying workload {workload_id}. "
                "The API returned no data."
            )
        if not isinstance(response.data, dict):
            raise RunAIClientError(
                f"Unexpected response format for workload {workload_id}. "
                f"Expected dict, got {type(response.data).__name__}."
            )
        status = response.data.get("actualPhase") or response.data.get(
            "status"
        )
        if status is None:
            logger.warning(
                f"Workload {workload_id} response has no status field. "
                "Available keys: %s",
                list(response.data.keys()),
            )
        return cast(Optional[str], status)
    except RunAIClientError:
        raise
    except Exception as exc:
        status_code = self._get_status_code(exc)
        if status_code == 404:
            raise RunAIWorkloadNotFoundError(workload_id) from exc
        error_msg = str(exc).lower()
        if "not found" in error_msg or "404" in error_msg:
            raise RunAIWorkloadNotFoundError(workload_id) from exc
        self._raise_api_error(
            exc,
            f"Failed to query workload status "
            f"({type(exc).__name__}): {exc}",
        )
suspend_training_workload(workload_id: str) -> None

Suspend a training workload.

Parameters:

Name Type Description Default
workload_id str

The workload ID to suspend.

required
Source code in src/zenml/integrations/runai/client/runai_client.py
def suspend_training_workload(self, workload_id: str) -> None:
    """Suspend a training workload.

    Args:
        workload_id: The workload ID to suspend.
    """
    try:
        self._raw_client.workloads.trainings.suspend_training(workload_id)
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to suspend Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
RunAIClientError

Bases: Exception

Base exception for Run:AI client errors.

RunAICluster

Bases: BaseModel

Typed representation of a Run:AI cluster.

RunAIClusterNotFoundError(cluster_name: str, available: List[str])

Bases: RunAIClientError

Raised when a Run:AI cluster cannot be found.

Initialize the exception.

Parameters:

Name Type Description Default
cluster_name str

The cluster name that was not found.

required
available List[str]

List of available cluster names.

required
Source code in src/zenml/integrations/runai/client/runai_client.py
def __init__(self, cluster_name: str, available: List[str]) -> None:
    """Initialize the exception.

    Args:
        cluster_name: The cluster name that was not found.
        available: List of available cluster names.
    """
    self.cluster_name = cluster_name
    self.available = available
    super().__init__(
        f"Cluster '{cluster_name}' not found in Run:AI. "
        f"Available clusters: {available}"
    )
RunAIConnectionError

Bases: RunAIClientError

Raised when connection to Run:AI API fails.

RunAIProject

Bases: BaseModel

Typed representation of a Run:AI project.

RunAIProjectNotFoundError(project_name: str, available: List[str])

Bases: RunAIClientError

Raised when a Run:AI project cannot be found.

Initialize the exception.

Parameters:

Name Type Description Default
project_name str

The project name that was not found.

required
available List[str]

List of available project names.

required
Source code in src/zenml/integrations/runai/client/runai_client.py
def __init__(self, project_name: str, available: List[str]) -> None:
    """Initialize the exception.

    Args:
        project_name: The project name that was not found.
        available: List of available project names.
    """
    self.project_name = project_name
    self.available = available
    super().__init__(
        f"Project '{project_name}' not found in Run:AI. "
        f"Available projects: {available}"
    )
RunAITrainingWorkload

Bases: BaseModel

Typed representation of a Run:AI training workload.

RunAIWorkloadNotFoundError(workload_id: str)

Bases: RunAIClientError

Raised when a Run:AI workload cannot be found.

Initialize the exception.

Parameters:

    workload_id (str, required): The workload ID that was not found.
Source code in src/zenml/integrations/runai/client/runai_client.py
def __init__(self, workload_id: str) -> None:
    """Initialize the exception.

    Args:
        workload_id: The workload ID that was not found.
    """
    self.workload_id = workload_id
    super().__init__(f"Workload '{workload_id}' not found in Run:AI.")
WorkloadSubmissionResult

Bases: BaseModel

Result of submitting a workload to Run:AI.

Modules
runai_client

Run:AI API client wrapper with typed responses.

Classes
RunAIAuthenticationError

Bases: RunAIClientError

Raised when authentication with Run:AI fails.

RunAIClient(client_id: str, client_secret: str, runai_base_url: str)

Wrapper around the runapy SDK providing typed responses.

This client encapsulates all Run:AI API interactions and provides typed Pydantic models instead of raw dictionaries.

Initialize the Run:AI client.

Parameters:

    client_id (str, required): Run:AI client ID for authentication.
    client_secret (str, required): Run:AI client secret for authentication.
    runai_base_url (str, required): Run:AI control plane base URL.

Raises:

    RunAIConnectionError: If connecting to Run:AI fails.
    RunAIAuthenticationError: If client configuration fails.

Source code in src/zenml/integrations/runai/client/runai_client.py
def __init__(
    self, client_id: str, client_secret: str, runai_base_url: str
) -> None:
    """Initialize the Run:AI client.

    Args:
        client_id: Run:AI client ID for authentication.
        client_secret: Run:AI client secret for authentication.
        runai_base_url: Run:AI control plane base URL.

    Raises:
        RunAIConnectionError: If connecting to Run:AI fails.
        RunAIAuthenticationError: If client configuration fails.
    """
    try:
        config = Configuration(
            client_id=client_id,
            client_secret=client_secret,
            runai_base_url=runai_base_url,
        )
        self._raw_client = self._create_raw_client(config)
    except Exception as exc:
        if self._is_connection_error(exc):
            raise RunAIConnectionError(
                f"Failed to connect to Run:AI API ({type(exc).__name__}): "
                f"{exc}. Verify your runai_base_url and network "
                "connectivity."
            ) from exc
        raise RunAIAuthenticationError(
            f"Failed to initialize Run:AI client ({type(exc).__name__}): {exc}. "
            "Verify your client_id, client_secret, and runai_base_url are correct."
        ) from exc
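The constructor above classifies any startup failure into a connectivity problem or a configuration problem. A simplified, dependency-free sketch of that branching; the `_is_connection_error` heuristic here is a stand-in for the client's private helper, whose actual checks may differ:

```python
class RunAIClientError(Exception):
    """Base exception for Run:AI client errors."""

class RunAIConnectionError(RunAIClientError):
    """Raised when connection to the Run:AI API fails."""

class RunAIAuthenticationError(RunAIClientError):
    """Raised when authentication with Run:AI fails."""

def _is_connection_error(exc: Exception) -> bool:
    # Stand-in heuristic: treat OS-level errors as connectivity problems.
    # The real helper in runai_client.py may inspect more error types.
    return isinstance(exc, (ConnectionError, TimeoutError, OSError))

def classify_init_failure(exc: Exception) -> RunAIClientError:
    """Return the typed error that __init__ would raise for `exc`."""
    if _is_connection_error(exc):
        return RunAIConnectionError(
            f"Failed to connect to Run:AI API ({type(exc).__name__}): {exc}. "
            "Verify your runai_base_url and network connectivity."
        )
    return RunAIAuthenticationError(
        f"Failed to initialize Run:AI client ({type(exc).__name__}): {exc}. "
        "Verify your client_id, client_secret, and runai_base_url."
    )
```

The split matters for callers: connection errors suggest retrying or fixing the URL, while authentication errors point at the credentials.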
Attributes
raw_client: RunapyClient property

Access the underlying runapy client for advanced operations.

Returns:

    RunapyClient: The raw runapy client.

Functions
create_training_workload(request: TrainingCreationRequest) -> WorkloadSubmissionResult

Submit a training workload to Run:AI.

Parameters:

    request (TrainingCreationRequest, required): TrainingCreationRequest from runai.models.

Returns:

    WorkloadSubmissionResult: WorkloadSubmissionResult with the workload ID.

Source code in src/zenml/integrations/runai/client/runai_client.py
def create_training_workload(
    self, request: TrainingCreationRequest
) -> WorkloadSubmissionResult:
    """Submit a training workload to Run:AI.

    Args:
        request: TrainingCreationRequest from runai.models.

    Returns:
        WorkloadSubmissionResult with the workload ID.
    """
    try:
        response = self._raw_client.workloads.trainings.create_training1(
            training_creation_request=request
        )
        workload_id = self._extract_workload_id(response)
        return WorkloadSubmissionResult(
            workload_id=workload_id or request.name,
            workload_name=request.name,
        )
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to submit Run:AI workload "
            f"({type(exc).__name__}): {exc}",
        )
delete_training_workload(workload_id: str) -> None

Delete a training workload.

Parameters:

    workload_id (str, required): The workload ID to delete.
Source code in src/zenml/integrations/runai/client/runai_client.py
def delete_training_workload(self, workload_id: str) -> None:
    """Delete a training workload.

    Args:
        workload_id: The workload ID to delete.
    """
    try:
        self._raw_client.workloads.trainings.delete_training(workload_id)
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to delete Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
get_cluster_by_id(cluster_id: str) -> RunAICluster

Get a Run:AI cluster by ID.

Parameters:

    cluster_id (str, required): The cluster ID to find.

Returns:

    RunAICluster: The matching RunAICluster.

Raises:

    RunAIClusterNotFoundError: If no cluster matches the ID.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_cluster_by_id(self, cluster_id: str) -> RunAICluster:
    """Get a Run:AI cluster by ID.

    Args:
        cluster_id: The cluster ID to find.

    Returns:
        The matching RunAICluster.

    Raises:
        RunAIClusterNotFoundError: If no cluster matches the ID.
    """
    clusters = self.get_clusters()
    for cluster in clusters:
        if cluster.id == cluster_id:
            return cluster
    available = [c.id for c in clusters]
    raise RunAIClusterNotFoundError(cluster_id, available)
get_cluster_by_name(name: str) -> RunAICluster

Get a Run:AI cluster by exact name match.

Parameters:

    name (str, required): The cluster name to find.

Returns:

    RunAICluster: The matching RunAICluster.

Raises:

    RunAIClusterNotFoundError: If no cluster matches the name.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_cluster_by_name(self, name: str) -> RunAICluster:
    """Get a Run:AI cluster by exact name match.

    Args:
        name: The cluster name to find.

    Returns:
        The matching RunAICluster.

    Raises:
        RunAIClusterNotFoundError: If no cluster matches the name.
    """
    clusters = self.get_clusters()

    for cluster in clusters:
        if cluster.name == name:
            return cluster

    available = [c.name for c in clusters]
    raise RunAIClusterNotFoundError(name, available)
get_clusters() -> List[RunAICluster]

Get all Run:AI clusters.

Returns:

    List[RunAICluster]: List of RunAICluster objects.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_clusters(self) -> List[RunAICluster]:
    """Get all Run:AI clusters.

    Returns:
        List of RunAICluster objects.
    """
    try:
        response = self._raw_client.organizations.clusters.get_clusters()
        clusters_data = response.data if response.data else []

        return [
            RunAICluster(
                id=c.get("uuid", c.get("id", "")),
                name=c.get("name", ""),
            )
            for c in clusters_data
        ]
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to fetch Run:AI clusters "
            f"({type(exc).__name__}): {exc}",
        )
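Note the defensive record parsing in `get_clusters`: Run:AI payloads may expose the cluster identifier as either `uuid` or `id`, so the code falls back between the two and defaults every field. A minimal sketch of just that parsing step; a plain dataclass stands in for the Pydantic `RunAICluster` model to keep it dependency-free:

```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class RunAICluster:
    # Stand-in for the Pydantic model of the same name in the source.
    id: str
    name: str

def parse_clusters(clusters_data: List[Dict[str, Any]]) -> List[RunAICluster]:
    """Convert raw API records into typed cluster objects."""
    return [
        RunAICluster(
            # Prefer "uuid", fall back to "id", finally to an empty string.
            id=c.get("uuid", c.get("id", "")),
            name=c.get("name", ""),
        )
        for c in clusters_data
    ]
```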
get_first_cluster() -> RunAICluster

Get the first available Run:AI cluster.

Returns:

    RunAICluster: The first RunAICluster.

Raises:

    RunAIClientError: If no clusters are available.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_first_cluster(self) -> RunAICluster:
    """Get the first available Run:AI cluster.

    Returns:
        The first RunAICluster.

    Raises:
        RunAIClientError: If no clusters are available.
    """
    clusters = self.get_clusters()
    if not clusters:
        raise RunAIClientError("No Run:AI clusters available")
    return clusters[0]
get_project_by_name(name: str) -> RunAIProject

Get a Run:AI project by exact name match.

Parameters:

    name (str, required): The project name to find.

Returns:

    RunAIProject: The matching RunAIProject.

Raises:

    RunAIProjectNotFoundError: If no project matches the name.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_project_by_name(self, name: str) -> RunAIProject:
    """Get a Run:AI project by exact name match.

    Args:
        name: The project name to find.

    Returns:
        The matching RunAIProject.

    Raises:
        RunAIProjectNotFoundError: If no project matches the name.
    """
    projects = self.get_projects(search=name)

    for project in projects:
        if project.name == name:
            return project

    available = [p.name for p in projects]
    raise RunAIProjectNotFoundError(name, available)
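`get_project_by_name` narrows results server-side with `search` but still filters for an exact match, since the search may return near matches. A sketch of that two-stage lookup with an in-memory project list standing in for the API; treating `search` as a substring filter is an assumption, as the source does not show the server-side semantics:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class RunAIProject:
    id: str
    name: str

class RunAIProjectNotFoundError(Exception):
    pass

def get_projects(
    all_projects: List[RunAIProject], search: Optional[str] = None
) -> List[RunAIProject]:
    # Stand-in for the API call; assumed substring semantics for `search`.
    if search is None:
        return all_projects
    return [p for p in all_projects if search in p.name]

def get_project_by_name(
    all_projects: List[RunAIProject], name: str
) -> RunAIProject:
    candidates = get_projects(all_projects, search=name)
    # Exact-match pass: the search above can return near matches too.
    for project in candidates:
        if project.name == name:
            return project
    raise RunAIProjectNotFoundError(
        f"Project '{name}' not found. Available: {[p.name for p in candidates]}"
    )
```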
get_projects(search: Optional[str] = None) -> List[RunAIProject]

Get Run:AI projects, optionally filtered by name.

Parameters:

    search (Optional[str], default None): Optional search string to filter projects.

Returns:

    List[RunAIProject]: List of RunAIProject objects.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_projects(self, search: Optional[str] = None) -> List[RunAIProject]:
    """Get Run:AI projects, optionally filtered by name.

    Args:
        search: Optional search string to filter projects.

    Returns:
        List of RunAIProject objects.
    """
    try:
        response = self._raw_client.organizations.projects.get_projects(
            search=search
        )
        projects_data = (
            response.data.get("projects", []) if response.data else []
        )

        return [
            RunAIProject(
                id=str(p.get("id")),
                name=p.get("name", ""),
                cluster_id=p.get("clusterId"),
            )
            for p in projects_data
        ]
    except Exception as exc:
        self._raise_api_error(
            exc,
            f"Failed to fetch Run:AI projects "
            f"({type(exc).__name__}): {exc}",
        )
get_training_workload(workload_id: str) -> RunAITrainingWorkload

Get full training workload details.

Parameters:

    workload_id (str, required): The workload ID to query.

Returns:

    RunAITrainingWorkload: The workload details as a typed model.

Raises:

    RunAIClientError: If the query fails or response is invalid.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_training_workload(self, workload_id: str) -> RunAITrainingWorkload:
    """Get full training workload details.

    Args:
        workload_id: The workload ID to query.

    Returns:
        The workload details as a typed model.

    Raises:
        RunAIClientError: If the query fails or response is invalid.
    """
    try:
        response = self._raw_client.workloads.trainings.get_training(
            workload_id
        )
        if not response.data:
            raise RunAIClientError(
                f"Empty response when querying workload {workload_id}. "
                "The workload may not exist or the API returned no data."
            )
        if not isinstance(response.data, dict):
            raise RunAIClientError(
                f"Unexpected response format for workload {workload_id}. "
                f"Expected dict, got {type(response.data).__name__}."
            )
        return RunAITrainingWorkload.model_validate(response.data)
    except RunAIClientError:
        raise
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to query Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
get_training_workload_status(workload_id: str) -> Optional[str]

Get the status of a training workload.

Parameters:

    workload_id (str, required): The workload ID to query.

Returns:

    Optional[str]: The workload status string, or None if the response is
    missing a status field.

Raises:

    RunAIWorkloadNotFoundError: If the workload was not found (404).
    RunAIClientError: If the API call fails for other reasons or the response is malformed.

Source code in src/zenml/integrations/runai/client/runai_client.py
def get_training_workload_status(self, workload_id: str) -> Optional[str]:
    """Get the status of a training workload.

    Args:
        workload_id: The workload ID to query.

    Returns:
        The workload status string, or None if the response is missing a
        status field.

    Raises:
        RunAIWorkloadNotFoundError: If the workload was not found (404).
        RunAIClientError: If the API call fails for other reasons or the
            response is malformed.
    """
    try:
        response = self._raw_client.workloads.trainings.get_training(
            workload_id
        )
        if not response.data:
            raise RunAIClientError(
                f"Empty response when querying workload {workload_id}. "
                "The API returned no data."
            )
        if not isinstance(response.data, dict):
            raise RunAIClientError(
                f"Unexpected response format for workload {workload_id}. "
                f"Expected dict, got {type(response.data).__name__}."
            )
        status = response.data.get("actualPhase") or response.data.get(
            "status"
        )
        if status is None:
            logger.warning(
                f"Workload {workload_id} response has no status field. "
                "Available keys: %s",
                list(response.data.keys()),
            )
        return cast(Optional[str], status)
    except RunAIClientError:
        raise
    except Exception as exc:
        status_code = self._get_status_code(exc)
        if status_code == 404:
            raise RunAIWorkloadNotFoundError(workload_id) from exc
        error_msg = str(exc).lower()
        if "not found" in error_msg or "404" in error_msg:
            raise RunAIWorkloadNotFoundError(workload_id) from exc
        self._raise_api_error(
            exc,
            f"Failed to query workload status "
            f"({type(exc).__name__}): {exc}",
        )
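A typical consumer of `get_training_workload_status` polls until the workload reaches a terminal phase. A minimal sketch of such a loop; the callable stands in for a real client, and the terminal set here is an assumption (the authoritative set lives in `_TERMINAL_STATUSES` in constants.py):

```python
import time
from typing import Callable, Optional

# Assumed terminal phases; see is_terminal_status in constants.py for the
# real classification.
_TERMINAL = {"completed", "succeeded", "failed", "stopped"}

def wait_for_workload(
    get_status: Callable[[], Optional[str]],
    poll_interval: float = 0.0,
    max_polls: int = 100,
) -> str:
    """Poll `get_status` until a terminal status is seen or polls run out."""
    for _ in range(max_polls):
        status = get_status()
        # None means the response had no status field; keep polling.
        if status is not None and status.lower() in _TERMINAL:
            return status
        time.sleep(poll_interval)
    raise TimeoutError("Workload did not reach a terminal status in time.")
```

In real use, `get_status` would wrap `client.get_training_workload_status(workload_id)` and the poll interval would be on the order of seconds.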
suspend_training_workload(workload_id: str) -> None

Suspend a training workload.

Parameters:

    workload_id (str, required): The workload ID to suspend.
Source code in src/zenml/integrations/runai/client/runai_client.py
def suspend_training_workload(self, workload_id: str) -> None:
    """Suspend a training workload.

    Args:
        workload_id: The workload ID to suspend.
    """
    try:
        self._raw_client.workloads.trainings.suspend_training(workload_id)
    except Exception as exc:
        self._raise_api_error(
            exc,
            "Failed to suspend Run:AI workload "
            f"{workload_id} ({type(exc).__name__}): {exc}",
        )
RunAIClientError

Bases: Exception

Base exception for Run:AI client errors.

RunAICluster

Bases: BaseModel

Typed representation of a Run:AI cluster.

RunAIClusterNotFoundError(cluster_name: str, available: List[str])

Bases: RunAIClientError

Raised when a Run:AI cluster cannot be found.
RunAIConnectionError

Bases: RunAIClientError

Raised when connection to Run:AI API fails.

RunAIProject

Bases: BaseModel

Typed representation of a Run:AI project.

RunAIProjectNotFoundError(project_name: str, available: List[str])

Bases: RunAIClientError

Raised when a Run:AI project cannot be found.
RunAITrainingWorkload

Bases: BaseModel

Typed representation of a Run:AI training workload.

RunAIWorkloadNotFoundError(workload_id: str)

Bases: RunAIClientError

Raised when a Run:AI workload cannot be found.
WorkloadSubmissionResult

Bases: BaseModel

Result of submitting a workload to Run:AI.


constants

Run:AI integration constants and status mappings.

Classes
RunAIWorkloadStatus

Bases: str, Enum

Run:AI workload status values.

Functions
is_failure_status(status: str) -> bool

Check if a Run:AI status indicates failure.

Parameters:

    status (str, required): The Run:AI workload status string.

Returns:

    bool: True if the status indicates failure.

Source code in src/zenml/integrations/runai/constants.py
def is_failure_status(status: str) -> bool:
    """Check if a Run:AI status indicates failure.

    Args:
        status: The Run:AI workload status string.

    Returns:
        True if the status indicates failure.
    """
    try:
        return RunAIWorkloadStatus(status.lower()) in _FAILURE_STATUSES
    except ValueError:
        return False
is_pending_status(status: str) -> bool

Check if a Run:AI status indicates pending scheduling.

Parameters:

    status (str, required): The Run:AI workload status string.

Returns:

    bool: True if the status indicates the workload is pending.

Source code in src/zenml/integrations/runai/constants.py
def is_pending_status(status: str) -> bool:
    """Check if a Run:AI status indicates pending scheduling.

    Args:
        status: The Run:AI workload status string.

    Returns:
        True if the status indicates the workload is pending.
    """
    try:
        return RunAIWorkloadStatus(status.lower()) in _PENDING_STATUSES
    except ValueError:
        return False
is_success_status(status: str) -> bool

Check if a Run:AI status indicates success.

Parameters:

    status (str, required): The Run:AI workload status string.

Returns:

    bool: True if the status indicates successful completion.

Source code in src/zenml/integrations/runai/constants.py
def is_success_status(status: str) -> bool:
    """Check if a Run:AI status indicates success.

    Args:
        status: The Run:AI workload status string.

    Returns:
        True if the status indicates successful completion.
    """
    try:
        return RunAIWorkloadStatus(status.lower()) in _SUCCESS_STATUSES
    except ValueError:
        return False
is_terminal_status(status: str) -> bool

Check if a Run:AI status is terminal (workload finished).

Parameters:

    status (str, required): The Run:AI workload status string.

Returns:

    bool: True if the status indicates the workload has finished.

Source code in src/zenml/integrations/runai/constants.py
def is_terminal_status(status: str) -> bool:
    """Check if a Run:AI status is terminal (workload finished).

    Args:
        status: The Run:AI workload status string.

    Returns:
        True if the status indicates the workload has finished.
    """
    try:
        return RunAIWorkloadStatus(status.lower()) in _TERMINAL_STATUSES
    except ValueError:
        return False
map_runai_status_to_execution_status(runai_status: str) -> ExecutionStatus

Maps Run:AI workload status to ZenML ExecutionStatus.

Parameters:

    runai_status (str, required): The Run:AI workload status string.

Returns:

    ExecutionStatus: The corresponding ZenML ExecutionStatus.

Source code in src/zenml/integrations/runai/constants.py
def map_runai_status_to_execution_status(runai_status: str) -> ExecutionStatus:
    """Maps Run:AI workload status to ZenML ExecutionStatus.

    Args:
        runai_status: The Run:AI workload status string.

    Returns:
        The corresponding ZenML ExecutionStatus.
    """
    if not runai_status:
        return ExecutionStatus.RUNNING

    try:
        status_enum = RunAIWorkloadStatus(runai_status.lower())
        return RUNAI_STATUS_TO_EXECUTION_STATUS.get(
            status_enum, ExecutionStatus.RUNNING
        )
    except ValueError:
        return ExecutionStatus.RUNNING
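The mapping function normalizes the raw string to lowercase, looks it up in the status Enum, and falls back to RUNNING for empty or unrecognized values. A self-contained sketch of that pattern; the enum members and `ExecutionStatus` stand-in below are illustrative, since the full status sets live in constants.py:

```python
from enum import Enum

class RunAIWorkloadStatus(str, Enum):
    # Illustrative subset of the real status enum.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class ExecutionStatus(str, Enum):
    # Stand-in for zenml's ExecutionStatus.
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

_STATUS_MAP = {
    RunAIWorkloadStatus.RUNNING: ExecutionStatus.RUNNING,
    RunAIWorkloadStatus.COMPLETED: ExecutionStatus.COMPLETED,
    RunAIWorkloadStatus.FAILED: ExecutionStatus.FAILED,
}

def map_runai_status_to_execution_status(runai_status: str) -> ExecutionStatus:
    """Lowercase, look up in the enum, default to RUNNING when unknown."""
    if not runai_status:
        return ExecutionStatus.RUNNING
    try:
        status_enum = RunAIWorkloadStatus(runai_status.lower())
        return _STATUS_MAP.get(status_enum, ExecutionStatus.RUNNING)
    except ValueError:
        # Unknown status string: assume the workload is still in flight.
        return ExecutionStatus.RUNNING
```

Defaulting unknowns to RUNNING is a conservative choice: a new or renamed Run:AI phase keeps the pipeline polling instead of misreporting a failure.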

flavors

Run:AI integration flavors.

Classes
RunAIStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, RunAIStepOperatorSettings

Configuration for the Run:AI step operator.

This step operator enables running individual pipeline steps on Run:AI clusters with fractional GPU allocation.

Example stack configuration:

zenml step-operator register runai \
    --flavor=runai \
    --client_id="xxx" \
    --client_secret="xxx" \
    --runai_base_url="https://myorg.run.ai" \
    --project_name="my-project"
Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
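The constructor above skips validation for values written as secret references of the form `{{secret_name.key}}`. A minimal sketch of how such a reference can be detected; the real check lives in zenml's `secret_utils`, and this regex is only an approximation of its behavior:

```python
import re

# Approximate pattern for `{{secret_name.key}}` references; the real
# secret_utils.is_secret_reference may accept more variants.
_SECRET_REF = re.compile(r"^\{\{\s*\w+\.\w+\s*\}\}$")

def is_secret_reference(value: object) -> bool:
    """Return True if `value` looks like a `{{secret_name.key}}` reference."""
    return isinstance(value, str) and bool(_SECRET_REF.match(value.strip()))
```

This is why, for example, `--client_secret="{{runai_secret.client_secret}}"` is stored as-is and only resolved against the secret store at runtime, rather than being validated as a plain string.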
Attributes
is_local: bool property

Checks if this stack component is running locally.

Run:AI step operator never runs locally.

Returns:

    bool: Always False because the Run:AI step operator is remote.

is_remote: bool property

Checks if this stack component is running remotely.

Run:AI step operator always runs remotely on Run:AI clusters.

Returns:

    bool: Always True because the Run:AI step operator runs remotely.

RunAIStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Run:AI step operator flavor.

Attributes
config_class: Type[RunAIStepOperatorConfig] property

Returns RunAIStepOperatorConfig config class.

Returns:

    Type[RunAIStepOperatorConfig]: The config class.

display_name: str property

Display name of the flavor.

Returns:

    str: The display name of the flavor.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

    Optional[str]: A flavor docs url.

implementation_class: Type[RunAIStepOperator] property

Implementation class for this flavor.

Returns:

    Type[RunAIStepOperator]: The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

    str: The flavor logo.

name: str property

Name of the flavor.

Returns:

    str: The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

    Optional[str]: A flavor SDK docs url.

RunAIStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Per-step settings for Run:AI execution.

These settings can be configured per-step using the step decorator:

@step(
    step_operator="runai",
    settings={"step_operator": {"gpu_portion_request": 0.5}}
)
def my_step():
    ...
Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
runai_step_operator_flavor

Run:AI step operator flavor.

Classes
RunAIStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, RunAIStepOperatorSettings

Configuration for the Run:AI step operator.

This step operator enables running individual pipeline steps on Run:AI clusters with fractional GPU allocation.

Example stack configuration:

zenml step-operator register runai \
    --flavor=runai \
    --client_id="xxx" \
    --client_secret="xxx" \
    --runai_base_url="https://myorg.run.ai" \
    --project_name="my-project"
Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or, in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
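The `{{secret_name.key}}` reference grammar that `secret_utils.is_secret_reference` checks above can be illustrated with a minimal, hypothetical parser. This is a sketch for intuition only, not the actual `zenml.utils.secret_utils` implementation, which may accept a different grammar:

```python
import re

# Illustrative pattern for references of the form {{secret_name.key}};
# the real ZenML secret_utils grammar may differ.
_SECRET_REF = re.compile(r"^\{\{\s*(?P<name>\w+)\.(?P<key>\w+)\s*\}\}$")


def is_secret_reference(value: object) -> bool:
    """Return True if `value` looks like a `{{secret.key}}` reference."""
    return isinstance(value, str) and _SECRET_REF.match(value) is not None


def parse_secret_reference(value: str) -> tuple:
    """Split a reference into (secret_name, key)."""
    match = _SECRET_REF.match(value)
    if match is None:
        raise ValueError(f"Not a secret reference: {value!r}")
    return match.group("name"), match.group("key")
```

With this sketch, `is_secret_reference("{{runai_secret.client_secret}}")` is true, while a plain-text value like `"xxx"` is not, which is what triggers the plain-text warning in the constructor above.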
Attributes
is_local: bool property

Checks if this stack component is running locally.

Run:AI step operator never runs locally.

Returns:

Name Type Description
bool bool

Always False because the Run:AI step operator is remote.

is_remote: bool property

Checks if this stack component is running remotely.

Run:AI step operator always runs remotely on Run:AI clusters.

Returns:

Name Type Description
bool bool

Always True because the Run:AI step operator runs remotely.

RunAIStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Run:AI step operator flavor.

Attributes
config_class: Type[RunAIStepOperatorConfig] property

Returns RunAIStepOperatorConfig config class.

Returns:

Type Description
Type[RunAIStepOperatorConfig]

The config class.

display_name: str property

Display name of the flavor.

Returns:

Type Description
str

The display name of the flavor.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[RunAIStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[RunAIStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

RunAIStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Per-step settings for Run:AI execution.

These settings can be configured per-step using the step decorator:

@step(
    step_operator="runai",
    settings={"step_operator": {"gpu_portion_request": 0.5}}
)
def my_step():
    ...
Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)

step_operators

Run:AI step operators.

Classes
RunAIStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, environment: Optional[Dict[str, str]] = None, secrets: Optional[List[UUID]] = None, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run individual steps on Run:AI.

This step operator enables selective GPU offloading by running individual pipeline steps on Run:AI clusters.

Example usage:

@step(step_operator="runai")
def train_model(data):
    # GPU-intensive training runs on Run:AI
    ...

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    environment: Optional[Dict[str, str]] = None,
    secrets: Optional[List[UUID]] = None,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        environment: Environment variables to set when running on this
            component.
        secrets: Secrets to set as environment variables when running on
            this component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.environment = environment or {}
    self.secrets = secrets or []
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
client: RunAIClient property

Get or create the Run:AI client.

The client is cached for reuse across multiple calls.

Returns:

Type Description
RunAIClient

The RunAIClient instance.

config: RunAIStepOperatorConfig property

Returns the step operator config.

Returns:

Type Description
RunAIStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Run:AI step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

Functions
cancel(step_run: StepRunResponse) -> None

Cancels a submitted step.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required
Source code in src/zenml/integrations/runai/step_operators/runai_step_operator.py
def cancel(self, step_run: "StepRunResponse") -> None:
    """Cancels a submitted step.

    Args:
        step_run: The step run.
    """
    workload_id = self._get_workload_id(step_run)
    self.client.suspend_training_workload(workload_id)
get_docker_builds(snapshot: PipelineSnapshotBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
snapshot PipelineSnapshotBase

The pipeline snapshot for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/runai/step_operators/runai_step_operator.py
def get_docker_builds(
    self, snapshot: "PipelineSnapshotBase"
) -> List[BuildConfiguration]:
    """Gets the Docker builds required for the component.

    Args:
        snapshot: The pipeline snapshot for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in snapshot.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=RUNAI_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
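The build-collection logic above boils down to: one Docker build per step configured to use this operator. A simplified, self-contained sketch of that shape, using hypothetical stand-in types instead of ZenML's `PipelineSnapshotBase` and `BuildConfiguration`:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class StepConfig:
    """Hypothetical stand-in for a ZenML step configuration."""
    step_operator: Optional[str] = None


@dataclass
class BuildConfig:
    """Hypothetical stand-in for a ZenML BuildConfiguration."""
    key: str
    step_name: str


def builds_for_operator(
    steps: Dict[str, StepConfig], operator_name: str, image_key: str
) -> List[BuildConfig]:
    """Collect one Docker build per step that targets the given operator."""
    return [
        BuildConfig(key=image_key, step_name=name)
        for name, cfg in steps.items()
        if cfg.step_operator == operator_name
    ]
```

Steps that do not target the operator contribute no build, so only GPU steps pay the image-build cost.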
get_status(step_run: StepRunResponse) -> ExecutionStatus

Gets the status of a submitted step.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required

Returns:

Type Description
ExecutionStatus

The step status.

Source code in src/zenml/integrations/runai/step_operators/runai_step_operator.py
def get_status(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Gets the status of a submitted step.

    Args:
        step_run: The step run.

    Returns:
        The step status.
    """
    workload_id = self._get_workload_id(step_run)
    try:
        status = self.client.get_training_workload_status(workload_id)
    except RunAIWorkloadNotFoundError:
        logger.warning(
            "Run:AI workload `%s` for step run `%s` was not found.",
            workload_id,
            step_run.id,
        )
        return ExecutionStatus.FAILED

    if status is None:
        logger.warning(
            "Run:AI workload `%s` for step run `%s` has no status.",
            workload_id,
            step_run.id,
        )
        return ExecutionStatus.FAILED

    return map_runai_status_to_execution_status(status)
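`map_runai_status_to_execution_status` translates Run:AI workload states into ZenML execution statuses; note that the method above also fails closed, returning `FAILED` for missing or absent workloads. A hypothetical sketch of such a mapping, with made-up status strings (the actual Run:AI state names and the integration's mapping may differ):

```python
from enum import Enum


class ExecStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical Run:AI status strings; the real API's names may differ.
_STATUS_MAP = {
    "Pending": ExecStatus.RUNNING,
    "Running": ExecStatus.RUNNING,
    "Completed": ExecStatus.COMPLETED,
    "Failed": ExecStatus.FAILED,
    "Terminated": ExecStatus.FAILED,
}


def map_status(runai_status: str) -> ExecStatus:
    """Map a Run:AI workload status to an execution status, failing closed."""
    return _STATUS_MAP.get(runai_status, ExecStatus.FAILED)
```

Defaulting unknown states to `FAILED` mirrors the fail-closed behavior in `get_status` above, where a missing workload or absent status is also reported as failed.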
submit(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Submits a step to Run:AI as a training workload.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required

Raises:

Type Description
RunAIClientError

If building the Run:AI training request fails.

RuntimeError

If workload submission fails.

Source code in src/zenml/integrations/runai/step_operators/runai_step_operator.py
def submit(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submits a step to Run:AI as a training workload.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.

    Raises:
        RunAIClientError: If building the Run:AI training request fails.
        RuntimeError: If workload submission fails.
    """
    settings = cast(RunAIStepOperatorSettings, self.get_settings(info))

    image = info.get_image(key=RUNAI_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    project_id, cluster_id = self._resolve_project_and_cluster()

    workload_name = self._build_workload_name(info)

    compute = self._build_compute_spec(settings)

    env_vars = self._build_environment_variables(environment)

    image_pull_secrets = self._build_image_pull_secrets()

    command, args = self._build_command_and_args(entrypoint_command)

    tolerations_list = self._build_tolerations(settings)
    labels_list = self._build_labels(settings)
    annotations_list = self._build_annotations(settings)

    try:
        training_request = TrainingCreationRequest(
            name=workload_name,
            project_id=project_id,
            cluster_id=cluster_id,
            spec=TrainingSpecSpec(
                image=image,
                command=command,
                compute=compute,
                environment_variables=env_vars,
                args=args,
                image_pull_secrets=image_pull_secrets,
                node_pools=settings.node_pools,
                node_type=settings.node_type,
                preemptibility=settings.preemptibility,
                priority_class=settings.priority_class,
                tolerations=tolerations_list,
                backoff_limit=settings.backoff_limit,
                termination_grace_period_seconds=(
                    settings.termination_grace_period_seconds
                ),
                terminate_after_preemption=settings.terminate_after_preemption,
                working_dir=settings.working_dir,
                labels=labels_list,
                annotations=annotations_list,
            ),
        )
    except Exception as exc:
        raise RunAIClientError(
            "Failed to build Run:AI training request "
            f"({type(exc).__name__}): {exc}"
        ) from exc

    info.force_write_logs()

    try:
        result = self.client.create_training_workload(training_request)
        logger.info(
            "Submitted step '%s' to Run:AI as workload '%s' (ID: %s)",
            info.pipeline_step_name,
            result.workload_name,
            result.workload_id,
        )
    except RunAIClientError as exc:
        raise RuntimeError(
            f"Failed to submit step '{info.pipeline_step_name}' to Run:AI: {exc}. "
            "Verify credentials, project name, cluster access, and quota."
        ) from exc

    publish_step_run_metadata(
        info.step_run_id,
        {
            self.id: {
                RUNAI_WORKLOAD_ID_METADATA_KEY: result.workload_id,
                RUNAI_WORKLOAD_NAME_METADATA_KEY: result.workload_name,
            }
        },
    )
    info.step_run.run_metadata[RUNAI_WORKLOAD_ID_METADATA_KEY] = (
        result.workload_id
    )
    info.step_run.run_metadata[RUNAI_WORKLOAD_NAME_METADATA_KEY] = (
        result.workload_name
    )
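Before building the `TrainingSpecSpec`, `submit` splits the ZenML entrypoint command into a container command and its arguments via `_build_command_and_args`. A plausible sketch of that split, under the assumption that Run:AI takes the command and args as separate fields (the actual helper may behave differently):

```python
from typing import List, Optional, Tuple


def build_command_and_args(
    entrypoint_command: List[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Split an entrypoint into a command string and an args string.

    This hypothetical helper treats the first token as the container
    command and joins the remaining tokens into a single args string.
    """
    if not entrypoint_command:
        return None, None
    command = entrypoint_command[0]
    # An empty remainder becomes None rather than an empty string.
    args = " ".join(entrypoint_command[1:]) or None
    return command, args
```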
wait(step_run: StepRunResponse) -> ExecutionStatus

Waits for a submitted step to finish.

Parameters:

Name Type Description Default
step_run StepRunResponse

The step run.

required

Returns:

Type Description
ExecutionStatus

The final step status.

Source code in src/zenml/integrations/runai/step_operators/runai_step_operator.py
def wait(self, step_run: "StepRunResponse") -> ExecutionStatus:
    """Waits for a submitted step to finish.

    Args:
        step_run: The step run.

    Returns:
        The final step status.
    """
    settings = cast(RunAIStepOperatorSettings, self.get_settings(step_run))
    workload_id = self._get_workload_id(step_run)
    status = self._wait_for_completion(
        client=self.client,
        workload_id=workload_id,
        settings=settings,
    )
    logger.info("Run:AI step operator job completed.")
    return status
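Internally, `wait` delegates to `_wait_for_completion`, which polls the workload status until it reaches a terminal state. A minimal sketch of such a polling loop, decoupled from the Run:AI client by accepting any status callable (intervals, timeouts, and terminal states here are illustrative, not the integration's actual settings):

```python
import time
from typing import Callable

# Illustrative terminal states; the real integration derives these
# from Run:AI workload statuses.
TERMINAL = {"completed", "failed"}


def wait_for_completion(
    get_status: Callable[[], str],
    poll_interval: float = 0.01,
    timeout: float = 5.0,
) -> str:
    """Poll a status callable until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL:
            return status
        time.sleep(poll_interval)
    raise TimeoutError("workload did not reach a terminal state in time")
```

In the step operator, the status callable would wrap `client.get_training_workload_status(workload_id)`, and the poll interval and timeout would come from the `RunAIStepOperatorSettings` passed into `_wait_for_completion`.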
Modules
runai_step_operator

Run:AI step operator implementation.
