Skip to content

Entrypoints

zenml.entrypoints

Initializations for ZenML entrypoints module.

Attributes

__all__ = ['StepEntrypointConfiguration', 'PipelineEntrypointConfiguration'] module-attribute

Classes

PipelineEntrypointConfiguration(arguments: List[str])

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run an entire pipeline.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
60
61
62
63
64
65
66
def __init__(self, arguments: List[str]):
    """Builds the entrypoint configuration from raw command line arguments.

    The arguments are handed to `self._parse_arguments(...)` and the result
    is stored on the instance as `entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
Functions
run() -> None

Prepares the environment and runs the configured pipeline.

Source code in src/zenml/entrypoints/pipeline_entrypoint_configuration.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def run(self) -> None:
    """Prepares the environment and runs the configured pipeline.

    Loads the deployment, activates the integrations, downloads the user
    code if necessary and then lets the orchestrator of the active stack
    run every step configuration of the deployment.
    """
    deployment = self.load_deployment()

    # Integrations have to be active so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    self.download_code_if_necessary(deployment=deployment)

    active_orchestrator = Client().active_stack.orchestrator
    active_orchestrator._prepare_run(deployment=deployment)

    for step_config in deployment.step_configurations.values():
        active_orchestrator.run_step(step_config)

StepEntrypointConfiguration(arguments: List[str])

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run a single step.

If an orchestrator needs to run steps in a separate process or environment (e.g. a docker container), this class can either be used directly or subclassed if custom behavior is necessary.

How to subclass:

Passing additional arguments to the entrypoint: If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement:

* `get_entrypoint_options()`: This method should return all the options that are required in the entrypoint. Make sure to include the result from the superclass method so the options are complete.

* `get_entrypoint_arguments(...)`: This method should return a list of arguments that should be passed to the entrypoint. Make sure to include the result from the superclass method so the arguments are complete.

You'll be able to access the argument values from `self.entrypoint_args` inside your `StepEntrypointConfiguration` subclass.
How to use:

After you have created your StepEntrypointConfiguration subclass, you only have to run the entrypoint somewhere. To do this, you should execute the command returned by the get_entrypoint_command() method with the arguments returned by the get_entrypoint_arguments(...) method.

Example:

class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    ...

class MyOrchestrator(BaseOrchestrator):
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
        environment: Dict[str, str],
        placeholder_run: Optional["PipelineRunResponse"] = None,
    ) -> Any:
        ...

        cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
        # The step configurations of the deployment are keyed by step name.
        for step_name, step in deployment.step_configurations.items():
            ...

            # The base implementation of `get_entrypoint_arguments(...)`
            # raises a `ValueError` unless a valid deployment ID is passed
            # in addition to the step name.
            # NOTE(review): assumes the option is named `deployment_id` and
            # that the deployment exposes its UUID as `id` - confirm.
            args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
                deployment_id=deployment.id,
            )
            # Run the command and pass it the arguments. Our example
            # orchestrator here executes the entrypoint in a separate
            # process, but in a real-world scenario you would probably run
            # it inside a docker container or a different environment.
            import subprocess
            subprocess.check_call(cmd + args)
Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
60
61
62
63
64
65
66
def __init__(self, arguments: List[str]):
    """Builds the entrypoint configuration from raw command line arguments.

    The arguments are handed to `self._parse_arguments(...)` and the result
    is stored on the instance as `entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
Functions
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step name.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The returned list has to be consumable by
    `argparse.ArgumentParser.parse_args(...)` (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and must provide a value for every option returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Kwargs, must include the step name.

    Returns:
        The superclass arguments followed by the option and value for the
        name of the step to run.
    """
    # The superclass arguments are computed first, so its validation of the
    # kwargs runs before the step name is looked up.
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    step_arguments = [f"--{STEP_NAME_OPTION}", kwargs[STEP_NAME_OPTION]]
    return inherited_arguments + step_arguments
get_entrypoint_options() -> Set[str] classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
115
116
117
118
119
120
121
122
123
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The options of the superclass extended by the option for the name
        of the step to run.
    """
    inherited_options = super().get_entrypoint_options()
    return inherited_options | {STEP_NAME_OPTION}
post_run(pipeline_name: str, step_name: str) -> None

Does cleanup or post-processing after the step finished running.

Subclasses should override this method if they need to run any additional code after the step execution.

Parameters:

Name Type Description Default
pipeline_name str

Name of the parent pipeline of the step that was executed.

required
step_name str

Name of the step that was executed.

required
Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def post_run(
    self,
    pipeline_name: str,
    step_name: str,
) -> None:
    """Hook for cleanup or post-processing after the step finished running.

    This implementation does nothing. Subclasses should override this
    method if they need to run any additional code after the step
    execution.

    Args:
        pipeline_name: Name of the parent pipeline of the step that was
            executed.
        step_name: Name of the step that was executed.
    """
run() -> None

Prepares the environment and runs the configured step.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def run(self) -> None:
    """Prepares the environment and runs the configured step.

    Loads the deployment, activates the integrations, switches to the
    `/app` directory, downloads the user code if necessary, runs the step
    selected by the entrypoint arguments and finally calls `post_run(...)`.
    """
    deployment = self.load_deployment()

    # Integrations have to be active so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    step_name = self.entrypoint_args[STEP_NAME_OPTION]

    # Some services overwrite the working directory configured in the
    # Docker image itself, so we explicitly switch to the directory where
    # the files in the Docker image should be included.
    app_directory = "/app"
    os.makedirs(app_directory, exist_ok=True)
    os.chdir(app_directory)

    self.download_code_if_necessary(
        deployment=deployment, step_name=step_name
    )

    # Make sure user code gets correctly imported by putting the current
    # working directory on the sys.path if it is missing.
    working_directory = os.getcwd()
    if working_directory not in sys.path:
        sys.path.insert(0, working_directory)

    parent_pipeline_name = deployment.pipeline_configuration.name

    step_config = deployment.step_configurations[step_name]
    self._run_step(step_config, deployment=deployment)

    self.post_run(pipeline_name=parent_pipeline_name, step_name=step_name)

Modules

base_entrypoint_configuration

Abstract base class for entrypoint configurations.

Classes
BaseEntrypointConfiguration(arguments: List[str])

Bases: ABC

Abstract base class for entrypoint configurations.

An entrypoint configuration specifies the arguments that should be passed to the entrypoint and what is running inside the entrypoint.

Attributes:

Name Type Description
entrypoint_args

The parsed arguments passed to the entrypoint.

Initializes the entrypoint configuration.

Parameters:

Name Type Description Default
arguments List[str]

Command line arguments to configure this object.

required
Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
60
61
62
63
64
65
66
def __init__(self, arguments: List[str]):
    """Builds the entrypoint configuration from raw command line arguments.

    The arguments are handed to `self._parse_arguments(...)` and the result
    is stored on the instance as `entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
