Orchestrators
zenml.orchestrators
Initialization for ZenML orchestrators.
An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of an orchestrator as the 'root' of any pipeline job that you run during your experimentation.
ZenML supports a local orchestrator out of the box which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.
Attributes
__all__ = ['BaseOrchestrator', 'BaseOrchestratorConfig', 'BaseOrchestratorFlavor', 'ContainerizedOrchestrator', 'WheeledOrchestrator', 'LocalOrchestrator', 'LocalOrchestratorFlavor', 'LocalDockerOrchestrator', 'LocalDockerOrchestratorFlavor']
module-attribute
Classes
BaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: StackComponent, ABC
Base class for all orchestrators.
In order to implement an orchestrator you will need to subclass from this class.
How it works:
The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)). This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
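The flow described above is a template-method pattern. The sketch below illustrates it with a hypothetical stand-in class (SketchOrchestrator is not the ZenML BaseOrchestrator, and its simplified signatures and the SKETCH_PREPARED variable are assumptions made for illustration only):

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class SketchOrchestrator(ABC):
    """Hypothetical stand-in for a base orchestrator (not the ZenML class)."""

    def run(self, steps: List[str]) -> None:
        # Entrypoint: do some internal preparation, then delegate.
        environment = {"SKETCH_PREPARED": "1"}  # made-up variable name
        self.prepare_or_run_pipeline(steps, environment)

    @abstractmethod
    def prepare_or_run_pipeline(
        self, steps: List[str], environment: Dict[str, str]
    ) -> None:
        """Run the steps directly or deploy them somewhere else."""


class InProcessOrchestrator(SketchOrchestrator):
    """Subclass that runs every step directly, in order."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def prepare_or_run_pipeline(
        self, steps: List[str], environment: Dict[str, str]
    ) -> None:
        for step in steps:
            # A real orchestrator would execute the step here.
            self.executed.append(step)


orchestrator = InProcessOrchestrator()
orchestrator.run(["load", "train", "evaluate"])
```

The caller only ever invokes run(...); the subclass decides what "running the pipeline" means.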
Source code in src/zenml/stack/stack_component.py
Attributes
config: BaseOrchestratorConfig
property
Returns the BaseOrchestratorConfig config.
Returns:
Type | Description |
---|---|
BaseOrchestratorConfig | The configuration. |
Functions
fetch_status(run: PipelineRunResponse) -> ExecutionStatus
Refreshes the status of a specific pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run | PipelineRunResponse | A pipeline run response to fetch its status. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | If any orchestrator inheriting from the base class does not implement this logic. |
Source code in src/zenml/orchestrators/base_orchestrator.py
get_orchestrator_run_id() -> str
abstractmethod
Returns the run id of the active orchestrator run.
Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/base_orchestrator.py
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[Iterator[Dict[str, MetadataType]]]
abstractmethod
The method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly different operations.
Simple Case:
The steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through the steps and finally call self.run_step(...) to execute each step.
Advanced Case:
Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.
Regardless of the implementation details, the orchestrator will need to run each step in the target environment. For this the self.run_step(...) method should be used.
The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).
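The two cases can be contrasted with a small sketch. Everything here is hypothetical: run_simple, build_remote_spec and the sketch_entrypoint module name are made up for illustration and do not correspond to ZenML functions or to the entrypoint configuration classes named above.

```python
from typing import Callable, Dict, List


def run_simple(steps: List[str], run_step: Callable[[str], None]) -> None:
    """Simple case: execute each step directly, in order."""
    for name in steps:
        run_step(name)


def build_remote_spec(
    steps: List[str], image: str, environment: Dict[str, str]
) -> List[Dict[str, object]]:
    """Advanced case: build an intermediate representation to deploy.

    Each entry describes one step as a container job. The command is a
    placeholder for whatever entrypoint the target platform would invoke.
    """
    return [
        {
            "name": name,
            "image": image,
            "command": ["python", "-m", "sketch_entrypoint", "--step", name],
            "env": dict(environment),
        }
        for name in steps
    ]


executed: List[str] = []
run_simple(["load", "train"], executed.append)

spec = build_remote_spec(["load", "train"], "example/image:latest", {"KEY": "value"})
```

In the simple case the steps have already run when the function returns; in the advanced case the function only produces a description that some remote system would execute later.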
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. These don't need to be set if running locally. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Yields:
Type | Description |
---|---|
Optional[Iterator[Dict[str, MetadataType]]] | Metadata for the pipeline run. |
Source code in src/zenml/orchestrators/base_orchestrator.py
requires_resources_in_orchestration_environment(step: Step) -> bool
staticmethod
Checks if the orchestrator should run this step on special resources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step that will be checked. | required |
Returns:
Type | Description |
---|---|
bool | True if the step requires special resources in the orchestration environment, False otherwise. |
Source code in src/zenml/orchestrators/base_orchestrator.py
run(deployment: PipelineDeploymentResponse, stack: Stack, placeholder_run: Optional[PipelineRunResponse] = None) -> None
Runs a pipeline on a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment. | required |
stack | Stack | The stack on which to run the pipeline. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. This will be deleted in case the pipeline deployment failed. | None |
Source code in src/zenml/orchestrators/base_orchestrator.py
run_step(step: Step) -> None
Runs the given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step to run. | required |
Source code in src/zenml/orchestrators/base_orchestrator.py
BaseOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)
Bases: StackComponentConfig
Base orchestrator config.
Source code in src/zenml/stack/stack_component.py
Attributes
is_schedulable: bool
property
Whether the orchestrator is schedulable or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator is schedulable or not. |
is_synchronous: bool
property
Whether the orchestrator runs synchronously or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator runs synchronously or not. |
supports_client_side_caching: bool
property
Whether the orchestrator supports client side caching.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator supports client side caching. |
BaseOrchestratorFlavor
Bases: Flavor
Base orchestrator flavor class.
Attributes
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
implementation_class: Type[BaseOrchestrator]
abstractmethod
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestrator] | The implementation class. |
type: StackComponentType
property
ContainerizedOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator, ABC
Base class for containerized orchestrators.
Source code in src/zenml/stack/stack_component.py
Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]
Gets the Docker builds required for the component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentBase | The pipeline deployment for which to get the builds. | required |
Returns:
Type | Description |
---|---|
List[BuildConfiguration] | The required Docker builds. |
Source code in src/zenml/orchestrators/containerized_orchestrator.py
get_image(deployment: PipelineDeploymentResponse, step_name: Optional[str] = None) -> str
staticmethod
Gets the Docker image for the pipeline/a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The deployment from which to get the image. | required |
step_name | Optional[str] | Pipeline step name for which to get the image. If not given the generic pipeline image will be returned. | None |
Raises:
Type | Description |
---|---|
RuntimeError | If the deployment does not have an associated build. |
Returns:
Type | Description |
---|---|
str | The image name or digest. |
Source code in src/zenml/orchestrators/containerized_orchestrator.py
LocalDockerOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: ContainerizedOrchestrator
Orchestrator responsible for running pipelines locally using Docker.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in src/zenml/stack/stack_component.py
Attributes
settings_class: Optional[Type[BaseSettings]]
property
Settings class for the Local Docker orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] | The settings class. |
validator: Optional[StackValidator]
property
Ensures there is an image builder in the stack.
Returns:
Type | Description |
---|---|
Optional[StackValidator] | A |
Functions
get_orchestrator_run_id() -> str
Returns the active orchestrator run id.
Raises:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
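One way to give every step container the same run id is to pass it through an environment variable. The following is a minimal sketch of that idea, not the orchestrator's actual code: the variable name SKETCH_ORCHESTRATOR_RUN_ID and the helper start_run are assumptions made for illustration.

```python
import os
import uuid
from typing import Dict, Mapping

# Hypothetical variable name, chosen only for this sketch.
RUN_ID_ENV = "SKETCH_ORCHESTRATOR_RUN_ID"


def start_run(environment: Mapping[str, str]) -> Dict[str, str]:
    """Return a copy of the environment with a fresh run id added.

    The same dictionary would be handed to every step container, so all
    steps of one pipeline run see the same id.
    """
    env = dict(environment)
    env[RUN_ID_ENV] = str(uuid.uuid4())
    return env


def get_orchestrator_run_id(environ: Mapping[str, str] = os.environ) -> str:
    """Read the run id, failing loudly when the variable is missing."""
    try:
        return environ[RUN_ID_ENV]
    except KeyError:
        raise RuntimeError(
            f"Unable to read run id from environment variable {RUN_ID_ENV}."
        ) from None


step_env = start_run({"OTHER": "value"})
run_id = get_orchestrator_run_id(step_env)
```

Calling get_orchestrator_run_id outside of a run, where the variable is not set, raises RuntimeError, which mirrors the behaviour documented above.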
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any
Sequentially runs all pipeline steps in local Docker containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Raises:
Type | Description |
---|---|
RuntimeError | If a step fails. |
Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
LocalDockerOrchestratorFlavor
Bases: BaseOrchestratorFlavor
Flavor for the local Docker orchestrator.
Attributes
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
docs_url: Optional[str]
property
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs URL. |
implementation_class: Type[LocalDockerOrchestrator]
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LocalDockerOrchestrator] | Implementation class for this flavor. |
logo_url: str
property
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str | Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs URL. |
LocalOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator
Orchestrator responsible for running pipelines locally.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in src/zenml/stack/stack_component.py
Functions
get_orchestrator_run_id() -> str
Returns the active orchestrator run id.
Raises:
Type | Description |
---|---|
RuntimeError | If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/local/local_orchestrator.py
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any
Iterates through all steps and executes them sequentially.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack on which the pipeline is deployed. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Source code in src/zenml/orchestrators/local/local_orchestrator.py
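The two methods above fit together roughly as follows: a run id exists only while a pipeline is executing, and steps are executed one after another. This is a simplified sketch with a hypothetical class (SketchLocalOrchestrator and run_pipeline are illustrative names, and steps are modelled as plain callables rather than ZenML steps):

```python
import uuid
from typing import Callable, List, Optional


class SketchLocalOrchestrator:
    """Hypothetical in-process orchestrator (not the ZenML class)."""

    def __init__(self) -> None:
        self._run_id: Optional[str] = None

    def get_orchestrator_run_id(self) -> str:
        if self._run_id is None:
            raise RuntimeError(
                "No run id set; the orchestrator is not running a pipeline."
            )
        return self._run_id

    def run_pipeline(self, steps: List[Callable[[], None]]) -> None:
        # One id per pipeline run, shared by every step.
        self._run_id = str(uuid.uuid4())
        try:
            for step in steps:  # strictly sequential, no concurrency
                step()
        finally:
            # Clear the id so calls outside a run fail loudly.
            self._run_id = None


orchestrator = SketchLocalOrchestrator()
seen_ids: List[str] = []
record_id = lambda: seen_ids.append(orchestrator.get_orchestrator_run_id())
orchestrator.run_pipeline([record_id, record_id, record_id])
```

All three steps observe the same id, and asking for the id after the run has finished raises RuntimeError.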
LocalOrchestratorFlavor
Bases: BaseOrchestratorFlavor
Class for the LocalOrchestratorFlavor.
Attributes
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
docs_url: Optional[str]
property
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs url. |
implementation_class: Type[LocalOrchestrator]
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LocalOrchestrator] | The implementation class for this flavor. |
logo_url: str
property
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
The flavor name.
Returns:
Type | Description |
---|---|
str | The flavor name. |
sdk_docs_url: Optional[str]
property
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs url. |
WheeledOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator, ABC
Base class for wheeled orchestrators.
Source code in src/zenml/stack/stack_component.py
Functions
copy_repository_to_temp_dir_and_add_setup_py() -> str
Copy the repository to a temporary directory and add a setup.py file.
Returns:
Type | Description |
---|---|
str | Path to the temporary directory containing the copied repository. |
Source code in src/zenml/orchestrators/wheeled_orchestrator.py
create_wheel(temp_dir: str) -> str
Create a wheel for the package in the given temporary directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
temp_dir | str | Path to the temporary directory containing the package. | required |
Raises:
Type | Description |
---|---|
RuntimeError | If the wheel file could not be created. |
Returns:
Name | Type | Description |
---|---|---|
str | str | Path to the created wheel file. |
Source code in src/zenml/orchestrators/wheeled_orchestrator.py
sanitize_name(name: str) -> str
Sanitize the value to be used in a cluster name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Arbitrary input cluster name. | required |
Returns:
Type | Description |
---|---|
str | Sanitized cluster name. |
Source code in src/zenml/orchestrators/wheeled_orchestrator.py
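The reference does not state which characters are allowed in a cluster name. As an illustration only, the sketch below assumes a common convention (lowercase letters, digits and hyphens, with no leading or trailing hyphen); the function name sketch_sanitize_name and the rules are assumptions, not the documented behaviour of sanitize_name.

```python
import re


def sketch_sanitize_name(name: str) -> str:
    """Sanitize a value under assumed rules: [a-z0-9-], no edge hyphens."""
    # Lowercase, then replace every disallowed character with a hyphen.
    sanitized = re.sub(r"[^a-z0-9-]", "-", name.lower())
    # Remove hyphens left over at the start or end.
    return sanitized.strip("-")


cluster_name = sketch_sanitize_name("My Cluster_01!")
```

With these assumed rules, spaces, underscores and punctuation all collapse to hyphens, so the result can be embedded in resource names that only accept a restricted character set.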
Modules
base_orchestrator
Base orchestrator class.
Classes
BaseOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: StackComponent, ABC
Base class for all orchestrators.
In order to implement an orchestrator you will need to subclass from this class.
How it works:
The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)). This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
Source code in src/zenml/stack/stack_component.py
config: BaseOrchestratorConfig
property
Returns the BaseOrchestratorConfig config.
Returns:
Type | Description |
---|---|
BaseOrchestratorConfig | The configuration. |
fetch_status(run: PipelineRunResponse) -> ExecutionStatus
Refreshes the status of a specific pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run | PipelineRunResponse | A pipeline run response to fetch its status. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | If any orchestrator inheriting from the base class does not implement this logic. |
Source code in src/zenml/orchestrators/base_orchestrator.py
get_orchestrator_run_id() -> str
abstractmethod
Returns the run id of the active orchestrator run.
Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/base_orchestrator.py
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[Iterator[Dict[str, MetadataType]]]
abstractmethod
The method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly different operations.
Simple Case:
The steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through the steps and finally call self.run_step(...) to execute each step.
Advanced Case:
Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.
Regardless of the implementation details, the orchestrator will need to run each step in the target environment. For this the self.run_step(...) method should be used.
The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. These don't need to be set if running locally. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Yields:
Type | Description |
---|---|
Optional[Iterator[Dict[str, MetadataType]]] | Metadata for the pipeline run. |
Source code in src/zenml/orchestrators/base_orchestrator.py
requires_resources_in_orchestration_environment(step: Step) -> bool
staticmethod
Checks if the orchestrator should run this step on special resources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step that will be checked. | required |
Returns:
Type | Description |
---|---|
bool | True if the step requires special resources in the orchestration environment, False otherwise. |
Source code in src/zenml/orchestrators/base_orchestrator.py
run(deployment: PipelineDeploymentResponse, stack: Stack, placeholder_run: Optional[PipelineRunResponse] = None) -> None
Runs a pipeline on a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment. | required |
stack | Stack | The stack on which to run the pipeline. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. This will be deleted in case the pipeline deployment failed. | None |
Source code in src/zenml/orchestrators/base_orchestrator.py
run_step(step: Step) -> None
Runs the given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step to run. | required |
Source code in src/zenml/orchestrators/base_orchestrator.py
BaseOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)
Bases: StackComponentConfig
Base orchestrator config.
Source code in src/zenml/stack/stack_component.py
is_schedulable: bool
property
Whether the orchestrator is schedulable or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator is schedulable or not. |
is_synchronous: bool
property
Whether the orchestrator runs synchronously or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator runs synchronously or not. |
supports_client_side_caching: bool
property
Whether the orchestrator supports client side caching.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator supports client side caching. |
BaseOrchestratorFlavor
Bases: Flavor
Base orchestrator flavor class.
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
implementation_class: Type[BaseOrchestrator]
abstractmethod
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestrator] | The implementation class. |
type: StackComponentType
property
Functions
cache_utils
Utilities for caching.
Classes
Functions
generate_cache_key(step: Step, input_artifact_ids: Dict[str, UUID], artifact_store: BaseArtifactStore, project_id: UUID) -> str
Generates a cache key for a step run.
If the cache key is the same for two step runs, we conclude that the step runs are identical and can be cached.
The cache key is an MD5 hash of:
- the project ID,
- the artifact store ID and path,
- the source code that defines the step,
- the parameters of the step,
- the names and IDs of the input artifacts of the step,
- the names and source codes of the output artifacts of the step,
- the source codes of the output materializers of the step,
- additional custom caching parameters of the step.
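The idea of hashing these components into a single key can be sketched with hashlib. This is a simplified illustration, not the function's implementation: sketch_cache_key is a hypothetical name, it covers only some of the listed components, and plain strings stand in for the real step and artifact objects.

```python
import hashlib
from typing import Dict


def sketch_cache_key(
    project_id: str,
    artifact_store_id: str,
    artifact_store_path: str,
    step_source: str,
    parameters: Dict[str, str],
    input_artifact_ids: Dict[str, str],
) -> str:
    """Combine a subset of the listed components into an MD5 hex digest."""
    hash_ = hashlib.md5()
    hash_.update(project_id.encode())
    hash_.update(artifact_store_id.encode())
    hash_.update(artifact_store_path.encode())
    hash_.update(step_source.encode())
    # Sort by name so dictionary ordering cannot change the key.
    for name in sorted(parameters):
        hash_.update(name.encode())
        hash_.update(parameters[name].encode())
    for name in sorted(input_artifact_ids):
        hash_.update(name.encode())
        hash_.update(input_artifact_ids[name].encode())
    return hash_.hexdigest()


key_a = sketch_cache_key("p1", "s1", "/store", "def step(): ...", {"lr": "0.1"}, {"data": "a1"})
key_b = sketch_cache_key("p1", "s1", "/store", "def step(): ...", {"lr": "0.1"}, {"data": "a1"})
key_c = sketch_cache_key("p1", "s1", "/store", "def step(): ...", {"lr": "0.2"}, {"data": "a1"})
```

Identical inputs give identical keys (key_a and key_b), while changing any single component, here a parameter value, produces a different key (key_c) and therefore a cache miss.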
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step to generate the cache key for. | required |
input_artifact_ids | Dict[str, UUID] | The input artifact IDs for the step. | required |
artifact_store | BaseArtifactStore | The artifact store of the active stack. | required |
project_id | UUID | The ID of the active project. | required |
Returns:
Type | Description |
---|---|
str | A cache key. |
Source code in src/zenml/orchestrators/cache_utils.py
get_cached_step_run(cache_key: str) -> Optional[StepRunResponse]
If a given step can be cached, get the corresponding existing step run.
A step run can be cached if there is an existing step run in the same project which has the same cache key and was successfully executed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cache_key | str | The cache key of the step. | required |
Returns:
Type | Description |
---|---|
Optional[StepRunResponse] | The existing step run if the step can be cached, otherwise None. |
Source code in src/zenml/orchestrators/cache_utils.py
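The lookup rule (same project, same cache key, successfully executed) can be sketched over an in-memory list. The StepRunRecord type, the "completed" status string, and the choice to return the most recent match are assumptions made for this illustration; the real function queries the ZenML server.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StepRunRecord:
    """Hypothetical, simplified stand-in for a step run response."""

    cache_key: str
    project: str
    status: str
    created: int  # larger means more recent


def find_cached_run(
    runs: List[StepRunRecord], cache_key: str, project: str
) -> Optional[StepRunRecord]:
    """Return a matching successful run, or None if the step must execute."""
    candidates = [
        run
        for run in runs
        if run.cache_key == cache_key
        and run.project == project
        and run.status == "completed"
    ]
    # Assumption: prefer the most recent successful run.
    return max(candidates, key=lambda run: run.created, default=None)


history = [
    StepRunRecord("abc", "proj", "completed", 1),
    StepRunRecord("abc", "proj", "failed", 2),
    StepRunRecord("abc", "other", "completed", 3),
]
hit = find_cached_run(history, "abc", "proj")
miss = find_cached_run(history, "xyz", "proj")
```

Failed runs and runs from other projects are ignored even when their cache key matches.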
containerized_orchestrator
Containerized orchestrator class.
Classes
ContainerizedOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator, ABC
Base class for containerized orchestrators.
Source code in src/zenml/stack/stack_component.py
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]
Gets the Docker builds required for the component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentBase | The pipeline deployment for which to get the builds. | required |
Returns:
Type | Description |
---|---|
List[BuildConfiguration] | The required Docker builds. |
Source code in src/zenml/orchestrators/containerized_orchestrator.py
get_image(deployment: PipelineDeploymentResponse, step_name: Optional[str] = None) -> str
staticmethod
Gets the Docker image for the pipeline/a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The deployment from which to get the image. | required |
step_name | Optional[str] | Pipeline step name for which to get the image. If not given the generic pipeline image will be returned. | None |
Raises:
Type | Description |
---|---|
RuntimeError | If the deployment does not have an associated build. |
Returns:
Type | Description |
---|---|
str | The image name or digest. |
Source code in src/zenml/orchestrators/containerized_orchestrator.py
dag_runner
DAG (Directed Acyclic Graph) Runners.
Classes
NodeStatus
Bases: Enum
Status of the execution of a node.
ThreadedDagRunner(dag: Dict[str, List[str]], run_fn: Callable[[str], Any], finalize_fn: Optional[Callable[[Dict[str, NodeStatus]], None]] = None, parallel_node_startup_waiting_period: float = 0.0)
Multi-threaded DAG Runner.
This class expects a DAG of strings in adjacency list representation, as well as a custom run_fn as input, then calls run_fn(node) for each string node in the DAG.
Steps that can be executed in parallel will be started in separate threads.
Define attributes and initialize all nodes in waiting state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag | Dict[str, List[str]] | Adjacency list representation of a DAG. E.g.: [(1->2), (1->3), (2->4), (3->4)] should be represented as | required |
run_fn | Callable[[str], Any] | A function | required |
finalize_fn | Optional[Callable[[Dict[str, NodeStatus]], None]] | A function | None |
parallel_node_startup_waiting_period | float | Delay in seconds to wait in between starting parallel nodes. | 0.0 |
Source code in src/zenml/orchestrators/dag_runner.py
run() -> None
Call self.run_fn on all nodes in self.dag.
The order of execution is determined using topological sort. Each node is run in a separate thread to enable parallelism.
Source code in src/zenml/orchestrators/dag_runner.py
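The threading behaviour described for ThreadedDagRunner can be illustrated with a minimal, standard-library sketch. It is not the ZenML implementation: the function name `run_dag_threaded` is hypothetical, and it assumes that each key of `dag` maps to the list of upstream nodes that must finish first. A node is started in its own thread as soon as all of its upstream nodes are done.

```python
import threading
from typing import Any, Callable, Dict, List, Set


def run_dag_threaded(
    dag: Dict[str, List[str]], run_fn: Callable[[str], Any]
) -> List[str]:
    """Run each node once in its own thread; return the completion order.

    Assumption (not taken from ZenML): dag[node] lists the upstream nodes
    that `node` depends on.
    """
    lock = threading.Lock()
    done: Set[str] = set()
    started: Set[str] = set()
    order: List[str] = []
    threads: List[threading.Thread] = []

    def start_ready_nodes() -> None:
        # The caller must hold `lock`.
        for node, upstream in dag.items():
            if node not in started and all(u in done for u in upstream):
                started.add(node)
                thread = threading.Thread(target=worker, args=(node,))
                threads.append(thread)
                thread.start()

    def worker(node: str) -> None:
        run_fn(node)
        with lock:
            done.add(node)
            order.append(node)
            # Finishing this node may unblock downstream nodes.
            start_ready_nodes()

    with lock:
        start_ready_nodes()

    # Threads are appended while we wait, so walk the list by index.
    index = 0
    while True:
        with lock:
            if index >= len(threads):
                break
            thread = threads[index]
        thread.join()
        index += 1
    return order


if __name__ == "__main__":
    example = {"load": [], "train": ["load"], "report": ["load"], "deploy": ["train", "report"]}
    print(run_dag_threaded(example, lambda node: None))
```

In this sketch a failing `run_fn` simply leaves the downstream nodes unstarted; status tracking and a finalize callback are omitted to keep it short.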
Functions
reverse_dag(dag: Dict[str, List[str]]) -> Dict[str, List[str]]
Reverse a DAG.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag | Dict[str, List[str]] | Adjacency list representation of a DAG. | required |
Returns:
Type | Description |
---|---|
Dict[str, List[str]] | Adjacency list representation of the reversed DAG. |
Source code in src/zenml/orchestrators/dag_runner.py
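As an illustration of what reversing an adjacency list means, here is a small standalone sketch. The function name `reverse_adjacency` is hypothetical and the code is not copied from ZenML; it only shows the general technique of flipping every edge while keeping every node as a key.

```python
from typing import Dict, List


def reverse_adjacency(dag: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Return a new adjacency list in which every edge points the other way."""
    # Start with every known node so that nodes without incoming edges
    # still appear as keys in the result.
    reversed_dag: Dict[str, List[str]] = {node: [] for node in dag}
    for node, neighbours in dag.items():
        for neighbour in neighbours:
            reversed_dag.setdefault(neighbour, []).append(node)
    return reversed_dag


if __name__ == "__main__":
    print(reverse_adjacency({"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}))
```

Reversing twice gives back the original mapping as long as every node appears as a key and the lists are in a consistent order.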
input_utils
Utilities for inputs.
Classes
Functions
resolve_step_inputs(step: Step, pipeline_run: PipelineRunResponse) -> Tuple[Dict[str, StepRunInputResponse], List[UUID]]
Resolves inputs for the current step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step | Step | The step for which to resolve the inputs. | required |
pipeline_run | PipelineRunResponse | The current pipeline run. | required |
Raises:
Type | Description |
---|---|
InputResolutionError | If input resolving failed due to a missing step or output. |
ValueError | If object from model version passed into a step cannot be resolved in runtime due to missing object. |
Returns:
Type | Description |
---|---|
Tuple[Dict[str, StepRunInputResponse], List[UUID]] | The IDs of the input artifact versions and the IDs of parent steps of the current step. |
Source code in src/zenml/orchestrators/input_utils.py
Modules
local
Initialization for the local orchestrator.
Modules
local_orchestrator
Implementation of the ZenML local orchestrator.
LocalOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator
Orchestrator responsible for running pipelines locally.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in src/zenml/stack/stack_component.py
get_orchestrator_run_id() -> str
Returns the active orchestrator run id.
Raises:
Type | Description |
---|---|
RuntimeError | If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/local/local_orchestrator.py
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any
Iterates through all steps and executes them sequentially.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack on which the pipeline is deployed. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Source code in src/zenml/orchestrators/local/local_orchestrator.py
LocalOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)
Bases: BaseOrchestratorConfig
Local orchestrator config.
Source code in src/zenml/stack/stack_component.py
is_local: bool
property
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool | True if this config is for a local component, False otherwise. |
is_synchronous: bool
property
Whether the orchestrator runs synchronously or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator runs synchronously or not. |
LocalOrchestratorFlavor
Bases: BaseOrchestratorFlavor
Class for the LocalOrchestratorFlavor.
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
docs_url: Optional[str]
property
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs url. |
implementation_class: Type[LocalOrchestrator]
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LocalOrchestrator] | The implementation class for this flavor. |
logo_url: str
property
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
The flavor name.
Returns:
Type | Description |
---|---|
str | The flavor name. |
sdk_docs_url: Optional[str]
property
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs url. |
local_docker
Initialization for the local Docker orchestrator.
Modules
local_docker_orchestrator
Implementation of the ZenML local Docker orchestrator.
LocalDockerOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: ContainerizedOrchestrator
Orchestrator responsible for running pipelines locally using Docker.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
Source code in src/zenml/stack/stack_component.py
settings_class: Optional[Type[BaseSettings]]
property
Settings class for the Local Docker orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] | The settings class. |
validator: Optional[StackValidator]
property
Ensures there is an image builder in the stack.
Returns:
Type | Description |
---|---|
Optional[StackValidator] | A |
get_orchestrator_run_id() -> str
Returns the active orchestrator run id.
Raises:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any
Sequentially runs all pipeline steps in local Docker containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required |
placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. | None |
Raises:
Type | Description |
---|---|
RuntimeError | If a step fails. |
Source code in src/zenml/orchestrators/local_docker/local_docker_orchestrator.py
LocalDockerOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)
Bases: BaseOrchestratorConfig, LocalDockerOrchestratorSettings
Local Docker orchestrator config.
Source code in src/zenml/stack/stack_component.py
is_local: bool
property
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool | True if this config is for a local component, False otherwise. |
is_synchronous: bool
property
Whether the orchestrator runs synchronously or not.
Returns:
Type | Description |
---|---|
bool | Whether the orchestrator runs synchronously or not. |
LocalDockerOrchestratorFlavor
Bases: BaseOrchestratorFlavor
Flavor for the local Docker orchestrator.
config_class: Type[BaseOrchestratorConfig]
property
Config class for the base orchestrator flavor.
Returns:
Type | Description |
---|---|
Type[BaseOrchestratorConfig] | The config class. |
docs_url: Optional[str]
property
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs url. |
implementation_class: Type[LocalDockerOrchestrator]
property
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[LocalDockerOrchestrator] | Implementation class for this flavor. |
logo_url: str
property
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str | Name of the orchestrator flavor. |
sdk_docs_url: Optional[str]
property
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs url. |
LocalDockerOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)
Bases: BaseSettings
Local Docker orchestrator settings.
Attributes:
Name | Type | Description |
---|---|---|
run_args | Dict[str, Any] | Arguments to pass to the |
Source code in src/zenml/config/secret_reference_mixin.py
output_utils
Utilities for outputs.
Classes
Functions
generate_artifact_uri(artifact_store: BaseArtifactStore, step_run: StepRunResponse, output_name: str) -> str
Generates a URI for an output artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_store | BaseArtifactStore | The artifact store on which the artifact will be stored. | required |
step_run | StepRunResponse | The step run that created the artifact. | required |
output_name | str | The name of the output in the step run for this artifact. | required |
Returns:
Type | Description |
---|---|
str | The URI of the output artifact. |
Source code in src/zenml/orchestrators/output_utils.py
prepare_output_artifact_uris(step_run: StepRunResponse, stack: Stack, step: Step) -> Dict[str, str]
Prepares the output artifact URIs to run the current step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run | StepRunResponse | The step run for which to prepare the artifact URIs. | required |
stack | Stack | The stack on which the pipeline is running. | required |
step | Step | The step configuration. | required |
Raises:
Type | Description |
---|---|
RuntimeError | If an artifact URI already exists. |
Returns:
Type | Description |
---|---|
Dict[str, str] | A dictionary mapping output names to artifact URIs. |
Source code in src/zenml/orchestrators/output_utils.py
remove_artifact_dirs(artifact_uris: Sequence[str]) -> None
Removes the artifact directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_uris | Sequence[str] | URIs of the artifacts to remove the directories for. | required |
Source code in src/zenml/orchestrators/output_utils.py
Modules
publish_utils
Utilities to publish pipeline and step runs.
Classes
Functions
get_pipeline_run_status(step_statuses: List[ExecutionStatus], num_steps: int) -> ExecutionStatus
Gets the pipeline run status for the given step statuses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_statuses | List[ExecutionStatus] | The status of steps in this run. | required |
num_steps | int | The total number of steps in this run. | required |
Returns:
Type | Description |
---|---|
ExecutionStatus | The run status. |
Source code in src/zenml/orchestrators/publish_utils.py
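One plausible way to derive a run status from step statuses is sketched below. The aggregation rules here are an assumption made for illustration and are not taken from the ZenML source; the `Status` enum and the `run_status` function are hypothetical stand-ins for ExecutionStatus and get_pipeline_run_status.

```python
from enum import Enum
from typing import List


class Status(Enum):
    """Hypothetical stand-in for an execution status enum."""

    RUNNING = "running"
    COMPLETED = "completed"
    CACHED = "cached"
    FAILED = "failed"


def run_status(step_statuses: List[Status], num_steps: int) -> Status:
    """Aggregate step statuses into one run status (assumed rules)."""
    # Assumption: a single failed step fails the whole run.
    if Status.FAILED in step_statuses:
        return Status.FAILED
    # Assumption: the run is still in progress while a step is running
    # or while some steps have not reported a status yet.
    if Status.RUNNING in step_statuses or len(step_statuses) < num_steps:
        return Status.RUNNING
    return Status.COMPLETED


if __name__ == "__main__":
    print(run_status([Status.COMPLETED, Status.CACHED], num_steps=2))
```

Passing num_steps separately matters because steps that have not started yet contribute no status at all.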
publish_failed_pipeline_run(pipeline_run_id: UUID) -> PipelineRunResponse
Publishes a failed pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run_id | UUID | The ID of the pipeline run to update. | required |
Returns:
Type | Description |
---|---|
PipelineRunResponse | The updated pipeline run. |
Source code in src/zenml/orchestrators/publish_utils.py
publish_failed_step_run(step_run_id: UUID) -> StepRunResponse
Publishes a failed step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id | UUID | The ID of the step run to update. | required |
Returns:
Type | Description |
---|---|
StepRunResponse | The updated step run. |
Source code in src/zenml/orchestrators/publish_utils.py
publish_pipeline_run_metadata(pipeline_run_id: UUID, pipeline_run_metadata: Dict[UUID, Dict[str, MetadataType]]) -> None
Publishes the given pipeline run metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run_id | UUID | The ID of the pipeline run. | required |
pipeline_run_metadata | Dict[UUID, Dict[str, MetadataType]] | A dictionary mapping stack component IDs to the metadata they created. | required |
Source code in src/zenml/orchestrators/publish_utils.py
publish_step_run_metadata(step_run_id: UUID, step_run_metadata: Dict[UUID, Dict[str, MetadataType]]) -> None
Publishes the given step run metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id | UUID | The ID of the step run. | required |
step_run_metadata | Dict[UUID, Dict[str, MetadataType]] | A dictionary mapping stack component IDs to the metadata they created. | required |
Source code in src/zenml/orchestrators/publish_utils.py
publish_successful_step_run(step_run_id: UUID, output_artifact_ids: Dict[str, List[UUID]]) -> StepRunResponse
Publishes a successful step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id | UUID | The ID of the step run to update. | required |
output_artifact_ids | Dict[str, List[UUID]] | The output artifact IDs for the step run. | required |
Returns:
Type | Description |
---|---|
StepRunResponse | The updated step run. |
Source code in src/zenml/orchestrators/publish_utils.py
step_launcher
Class to launch (run directly or using a step operator) steps.
Classes
StepLauncher(deployment: PipelineDeploymentResponse, step: Step, orchestrator_run_id: str)
A class responsible for launching a step of a ZenML pipeline.
This class follows these steps to launch and publish a ZenML step:
1. Publish or reuse a PipelineRun
2. Resolve the input artifacts of the step
3. Generate a cache key for the step
4. Check if the step can be cached or not
5. Publish a new StepRun
6. If the step can't be cached, the step will be executed in one of these two ways depending on its configuration:
- Calling a step operator to run the step in a different environment
- Calling a step runner to run the step in the current environment
7. Update the status of the previously published StepRun
8. Update the status of the PipelineRun
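The caching part of the launch sequence (steps 3 to 7) can be sketched with plain Python. Everything in this sketch is hypothetical: `make_cache_key`, `launch_step`, the in-memory `_cache` dictionary and the status strings are illustrative stand-ins, not ZenML APIs, and the cache key is simply a hash of the step name and its JSON-serializable inputs.

```python
import hashlib
import json
from typing import Any, Callable, Dict

# Hypothetical in-memory stand-in for previously stored step outputs.
_cache: Dict[str, Any] = {}


def make_cache_key(step_name: str, inputs: Dict[str, Any]) -> str:
    """Hash the step name and its inputs (inputs must be JSON-serializable)."""
    payload = json.dumps({"step": step_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def launch_step(
    step_name: str,
    step_fn: Callable[..., Any],
    inputs: Dict[str, Any],
    enable_cache: bool = True,
) -> Dict[str, Any]:
    """Run a step unless an earlier result with the same cache key exists."""
    cache_key = make_cache_key(step_name, inputs)  # generate the cache key
    record: Dict[str, Any] = {
        "step": step_name,
        "cache_key": cache_key,
        "status": "running",  # "publish" a new step run record
    }
    if enable_cache and cache_key in _cache:  # check whether it can be cached
        record.update(status="cached", output=_cache[cache_key])
        return record
    try:
        output = step_fn(**inputs)  # execute in the current environment
    except Exception:
        record["status"] = "failed"  # update the status before re-raising
        raise
    _cache[cache_key] = output
    record.update(status="completed", output=output)
    return record


if __name__ == "__main__":
    def add(a: int, b: int) -> int:
        return a + b

    print(launch_step("add", add, {"a": 1, "b": 2})["status"])
    print(launch_step("add", add, {"a": 1, "b": 2})["status"])
```

The sketch always runs the step in the current process; delegating to a separate environment, as a step operator would, is left out.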
Initializes the launcher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment. | required |
step | Step | The step to launch. | required |
orchestrator_run_id | str | The orchestrator pipeline run id. | required |
Raises:
Type | Description |
---|---|
RuntimeError | If the deployment has no associated stack. |
Source code in src/zenml/orchestrators/step_launcher.py
launch() -> None
Launches the step.
Raises:
Type | Description |
---|---|
BaseException | If the step failed to launch, run, or publish. |
Source code in src/zenml/orchestrators/step_launcher.py
Functions
Modules
step_run_utils
Utilities for creating step runs.
Classes
StepRunRequestFactory(deployment: PipelineDeploymentResponse, pipeline_run: PipelineRunResponse, stack: Stack)
Helper class to create step run requests.
Initialize the object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The deployment for which to create step run requests. | required |
pipeline_run | PipelineRunResponse | The pipeline run for which to create step run requests. | required |
stack | Stack | The stack on which the pipeline run is happening. | required |
Source code in src/zenml/orchestrators/step_run_utils.py
create_request(invocation_id: str) -> StepRunRequest
Create a step run request.
This will only create a request with basic information and will not yet compute information like the cache key and inputs. This is separated into a different method populate_request(...) that might raise exceptions while trying to compute this information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
invocation_id | str | The invocation ID for which to create the request. | required |
Returns:
Type | Description |
---|---|
StepRunRequest | The step run request. |
Source code in src/zenml/orchestrators/step_run_utils.py
populate_request(request: StepRunRequest) -> None
Populate a step run request with additional information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | StepRunRequest | The request to populate. | required |
Source code in src/zenml/orchestrators/step_run_utils.py
Functions
create_cached_step_runs(deployment: PipelineDeploymentResponse, pipeline_run: PipelineRunResponse, stack: Stack) -> Set[str]
Create all cached step runs for a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The deployment of the pipeline run. | required |
pipeline_run | PipelineRunResponse | The pipeline run for which to create the step runs. | required |
stack | Stack | The stack on which the pipeline run is happening. | required |
Returns:
Type | Description |
---|---|
Set[str] | The invocation IDs of the created step runs. |
Source code in src/zenml/orchestrators/step_run_utils.py
find_cacheable_invocation_candidates(deployment: PipelineDeploymentResponse, finished_invocations: Set[str]) -> Set[str]
Find invocations that can potentially be cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponse | The pipeline deployment containing the invocations. | required |
finished_invocations | Set[str] | A set of invocations that are already finished. | required |
Returns:
Type | Description |
---|---|
Set[str] | The set of invocations that can potentially be cached. |
Source code in src/zenml/orchestrators/step_run_utils.py
link_output_artifacts_to_model_version(artifacts: Dict[str, List[ArtifactVersionResponse]], model_version: ModelVersionResponse) -> None
Link the outputs of a step run to a model version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifacts | Dict[str, List[ArtifactVersionResponse]] | The step output artifacts. | required |
model_version | ModelVersionResponse | The model version to link. | required |
Source code in src/zenml/orchestrators/step_run_utils.py
log_model_version_dashboard_url(model_version: ModelVersionResponse) -> None
Log the dashboard URL for a model version.
If the current server is not a ZenML Pro workspace, a fallback message is logged instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_version | ModelVersionResponse | The model version for which to log the dashboard URL. | required |
Source code in src/zenml/orchestrators/step_run_utils.py
Modules
step_runner
Class to run steps.
Classes
StepRunner(step: Step, stack: Stack)
Class to run steps.
Initializes the step runner.
Parameters:
Source code in src/zenml/orchestrators/step_runner.py
configuration: StepConfiguration
property
Configuration of the step to run.
Returns:
Type | Description |
---|---|
StepConfiguration | The step configuration. |
load_and_run_hook(hook_source: Source, step_exception: Optional[BaseException]) -> None
Loads hook source and runs the hook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook_source | Source | The source of the hook function. | required |
step_exception | Optional[BaseException] | The exception of the original step. | required |
Source code in src/zenml/orchestrators/step_runner.py
run(pipeline_run: PipelineRunResponse, step_run: StepRunResponse, input_artifacts: Dict[str, StepRunInputResponse], output_artifact_uris: Dict[str, str], step_run_info: StepRunInfo) -> None
Runs the step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run | PipelineRunResponse | The model of the current pipeline run. | required |
step_run | StepRunResponse | The model of the current step run. | required |
input_artifacts | Dict[str, StepRunInputResponse] | The input artifact versions of the step. | required |
output_artifact_uris | Dict[str, str] | The URIs of the output artifacts of the step. | required |
step_run_info | StepRunInfo | The step run info. | required |
Raises:
Type | Description |
---|---|
BaseException | A general exception if the step fails. |
Source code in src/zenml/orchestrators/step_runner.py
Functions
Modules
topsort
Utilities for topological sort.
Implementation heavily inspired by TFX: https://github.com/tensorflow/tfx/blob/master/tfx/utils/topsort.py
Functions
topsorted_layers(nodes: Sequence[NodeT], get_node_id_fn: Callable[[NodeT], str], get_parent_nodes: Callable[[NodeT], List[NodeT]], get_child_nodes: Callable[[NodeT], List[NodeT]]) -> List[List[NodeT]]
Sorts the DAG of nodes in topological order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | Sequence[NodeT] | A sequence of nodes. | required |
get_node_id_fn | Callable[[NodeT], str] | Callable that returns a unique text identifier for a node. | required |
get_parent_nodes | Callable[[NodeT], List[NodeT]] | Callable that returns a list of parent nodes for a node. If a parent node's id is not found in the list of node ids, that parent node will be omitted. | required |
get_child_nodes | Callable[[NodeT], List[NodeT]] | Callable that returns a list of child nodes for a node. If a child node's id is not found in the list of node ids, that child node will be omitted. | required |
Returns:
Type | Description |
---|---|
List[List[NodeT]] | A list of topologically ordered node layers. Each layer of nodes is sorted by its node id given by |
Raises:
Type | Description |
---|---|
RuntimeError | If the input nodes don't form a DAG. |
ValueError | If the nodes are not unique. |
Source code in src/zenml/orchestrators/topsort.py
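A layered topological sort with the same parameter shape can be sketched as follows. This is an independent illustration based on Kahn's algorithm, not the ZenML or TFX code; the function name `layered_topsort` is hypothetical. Parents and children whose ids are not among the input nodes are ignored, as the parameter descriptions above specify.

```python
from typing import Callable, Dict, List, Sequence, Set, TypeVar

NodeT = TypeVar("NodeT")


def layered_topsort(
    nodes: Sequence[NodeT],
    get_node_id_fn: Callable[[NodeT], str],
    get_parent_nodes: Callable[[NodeT], List[NodeT]],
    get_child_nodes: Callable[[NodeT], List[NodeT]],
) -> List[List[NodeT]]:
    """Group nodes into layers so that every parent is in an earlier layer."""
    node_by_id: Dict[str, NodeT] = {get_node_id_fn(n): n for n in nodes}
    if len(node_by_id) != len(nodes):
        raise ValueError("Nodes must have unique ids.")

    # Number of not-yet-emitted parents per node, ignoring unknown parents.
    remaining: Dict[str, int] = {}
    for node_id, node in node_by_id.items():
        parent_ids = {get_node_id_fn(p) for p in get_parent_nodes(node)}
        remaining[node_id] = sum(1 for pid in parent_ids if pid in node_by_id)

    layers: List[List[NodeT]] = []
    current = sorted(i for i, count in remaining.items() if count == 0)
    visited = 0
    while current:
        layers.append([node_by_id[i] for i in current])
        visited += len(current)
        next_ids: Set[str] = set()
        for node_id in current:
            child_ids = {get_node_id_fn(c) for c in get_child_nodes(node_by_id[node_id])}
            for child_id in child_ids:
                if child_id not in remaining:
                    continue  # child is not part of the input: omit it
                remaining[child_id] -= 1
                if remaining[child_id] == 0:
                    next_ids.add(child_id)
        current = sorted(next_ids)  # sort each layer by node id

    if visited != len(nodes):
        raise RuntimeError("The nodes do not form a DAG.")
    return layers


if __name__ == "__main__":
    parents = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
    children = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
    print(layered_topsort(list(parents), str, parents.__getitem__, children.__getitem__))
```

Nodes that share a layer have no dependencies on one another, which is what lets a runner start them in parallel.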
utils
Utility functions for the orchestrator.
Classes
register_artifact_store_filesystem(target_artifact_store_id: Optional[UUID])
Context manager for the artifact_store/filesystem_registry dependency.
Even though it is rare, sometimes we bump into cases where we are trying to load artifacts that belong to an artifact store which is different from the active artifact store.
In cases like this, we will try to instantiate the target artifact store by creating the corresponding artifact store Python object, which ends up registering the right filesystem in the filesystem registry.
The problem is, the keys in the filesystem registry are schemes (such as "s3://" or "gcs://"). If we have two artifact stores with the same set of supported schemes, we might end up overwriting the filesystem that belongs to the active artifact store (and its authentication). That's why we have to re-instantiate the active artifact store again, so the correct filesystem will be restored.
Initialization of the context manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_artifact_store_id | Optional[UUID] | The ID of the artifact store to load. | required |
Source code in src/zenml/orchestrators/utils.py
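The overwrite-and-restore problem described above can be reproduced with a toy registry. The names below (`FILESYSTEM_REGISTRY`, `register_store`, `use_target_store`) are hypothetical and only model the idea of a scheme-keyed registry; they are not ZenML's classes or functions.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List, Optional

# Hypothetical registry: URI scheme -> name of the store whose filesystem
# currently handles that scheme.
FILESYSTEM_REGISTRY: Dict[str, str] = {}


def register_store(store_name: str, schemes: List[str]) -> None:
    """Register a store for its schemes, overwriting earlier owners."""
    for scheme in schemes:
        FILESYSTEM_REGISTRY[scheme] = store_name


@contextmanager
def use_target_store(
    active_store: str,
    target_store: Optional[str],
    schemes_by_store: Dict[str, List[str]],
) -> Iterator[None]:
    """Temporarily register the target store, then restore the active one."""
    if target_store is None or target_store == active_store:
        # Nothing to swap: the active store already owns its schemes.
        yield
        return
    register_store(target_store, schemes_by_store[target_store])
    try:
        yield
    finally:
        # Re-register the active store so that any scheme the target store
        # overwrote points back to the active store's filesystem.
        register_store(active_store, schemes_by_store[active_store])


if __name__ == "__main__":
    schemes = {"active": ["s3://"], "target": ["s3://", "gcs://"]}
    register_store("active", schemes["active"])
    with use_target_store("active", "target", schemes):
        print(FILESYSTEM_REGISTRY["s3://"])
    print(FILESYSTEM_REGISTRY["s3://"])
```

Restoring in a `finally` block means the active store's filesystem is put back even if loading the artifact raises.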
Functions
get_config_environment_vars(schedule_id: Optional[UUID] = None, pipeline_run_id: Optional[UUID] = None, step_run_id: Optional[UUID] = None) -> Dict[str, str]
Gets environment variables to set for mirroring the active config.
If a schedule ID, pipeline run ID or step run ID is given, and the current client is not authenticated to a server with an API key, the environment variables will be updated to include a newly generated workload API token that will be valid for the duration of the schedule, pipeline run, or step run instead of the current API token used to authenticate the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id | Optional[UUID] | Optional schedule ID to use to generate a new API token. | None |
pipeline_run_id | Optional[UUID] | Optional pipeline run ID to use to generate a new API token. | None |
step_run_id | Optional[UUID] | Optional step run ID to use to generate a new API token. | None |
Returns:
Type | Description |
---|---|
Dict[str, str] | Environment variable dict. |
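As a rough illustration of how such a dictionary might be consumed, the sketch below merges a set of variables into the environment of a child process. The `config_env` dict is a stand-in for the function's return value; its key and value are hypothetical, not real ZenML variable names, and `run_with_config_env` is a helper invented for this example.

```python
import os
import subprocess
import sys
from typing import Dict

# Stand-in for the dict returned by get_config_environment_vars(); the key
# and value below are hypothetical placeholders.
config_env: Dict[str, str] = {"EXAMPLE_CONFIG_VAR": "example-value"}


def run_with_config_env(code: str, extra_env: Dict[str, str]) -> str:
    """Run a Python snippet in a child process with extra variables set."""
    # Start from the parent environment and let the extra variables win.
    env = {**os.environ, **extra_env}
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


output = run_with_config_env(
    "import os; print(os.environ['EXAMPLE_CONFIG_VAR'])", config_env
)
```

An orchestrator would typically pass the variables to a container or remote job rather than a local subprocess, but the merge step is the same idea.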
Source code in src/zenml/orchestrators/utils.py
get_orchestrator_run_name(pipeline_name: str, max_length: Optional[int] = None) -> str
Gets an orchestrator run name.
This run name is not the same as the ZenML run name; it is intended for display in the orchestrator UI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the pipeline that will run. | required |
max_length | Optional[int] | Maximum length of the generated name. | None |
Raises:
Type | Description |
---|---|
ValueError | If the max length is below 8 characters. |
Returns:
Type | Description |
---|---|
str | The orchestrator run name. |
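The documented contract (a name derived from the pipeline name, an optional length cap, and a `ValueError` below 8 characters) could be sketched as follows. This is not the library's implementation: `make_run_name`, the 8-character random hex suffix, and the underscore separator are all assumptions made for illustration.

```python
import uuid
from typing import Optional

# Assumed suffix length, chosen to match the documented 8-character minimum.
SUFFIX_LENGTH = 8


def make_run_name(pipeline_name: str, max_length: Optional[int] = None) -> str:
    """Hypothetical sketch: pipeline name plus a random suffix, length-capped."""
    suffix = uuid.uuid4().hex[:SUFFIX_LENGTH]
    if max_length is None:
        return f"{pipeline_name}_{suffix}"
    if max_length < SUFFIX_LENGTH:
        raise ValueError(
            f"max_length must be at least {SUFFIX_LENGTH}, got {max_length}."
        )
    # Characters left for the pipeline name after the separator and suffix.
    prefix_budget = max_length - SUFFIX_LENGTH - 1
    if prefix_budget <= 0:
        # No room for any part of the pipeline name; return the suffix alone.
        return suffix
    return f"{pipeline_name[:prefix_budget]}_{suffix}"


capped = make_run_name("training_pipeline", max_length=20)
uncapped = make_run_name("training_pipeline")
```

Truncating the pipeline name rather than the suffix keeps the random part intact, so two runs of the same pipeline stay distinguishable even under a tight length limit.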
Source code in src/zenml/orchestrators/utils.py
is_setting_enabled(is_enabled_on_step: Optional[bool], is_enabled_on_pipeline: Optional[bool]) -> bool
Checks if a certain setting is enabled within a step run.
This is the case if:
- the setting is explicitly enabled for the step, or
- the setting is neither explicitly disabled for the step nor the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
is_enabled_on_step | Optional[bool] | The setting of the step. | required |
is_enabled_on_pipeline | Optional[bool] | The setting of the pipeline. | required |
Returns:
Type | Description |
---|---|
bool | True if the setting is enabled within the step run, False otherwise. |
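The rule above can be read as a precedence order: an explicit step value wins, then an explicit pipeline value, and an unset setting defaults to enabled. The sketch below is a standalone reading of that rule, not the library's source; `setting_enabled` is a hypothetical name.

```python
from typing import Optional


def setting_enabled(
    is_enabled_on_step: Optional[bool],
    is_enabled_on_pipeline: Optional[bool],
) -> bool:
    """Hypothetical sketch of the step-over-pipeline precedence rule."""
    # An explicit step-level value always takes priority.
    if is_enabled_on_step is not None:
        return is_enabled_on_step
    # Otherwise fall back to an explicit pipeline-level value.
    if is_enabled_on_pipeline is not None:
        return is_enabled_on_pipeline
    # Nothing was set anywhere, so the setting counts as enabled.
    return True


# Evaluate every combination of step and pipeline values.
truth_table = {
    (step, pipeline): setting_enabled(step, pipeline)
    for step in (True, False, None)
    for pipeline in (True, False, None)
}
```

Using `None` for "not set" is what lets a step re-enable a setting that the pipeline disabled, and lets a pipeline-level `False` apply to every step that does not override it.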
Source code in src/zenml/orchestrators/utils.py
wheeled_orchestrator
Wheeled orchestrator class.
Classes
WheeledOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)
Bases: BaseOrchestrator, ABC
Base class for wheeled orchestrators.
Source code in src/zenml/stack/stack_component.py
copy_repository_to_temp_dir_and_add_setup_py() -> str
Copy the repository to a temporary directory and add a setup.py file.
Returns:
Type | Description |
---|---|
str | Path to the temporary directory containing the copied repository. |
Source code in src/zenml/orchestrators/wheeled_orchestrator.py
create_wheel(temp_dir: str) -> str
Create a wheel for the package in the given temporary directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
temp_dir | str | Path to the temporary directory containing the package. | required |
Raises:
Type | Description |
---|---|
RuntimeError | If the wheel file could not be created. |
Returns:
Name | Type | Description |
---|---|---|
str | str | Path to the created wheel file. |
Source code in src/zenml/orchestrators/wheeled_orchestrator.py
sanitize_name(name: str) -> str
Sanitize the value to be used in a cluster name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Arbitrary input cluster name. | required |
Returns:
Type | Description |
---|---|
str | Sanitized cluster name. |
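The exact sanitization rules are not spelled out here. As an illustration only, the sketch below assumes that a valid cluster name consists of lowercase letters, digits, and hyphens and must not start or end with a hyphen. `sanitize_cluster_name` is a hypothetical standalone function, not the method itself.

```python
import re


def sanitize_cluster_name(name: str) -> str:
    """Hypothetical sketch: keep only lowercase letters, digits, hyphens."""
    # Lowercase first, then replace every disallowed character with a hyphen.
    sanitized = re.sub(r"[^a-z0-9-]", "-", name.lower())
    # Drop hyphens left at either end by the replacement.
    return sanitized.strip("-")


example = sanitize_cluster_name("My Pipeline_v2!")
```

Under these assumptions, spaces, underscores, and punctuation all become hyphens, so the result is safe to embed in resource names that follow DNS-label-style rules.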
Source code in src/zenml/orchestrators/wheeled_orchestrator.py