Skip to content

Entrypoints

Initializations for ZenML entrypoints module.

PipelineEntrypointConfiguration

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run an entire pipeline.

Source code in src/zenml/entrypoints/pipeline_entrypoint_configuration.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class PipelineEntrypointConfiguration(BaseEntrypointConfiguration):
    """Entrypoint configuration that executes every step of a pipeline.

    The steps are handed one after another to the orchestrator of the active
    stack, in the iteration order of the deployment's step configurations.
    """

    def run(self) -> None:
        """Load the deployment, set up the environment and run all steps."""
        pipeline_deployment = self.load_deployment()

        # Integrations have to be activated before anything executes so that
        # their materializers and stack component flavors are registered.
        integration_registry.activate_integrations()

        self.download_code_if_necessary(deployment=pipeline_deployment)

        active_stack = Client().active_stack
        stack_orchestrator = active_stack.orchestrator
        stack_orchestrator._prepare_run(deployment=pipeline_deployment)

        step_configurations = pipeline_deployment.step_configurations
        for step_config in step_configurations.values():
            stack_orchestrator.run_step(step_config)

run()

Prepares the environment and runs the configured pipeline.

Source code in src/zenml/entrypoints/pipeline_entrypoint_configuration.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def run(self) -> None:
    """Load the deployment, set up the environment and run all steps.

    Every step configuration of the loaded deployment is passed, in
    iteration order, to the orchestrator of the active stack.
    """
    pipeline_deployment = self.load_deployment()

    # Integrations have to be activated before anything executes so that
    # their materializers and stack component flavors are registered.
    integration_registry.activate_integrations()

    self.download_code_if_necessary(deployment=pipeline_deployment)

    active_stack = Client().active_stack
    stack_orchestrator = active_stack.orchestrator
    stack_orchestrator._prepare_run(deployment=pipeline_deployment)

    step_configurations = pipeline_deployment.step_configurations
    for step_config in step_configurations.values():
        stack_orchestrator.run_step(step_config)

StepEntrypointConfiguration

Bases: BaseEntrypointConfiguration

Base class for entrypoint configurations that run a single step.

If an orchestrator needs to run steps in a separate process or environment (e.g. a docker container), this class can either be used directly or subclassed if custom behavior is necessary.

How to subclass:

Passing additional arguments to the entrypoint: If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement:

* `get_entrypoint_options()`: This method should return all the options that are required in the entrypoint. Make sure to include the result from the superclass method so the options are complete.
* `get_entrypoint_arguments(...)`: This method should return a list of arguments that should be passed to the entrypoint. Make sure to include the result from the superclass method so the arguments are complete.

You'll be able to access the argument values from `self.entrypoint_args` inside your `StepEntrypointConfiguration` subclass.

How to use:

After you have created your `StepEntrypointConfiguration` subclass, you only have to run the entrypoint somewhere. To do this, execute the command returned by the `get_entrypoint_command()` method with the arguments returned by the `get_entrypoint_arguments(...)` method.

Example:

import subprocess


class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    ...