Functions
download_code_from_code_repository(code_reference: CodeReferenceResponse) -> None

Download code from a code repository.

Parameters:

Name Type Description Default
code_reference CodeReferenceResponse

The reference to the code.

required
Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def download_code_from_code_repository(
    self, code_reference: "CodeReferenceResponse"
) -> None:
    """Download code from a code repository.

    The files of the referenced commit are downloaded into a `code`
    directory below the current working directory. Afterwards the download
    directory is registered as custom source root, put at the front of
    `sys.path` and made the current working directory.

    Args:
        code_reference: The reference to the code.
    """
    logger.info(
        "Downloading code from code repository `%s` (commit `%s`).",
        code_reference.code_repository.name,
        code_reference.commit,
    )

    repository_model = Client().get_code_repository(
        code_reference.code_repository.id
    )
    repository = BaseCodeRepository.from_model(repository_model)

    local_root = os.path.abspath("code")
    target_dir = os.path.join(local_root, code_reference.subdirectory)
    os.makedirs(target_dir)

    repository.download_files(
        commit=code_reference.commit,
        directory=target_dir,
        repo_sub_directory=code_reference.subdirectory,
    )

    # Register the downloaded files as the source root and as the local
    # repository checkout.
    source_utils.set_custom_source_root(target_dir)
    code_repository_utils.set_custom_local_repository(
        root=local_root, commit=code_reference.commit, repo=repository
    )

    # Make the downloaded code importable and the working directory.
    sys.path.insert(0, target_dir)
    os.chdir(target_dir)
download_code_if_necessary(deployment: PipelineDeploymentResponse, step_name: Optional[str] = None) -> None

Downloads user code if necessary.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment for which to download the code.

required
step_name Optional[str]