class MyOrchestrator(BaseOrchestrator):
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        ...

        # Build the entrypoint command once, outside of the loop.
        cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
        # The deployment maps each step name to that step's configuration.
        for step_name, step in deployment.step_configurations.items():
            ...

            args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )
            # Run the command and pass it the arguments. Our example
            # orchestrator here executes the entrypoint in a separate
            # process, but in a real-world scenario you would probably run
            # it inside a docker container or a different environment.
            subprocess.check_call(cmd + args)
Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class StepEntrypointConfiguration(BaseEntrypointConfiguration):
    """Entrypoint configuration that executes a single step of a pipeline.

    Orchestrators that execute steps in a separate process or environment
    (e.g. a docker container) can use this class directly, or subclass it
    when custom behavior is required.

    How to subclass:
    ----------------
    To pass additional arguments to the entrypoint, implement both of the
    following methods:

    * `get_entrypoint_options()`: Returns all the options that the
      entrypoint requires. Include the result of the superclass method so
      that the options are complete.
    * `get_entrypoint_arguments(...)`: Returns the list of arguments that
      should be passed to the entrypoint. Include the result of the
      superclass method so that the arguments are complete.

    Inside the subclass, the argument values are available from
    `self.entrypoint_args`.

    How to use:
    -----------
    Once the `StepEntrypointConfiguration` subclass exists, the entrypoint
    only has to be run somewhere: execute the command returned by
    `get_entrypoint_command()` with the arguments returned by
    `get_entrypoint_arguments(...)`.

    Example:
    ```python
    import subprocess

    class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
        ...

    class MyOrchestrator(BaseOrchestrator):
        def prepare_or_run_pipeline(
            self,
            deployment: "PipelineDeployment",
            stack: "Stack",
        ) -> Any:
            ...

            cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
            for step_name, step in deployment.step_configurations.items():
                ...

                args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name
                )
                # Run the command and pass it the arguments. Our example
                # orchestrator here executes the entrypoint in a separate
                # process, but in a real-world scenario you would probably run
                # it inside a docker container or a different environment.
                subprocess.check_call(cmd + args)
    ```
    """

    def post_run(self, pipeline_name: str, step_name: str) -> None:
        """Hook that is called once the step has finished running.

        The base implementation does nothing. Subclasses can override it to
        perform cleanup or post-processing after the step execution.

        Args:
            pipeline_name: Name of the pipeline that the executed step
                belongs to.
            step_name: Name of the step that was executed.
        """

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Collects every option needed to run with this configuration.

        Returns:
            The options of the superclass, extended by the option that
            carries the name of the step to run.
        """
        inherited_options = super().get_entrypoint_options()
        return inherited_options | {STEP_NAME_OPTION}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        """Builds the arguments to call the entrypoint command with.

        The returned list has to be in a form that
        `argparse.ArgumentParser.parse_args(...)` understands (e.g.
        `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
        and has to supply a value for each option returned by
        `get_entrypoint_options()` of this class.

        Args:
            **kwargs: Keyword arguments; the step name must be among them.

        Returns:
            The arguments of the superclass, followed by the arguments that
            specify the name of the step to run.
        """
        # The superclass is consulted first, before the step name is looked
        # up in the keyword arguments.
        inherited_arguments = super().get_entrypoint_arguments(**kwargs)
        step_name_arguments = [
            f"--{STEP_NAME_OPTION}",
            kwargs[STEP_NAME_OPTION],
        ]
        return inherited_arguments + step_name_arguments

    def run(self) -> None:
        """Sets up the environment and executes the configured step.

        After the step has run, `post_run(...)` is called with the name of
        the pipeline and the name of the step.
        """
        loaded_deployment = self.load_deployment()

        # Activating the integrations makes sure that all materializers and
        # stack component flavors are registered.
        integration_registry.activate_integrations()

        target_step_name = self.entrypoint_args[STEP_NAME_OPTION]

        # Some services overwrite the working directory configured in the
        # Docker image, so switch explicitly to the directory in which the
        # files of the image should be included.
        app_directory = "/app"
        os.makedirs(app_directory, exist_ok=True)
        os.chdir(app_directory)

        self.download_code_if_necessary(
            deployment=loaded_deployment, step_name=target_step_name
        )

        # User code is imported from the working directory, so it has to be
        # part of the module search path.
        working_directory = os.getcwd()
        if working_directory not in sys.path:
            sys.path.insert(0, working_directory)

        parent_pipeline_name = loaded_deployment.pipeline_configuration.name

        step_to_run = loaded_deployment.step_configurations[target_step_name]
        self._run_step(step_to_run, deployment=loaded_deployment)

        self.post_run(
            pipeline_name=parent_pipeline_name,
            step_name=target_step_name,
        )

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentResponse",
    ) -> None:
        """Executes one step with the orchestrator of the active stack.

        Args:
            step: The step to run.
            deployment: The deployment configuration that the orchestrator
                is prepared with before the step runs.
        """
        active_stack = Client().active_stack
        stack_orchestrator = active_stack.orchestrator
        stack_orchestrator._prepare_run(deployment=deployment)
        stack_orchestrator.run_step(step=step)

get_entrypoint_arguments(**kwargs) classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step name.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
    """Builds the arguments to call the entrypoint command with.

    The returned list has to be in a form that
    `argparse.ArgumentParser.parse_args(...)` understands (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and has to supply a value for each option returned by
    `get_entrypoint_options()` of this class.

    Args:
        **kwargs: Keyword arguments; the step name must be among them.

    Returns:
        The arguments of the superclass, followed by the arguments that
        specify the name of the step to run.
    """
    # The superclass is consulted first, before the step name is looked up
    # in the keyword arguments.
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    step_name_arguments = [
        f"--{STEP_NAME_OPTION}",
        kwargs[STEP_NAME_OPTION],
    ]
    return inherited_arguments + step_name_arguments

get_entrypoint_options() classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the name of the step to run.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
113
114
115
116
117
118
119
120
121
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Collects every option needed to run with this configuration.

    Returns:
        The options of the superclass, extended by the option that carries
        the name of the step to run.
    """
    inherited_options = super().get_entrypoint_options()
    return inherited_options | {STEP_NAME_OPTION}

post_run(pipeline_name, step_name)

Does cleanup or post-processing after the step finished running.

Subclasses should overwrite this method if they need to run any additional code after the step execution.

Parameters:

Name Type Description Default
pipeline_name str

Name of the parent pipeline of the step that was executed.

required
step_name str

Name of the step that was executed.

required
Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def post_run(self, pipeline_name: str, step_name: str) -> None:
    """Hook that is called once the step has finished running.

    This implementation does nothing. Subclasses can override it to perform
    cleanup or post-processing after the step execution.

    Args:
        pipeline_name: Name of the pipeline that the executed step belongs
            to.
        step_name: Name of the step that was executed.
    """

run()

Prepares the environment and runs the configured step.

Source code in src/zenml/entrypoints/step_entrypoint_configuration.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def run(self) -> None:
    """Sets up the environment and executes the configured step.

    After the step has run, `post_run(...)` is called with the name of the
    pipeline and the name of the step.
    """
    loaded_deployment = self.load_deployment()

    # Activating the integrations makes sure that all materializers and
    # stack component flavors are registered.
    integration_registry.activate_integrations()

    target_step_name = self.entrypoint_args[STEP_NAME_OPTION]

    # Some services overwrite the working directory configured in the Docker
    # image, so switch explicitly to the directory in which the files of the
    # image should be included.
    app_directory = "/app"
    os.makedirs(app_directory, exist_ok=True)
    os.chdir(app_directory)

    self.download_code_if_necessary(
        deployment=loaded_deployment, step_name=target_step_name
    )

    # User code is imported from the working directory, so it has to be part
    # of the module search path.
    working_directory = os.getcwd()
    if working_directory not in sys.path:
        sys.path.insert(0, working_directory)

    parent_pipeline_name = loaded_deployment.pipeline_configuration.name

    step_to_run = loaded_deployment.step_configurations[target_step_name]
    self._run_step(step_to_run, deployment=loaded_deployment)

    self.post_run(
        pipeline_name=parent_pipeline_name,
        step_name=target_step_name,
    )