Name of the step to be run. This will be used to determine whether code download is necessary. If not given, the DockerSettings of the pipeline will be used to make that decision instead.

None

Raises:

Type Description
CustomFlavorImportError

If the artifact store flavor can't be imported.

RuntimeError

If the current environment requires code download but the deployment does not have a reference to any code.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def download_code_if_necessary(
    self,
    deployment: "PipelineDeploymentResponse",
    step_name: Optional[str] = None,
) -> None:
    """Downloads user code if necessary.

    The code is fetched from the artifact store if the deployment has a
    code path, otherwise from the code repository referenced by the
    deployment.

    Args:
        deployment: The deployment for which to download the code.
        step_name: Name of the step to be run. This will be used to
            determine whether code download is necessary. If not given,
            the DockerSettings of the pipeline will be used to make that
            decision instead.

    Raises:
        CustomFlavorImportError: If the artifact store flavor can't be
            imported.
        RuntimeError: If the current environment requires code download
            but the deployment does not have a reference to any code.
    """
    download_required = self._should_download_code(
        deployment=deployment, step_name=step_name
    )
    if not download_required:
        return

    code_path = deployment.code_path
    if code_path:
        # The artifact store is loaded separately instead of from the
        # active stack. This is required in case the stack has custom
        # flavor components (other than the artifact store) for which the
        # flavor implementations will only be available once the download
        # finishes.
        try:
            artifact_store = self._load_active_artifact_store()
        except CustomFlavorImportError as e:
            raise CustomFlavorImportError(
                "Failed to import custom artifact store flavor. The "
                "artifact store flavor is needed to download your code, "
                "but it looks like it might be part of the files "
                "that we're trying to download. If this is the case, you "
                "should disable downloading code from the artifact store "
                "using `DockerSettings(allow_download_from_artifact_store=False)` "
                "or make sure the artifact flavor files are included in "
                "Docker image by using a custom parent image or installing "
                "them as part of a pip dependency."
            ) from e
        code_utils.download_code_from_artifact_store(
            code_path=code_path, artifact_store=artifact_store
        )
    else:
        # The code reference is only looked up when there is no code path.
        code_reference = deployment.code_reference
        if not code_reference:
            raise RuntimeError(
                "Code download required but no code reference or path provided."
            )
        # TODO: This might fail if the code repository had unpushed changes
        # at the time the pipeline run was started.
        self.download_code_from_code_repository(
            code_reference=code_reference
        )

    logger.info("Code download finished.")
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Keyword args.

{}

Returns:

Type Description
List[str]

A list of strings with the arguments.

Raises:

Type Description
ValueError

If no valid deployment ID is passed.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The returned list has to be consumable by
    `argparse.ArgumentParser.parse_args(...)` (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and must provide a value for every option returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Keyword args.

    Returns:
        A list of strings with the arguments.

    Raises:
        ValueError: If no valid deployment ID is passed.
    """
    deployment_id = kwargs.get(DEPLOYMENT_ID_OPTION)

    if uuid_utils.is_valid_uuid(deployment_id):
        return [
            f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
            source_utils.resolve(cls).import_path,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]

    method_name = cls.get_entrypoint_arguments.__name__
    raise ValueError(
        "Missing or invalid deployment ID as argument for entrypoint "
        "configuration. Please make sure to pass a valid UUID to "
        f"`{cls.__name__}.{method_name}"
        f"({DEPLOYMENT_ID_OPTION}=<UUID>)`."
    )
get_entrypoint_command() -> List[str] classmethod

Returns a command that runs the entrypoint module.

This entrypoint module is responsible for running the entrypoint configuration when called. Defaults to running the zenml.entrypoints.entrypoint module.

Note: This command won't work on its own but needs to be called with the arguments returned by the get_entrypoint_arguments(...) method of this class.

Returns:

Type Description
List[str]

A list of strings with the command.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    This entrypoint module is responsible for running the entrypoint
    configuration when called. Defaults to running the
    `zenml.entrypoints.entrypoint` module.

    **Note**: This command won't work on its own but needs to be called with
        the arguments returned by the `get_entrypoint_arguments(...)`
        method of this class.

    Returns:
        A new list of strings with the command.
    """
    # Return a copy so that callers which extend or modify the result in
    # place cannot alter the shared module-level default command.
    return list(DEFAULT_ENTRYPOINT_COMMAND)
get_entrypoint_options() -> Set[str] classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

A set of strings with all required options.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        A set of strings with all required options.
    """
    required_options: Set[str] = {
        # Importable source of the entrypoint configuration class to use
        # inside the entrypoint.
        ENTRYPOINT_CONFIG_SOURCE_OPTION,
        # ID of the pipeline deployment that this entrypoint works with.
        DEPLOYMENT_ID_OPTION,
    }
    return required_options
load_deployment() -> PipelineDeploymentResponse

Loads the deployment.

Returns:

Type Description
PipelineDeploymentResponse

The deployment.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
186
187
188
189
190
191
192
193
def load_deployment(self) -> "PipelineDeploymentResponse":
    """Loads the deployment.

    The deployment ID is read from the parsed entrypoint arguments and the
    deployment is fetched from the ZenML store of the client.

    Returns:
        The deployment.
    """
    raw_deployment_id = self.entrypoint_args[DEPLOYMENT_ID_OPTION]
    parsed_id = UUID(raw_deployment_id)
    zen_store = Client().zen_store
    return zen_store.get_deployment(deployment_id=parsed_id)
run() -> None abstractmethod

Runs the entrypoint configuration.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
344
345
346
@abstractmethod
def run(self) -> None:
    """Runs the entrypoint configuration.

    Abstract method without a default behavior; subclasses implement what
    is executed inside the entrypoint.
    """
Functions
Modules

entrypoint

Functionality to run ZenML steps or pipelines.

Classes
Functions
main() -> None

Runs the entrypoint configuration given by the command line arguments.

Source code in src/zenml/entrypoints/entrypoint.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def main() -> None:
    """Runs the entrypoint configuration given by the command line arguments.

    Only the option holding the source of the entrypoint configuration
    class is parsed here. All remaining command line arguments are passed
    on to the loaded entrypoint configuration class, which is then run.
    """
    _setup_logging()

    # Make sure this entrypoint does not run an entire pipeline when
    # importing user modules. This could happen if the `pipeline.run()` call
    # is not wrapped in a function or an `if __name__ == "__main__":` check.
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True

    # Read the source for the entrypoint configuration class from the command
    # line arguments. The destination attribute is set explicitly so that
    # reading it below does not depend on the name of the option.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
        dest="entrypoint_config_source",
        required=True,
    )
    args, remaining_args = parser.parse_known_args()

    entrypoint_config_class = source_utils.load_and_validate_class(
        args.entrypoint_config_source,
        expected_class=BaseEntrypointConfiguration,
    )
    entrypoint_config = entrypoint_config_class(arguments=remaining_args)

    entrypoint_config.run()
Modules

pipeline_entrypoint_configuration

Abstract base class for entrypoint configurations that run a pipeline.

Classes
PipelineEntrypointConfiguration(arguments: List[str])

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run an entire pipeline.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
60
61
62
63
64
65
66
def __init__(self, arguments: List[str]):
    """Builds the entrypoint configuration from raw command line arguments.

    The arguments are handed to `self._parse_arguments(...)` and the result
    is stored on the instance as `entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
Functions
run() -> None

Prepares the environment and runs the configured pipeline.

Source code in src/zenml/entrypoints/pipeline_entrypoint_configuration.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def run(self) -> None:
    """Prepares the environment and runs the configured pipeline.

    Loads the deployment, activates the integrations, downloads the user
    code if necessary and then lets the orchestrator of the active stack
    run every step configuration of the deployment.
    """
    deployment = self.load_deployment()

    # Integrations have to be active so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    self.download_code_if_necessary(deployment=deployment)

    active_orchestrator = Client().active_stack.orchestrator
    active_orchestrator._prepare_run(deployment=deployment)

    for step_config in deployment.step_configurations.values():
        active_orchestrator.run_step(step_config)

step_entrypoint_configuration

Base class for entrypoint configurations that run a single step.

Classes
StepEntrypointConfiguration(arguments: List[str])

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run a single step.

If an orchestrator needs to run steps in a separate process or environment (e.g. a docker container), this class can either be used directly or subclassed if custom behavior is necessary.

How to subclass:

Passing additional arguments to the entrypoint: If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement:

* `get_entrypoint_options()`: This method should return all the options that are required in the entrypoint. Make sure to include the result from the superclass method so the options are complete.

* `get_entrypoint_arguments(...)`: This method should return a list of arguments that should be passed to the entrypoint. Make sure to include the result from the superclass method so the arguments are complete.

You'll be able to access the argument values from `self.entrypoint_args` inside your `StepEntrypointConfiguration` subclass.
How to use:

After you have created your StepEntrypointConfiguration subclass, you only have to run the entrypoint somewhere. To do this, you should execute the command returned by the get_entrypoint_command() method with the arguments returned by the get_entrypoint_arguments(...) method.

Example:

class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    ...

class MyOrchestrator(BaseOrchestrator):
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
        environment: Dict[str, str],
        placeholder_run: Optional["PipelineRunResponse"] = None,
    ) -> Any:
        ...

        cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
        # The step configurations of the deployment are keyed by step name.
        for step_name, step in deployment.step_configurations.items():
            ...

            # The base implementation of `get_entrypoint_arguments(...)`
            # raises a `ValueError` unless a valid deployment ID is passed
            # in addition to the step name.
            # NOTE(review): assumes the option is named `deployment_id` and
            # that the deployment exposes its UUID as `id` - confirm.
            args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
                deployment_id=deployment.id,
            )
            # Run the command and pass it the arguments. Our example
            # orchestrator here executes the entrypoint in a separate
            # process, but in a real-world scenario you would probably run
            # it inside a docker container or a different environment.
            import subprocess
            subprocess.check_call(cmd + args)
Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
60
61
62
63
64
65
66
def __init__(self, arguments: List[str]):
    """Builds the entrypoint configuration from raw command line arguments.

    The arguments are handed to `self._parse_arguments(...)` and the result
    is stored on the instance as `entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
Functions
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step name.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The returned list has to be consumable by
    `argparse.ArgumentParser.parse_args(...)` (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and must provide a value for every option returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Kwargs, must include the step name.

    Returns:
        The superclass arguments followed by the option and value for the
        name of the step to run.
    """
    # The superclass arguments are computed first, so its validation of the
    # kwargs runs before the step name is looked up.
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    step_arguments = [f"--{STEP_NAME_OPTION}", kwargs[STEP_NAME_OPTION]]
    return inherited_arguments + step_arguments
get_entrypoint_options() -> Set[str] classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
115
116
117
118
119
120
121
122
123
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the name of the
        step to run.
    """
    return super().get_entrypoint_options() | {STEP_NAME_OPTION}
post_run(pipeline_name: str, step_name: str) -> None

Does cleanup or post-processing after the step finished running.

Subclasses should override this method if they need to run any additional code after the step execution.

Parameters:

Name Type Description Default
pipeline_name str

Name of the parent pipeline of the step that was executed.

required
step_name str

Name of the step that was executed.

required
Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def post_run(
    self,
    pipeline_name: str,
    step_name: str,
) -> None:
    """Hook for cleanup or post-processing after the step finished running.

    This implementation does nothing. Subclasses should override this
    method if they need to run any additional code after the step
    execution.

    Args:
        pipeline_name: Name of the parent pipeline of the step that was
            executed.
        step_name: Name of the step that was executed.
    """
run() -> None

Prepares the environment and runs the configured step.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def run(self) -> None:
    """Prepares the environment and runs the configured step.

    Loads the deployment, activates the integrations, switches to the
    `/app` directory, downloads the user code if necessary, runs the step
    selected by the entrypoint arguments and finally calls `post_run(...)`.
    """
    deployment = self.load_deployment()

    # Integrations have to be active so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    step_name = self.entrypoint_args[STEP_NAME_OPTION]

    # Some services overwrite the working directory configured in the
    # Docker image itself, so we explicitly switch to the directory where
    # the files in the Docker image should be included.
    app_directory = "/app"
    os.makedirs(app_directory, exist_ok=True)
    os.chdir(app_directory)

    self.download_code_if_necessary(
        deployment=deployment, step_name=step_name
    )

    # Make sure user code gets correctly imported by putting the current
    # working directory on the sys.path if it is missing.
    working_directory = os.getcwd()
    if working_directory not in sys.path:
        sys.path.insert(0, working_directory)

    parent_pipeline_name = deployment.pipeline_configuration.name

    step_config = deployment.step_configurations[step_name]
    self._run_step(step_config, deployment=deployment)

    self.post_run(pipeline_name=parent_pipeline_name, step_name=step_name)
Functions