Skip to content

Entrypoints

zenml.entrypoints special

Initializations for ZenML entrypoints module.

base_entrypoint_configuration

Abstract base class for entrypoint configurations.

BaseEntrypointConfiguration (ABC)

Abstract base class for entrypoint configurations.

An entrypoint configuration specifies the arguments that should be passed to the entrypoint and what is running inside the entrypoint.

Attributes:

Name Type Description
entrypoint_args

The parsed arguments passed to the entrypoint.

```

Source code in zenml/entrypoints/base_entrypoint_configuration.py
class BaseEntrypointConfiguration(ABC):
    """Abstract base class for entrypoint configurations.

    An entrypoint configuration specifies the arguments that should be passed
    to the entrypoint and what is running inside the entrypoint.

    Attributes:
        entrypoint_args: The parsed arguments passed to the entrypoint.
    ```
    """

    def __init__(self, arguments: List[str]):
        """Initializes the entrypoint configuration.

        Args:
            arguments: Command line arguments to configure this object.
        """
        self.entrypoint_args = self._parse_arguments(arguments)

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        This entrypoint module is responsible for running the entrypoint
        configuration when called. Defaults to running the
        `zenml.entrypoints.entrypoint` module.

        **Note**: This command won't work on its own but needs to be called with
            the arguments returned by the `get_entrypoint_arguments(...)`
            method of this class.

        Returns:
            A list of strings with the command.
        """
        return DEFAULT_ENTRYPOINT_COMMAND

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            A set of strings with all required options.
        """
        return {
            # Importable source pointing to the entrypoint configuration class
            # that should be used inside the entrypoint.
            ENTRYPOINT_CONFIG_SOURCE_OPTION,
        }

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        The argument list should be something that
        `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
        `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
        It needs to provide values for all options returned by the
        `get_entrypoint_options()` method of this class.

        Args:
            **kwargs: Keyword args.

        Returns:
            A list of strings with the arguments.
        """
        arguments = [
            f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
            source_utils.resolve_class(cls),
        ]

        return arguments

    def get_run_name(self, pipeline_name: str) -> Optional[str]:
        """Returns an optional run name.

        Examples:
        * If you're in an orchestrator environment which has an equivalent
            concept to a pipeline run, you can use that as the run name. E.g.
            Kubeflow Pipelines sets a run id as environment variable in all
            pods which we can reuse: `return os.environ["KFP_RUN_ID"]`
        * If that isn't possible, you could pass a unique value as an argument
            to the entrypoint (make sure to also return it from
            `get_entrypoint_options()`) and use it like this:
            `return self.entrypoint_args["run_name_option"]`

        Args:
            pipeline_name: The name of the pipeline.

        Returns:
            The run name.
        """
        return None

    @classmethod
    def _parse_arguments(cls, arguments: List[str]) -> Dict[str, Any]:
        """Parses command line arguments.

        This method will create an `argparse.ArgumentParser` and add required
        arguments for all the options specified in the
        `get_entrypoint_options()` method of this class. Subclasses that
        require additional arguments during entrypoint execution should provide
        them by implementing the `get_custom_entrypoint_options()` class method.

        Args:
            arguments: Arguments to parse. The format should be something that
                `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
                `["--some_option", "some_value"]` or
                `["--some_option=some_value"]`).

        Returns:
            Dictionary of the parsed arguments.

        # noqa: DAR402
        Raises:
            ValueError: If the arguments are not valid.
        """
        # Argument parser subclass that suppresses some argparse logs and
        # raises an exception instead of the `sys.exit()` call
        class _CustomParser(argparse.ArgumentParser):
            def error(self, message: str) -> NoReturn:
                raise ValueError(
                    f"Failed to parse entrypoint arguments: {message}"
                )

        parser = _CustomParser()

        for option_name in cls.get_entrypoint_options():
            if option_name == ENTRYPOINT_CONFIG_SOURCE_OPTION:
                # This option is already used by
                # `zenml.entrypoints.step_entrypoint` to read which config
                # class to use
                continue
            parser.add_argument(f"--{option_name}", required=True)

        result, _ = parser.parse_known_args(arguments)
        return vars(result)

    def load_deployment_config(self) -> "PipelineDeployment":
        """Loads the deployment config.

        If a subclass implements the `get_run_name` method and returns a
        value from it, this method will also update the run name of the
        deployment config.

        Returns:
            The (updated) deployment config.
        """
        config_dict = yaml_utils.read_yaml(DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE)
        deployment_config = PipelineDeployment.parse_obj(config_dict)

        custom_run_name = self.get_run_name(
            pipeline_name=deployment_config.pipeline.name
        )
        if custom_run_name:
            deployment_config = deployment_config.copy(
                update={"run_name": custom_run_name}
            )

        return deployment_config

    @abstractmethod
    def run(self) -> None:
        """Runs the entrypoint configuration."""
__init__(self, arguments) special

Initializes the entrypoint configuration.

Parameters:

Name Type Description Default
arguments List[str]

Command line arguments to configure this object.

required
Source code in zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Initializes the entrypoint configuration.

    Args:
        arguments: Command line arguments to configure this object.
    """
    self.entrypoint_args = self._parse_arguments(arguments)
get_entrypoint_arguments(**kwargs) classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Keyword args.

{}

Returns:

Type Description
List[str]

A list of strings with the arguments.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The argument list should be something that
    `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
    It needs to provide values for all options returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Keyword args.

    Returns:
        A list of strings with the arguments.
    """
    arguments = [
        f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
        source_utils.resolve_class(cls),
    ]

    return arguments
get_entrypoint_command() classmethod

Returns a command that runs the entrypoint module.

This entrypoint module is responsible for running the entrypoint configuration when called. Defaults to running the zenml.entrypoints.entrypoint module.

Note: This command won't work on its own but needs to be called with the arguments returned by the get_entrypoint_arguments(...) method of this class.

Returns:

Type Description
List[str]

A list of strings with the command.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    This entrypoint module is responsible for running the entrypoint
    configuration when called. Defaults to running the
    `zenml.entrypoints.entrypoint` module.

    **Note**: This command won't work on its own but needs to be called with
        the arguments returned by the `get_entrypoint_arguments(...)`
        method of this class.

    Returns:
        A list of strings with the command.
    """
    return DEFAULT_ENTRYPOINT_COMMAND
get_entrypoint_options() classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

A set of strings with all required options.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        A set of strings with all required options.
    """
    return {
        # Importable source pointing to the entrypoint configuration class
        # that should be used inside the entrypoint.
        ENTRYPOINT_CONFIG_SOURCE_OPTION,
    }
get_run_name(self, pipeline_name)

Returns an optional run name.

Examples:

  • If you're in an orchestrator environment which has an equivalent concept to a pipeline run, you can use that as the run name. E.g. Kubeflow Pipelines sets a run id as environment variable in all pods which we can reuse: return os.environ["KFP_RUN_ID"]
  • If that isn't possible, you could pass a unique value as an argument to the entrypoint (make sure to also return it from get_entrypoint_options()) and use it like this: return self.entrypoint_args["run_name_option"]

Parameters:

Name Type Description Default
pipeline_name str

The name of the pipeline.

required

Returns:

Type Description
Optional[str]

The run name.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
def get_run_name(self, pipeline_name: str) -> Optional[str]:
    """Returns an optional run name.

    Examples:
    * If you're in an orchestrator environment which has an equivalent
        concept to a pipeline run, you can use that as the run name. E.g.
        Kubeflow Pipelines sets a run id as environment variable in all
        pods which we can reuse: `return os.environ["KFP_RUN_ID"]`
    * If that isn't possible, you could pass a unique value as an argument
        to the entrypoint (make sure to also return it from
        `get_entrypoint_options()`) and use it like this:
        `return self.entrypoint_args["run_name_option"]`

    Args:
        pipeline_name: The name of the pipeline.

    Returns:
        The run name.
    """
    return None
load_deployment_config(self)

Loads the deployment config.

If a subclass implements the get_run_name method and returns a value from it, this method will also update the run name of the deployment config.

Returns:

Type Description
PipelineDeployment

The (updated) deployment config.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
def load_deployment_config(self) -> "PipelineDeployment":
    """Loads the deployment config.

    If a subclass implements the `get_run_name` method and returns a
    value from it, this method will also update the run name of the
    deployment config.

    Returns:
        The (updated) deployment config.
    """
    config_dict = yaml_utils.read_yaml(DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE)
    deployment_config = PipelineDeployment.parse_obj(config_dict)

    custom_run_name = self.get_run_name(
        pipeline_name=deployment_config.pipeline.name
    )
    if custom_run_name:
        deployment_config = deployment_config.copy(
            update={"run_name": custom_run_name}
        )

    return deployment_config
run(self)

Runs the entrypoint configuration.

Source code in zenml/entrypoints/base_entrypoint_configuration.py
@abstractmethod
def run(self) -> None:
    """Runs the entrypoint configuration."""

entrypoint

Functionality to run ZenML steps or pipelines.

main()

Runs the entrypoint configuration given by the command line arguments.

Source code in zenml/entrypoints/entrypoint.py
def main() -> None:
    """Runs the entrypoint configuration given by the command line arguments."""
    _setup_logging()

    # Make sure this entrypoint does not run an entire pipeline when
    # importing user modules. This could happen if the `pipeline.run()` call
    # is not wrapped in a function or an `if __name__== "__main__":` check)
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True

    # Read the source for the entrypoint configuration class from the command
    # line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}", required=True)
    args, remaining_args = parser.parse_known_args()

    entrypoint_config_class = source_utils.load_and_validate_class(
        args.entrypoint_config_source,
        expected_class=BaseEntrypointConfiguration,
    )
    entrypoint_config = entrypoint_config_class(arguments=remaining_args)

    entrypoint_config.run()

pipeline_entrypoint_configuration

Abstract base class for entrypoint configurations that run a pipeline.

PipelineEntrypointConfiguration (BaseEntrypointConfiguration)

Base class for entrypoint configurations that run an entire pipeline.

Source code in zenml/entrypoints/pipeline_entrypoint_configuration.py
class PipelineEntrypointConfiguration(BaseEntrypointConfiguration):
    """Base class for entrypoint configurations that run an entire pipeline."""

    def run(self) -> None:
        """Prepares the environment and runs the configured pipeline."""
        deployment_config = self.load_deployment_config()

        # Activate all the integrations. This makes sure that all materializers
        # and stack component flavors are registered.
        integration_registry.activate_integrations()

        orchestrator = Client().active_stack.orchestrator
        orchestrator._prepare_run(deployment=deployment_config)

        for step in deployment_config.steps.values():
            entrypoint_utils.load_and_configure_step(step)
            orchestrator.run_step(step)
run(self)

Prepares the environment and runs the configured pipeline.

Source code in zenml/entrypoints/pipeline_entrypoint_configuration.py
def run(self) -> None:
    """Prepares the environment and runs the configured pipeline."""
    deployment_config = self.load_deployment_config()

    # Activate all the integrations. This makes sure that all materializers
    # and stack component flavors are registered.
    integration_registry.activate_integrations()

    orchestrator = Client().active_stack.orchestrator
    orchestrator._prepare_run(deployment=deployment_config)

    for step in deployment_config.steps.values():
        entrypoint_utils.load_and_configure_step(step)
        orchestrator.run_step(step)

step_entrypoint_configuration

Base class for entrypoint configurations that run a single step.

StepEntrypointConfiguration (BaseEntrypointConfiguration)

Base class for entrypoint configurations that run a single step.

If an orchestrator needs to run steps in a separate process or environment (e.g. a docker container), this class can either be used directly or subclassed if custom behavior like setting a run name for scheduled runs is necessary.

How to subclass:

There is only one mandatory method get_run_name(...) that you need to implement in order to get a functioning entrypoint. Inside this method you need to return a string which has to be the same for all steps that are executed as part of the same pipeline run.

Passing additional arguments to the entrypoint: If you need to pass additional arguments to the entrypoint, there are two methods that you need to implement: * get_entrypoint_options(): This method should return all the options that are required in the entrypoint. Make sure to include the result from the superclass method so the options are complete.

    * `get_entrypoint_arguments(...)`: This method should return
        a list of arguments that should be passed to the entrypoint.
        Make sure to include the result from the superclass method so
        the arguments are complete.

You'll be able to access the argument values from `self.entrypoint_args`
inside your `StepEntrypointConfiguration` subclass.
How to use:

After you created your StepEntrypointConfiguration subclass, you only have to run the entrypoint somewhere. To do this, you should execute the command returned by the get_entrypoint_command() method with the arguments returned by the get_entrypoint_arguments(...) method.

Examples:

class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    ...

class MyOrchestrator(BaseOrchestrator):
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        ...

        cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
        for step_name, step in pipeline.steps.items():
            ...

            args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )
            # Run the command and pass it the arguments. Our example
            # orchestrator here executes the entrypoint in a separate
            # process, but in a real-world scenario you would probably run
            # it inside a docker container or a different environment.
            import subprocess
            subprocess.check_call(cmd + args)
Source code in zenml/entrypoints/step_entrypoint_configuration.py
class StepEntrypointConfiguration(BaseEntrypointConfiguration):
    """Base class for entrypoint configurations that run a single step.

    If an orchestrator needs to run steps in a separate process or environment
    (e.g. a docker container), this class can either be used directly or
    subclassed if custom behavior like setting a run name for scheduled runs
    is necessary.

    How to subclass:
    ----------------
    There is only one mandatory method `get_run_name(...)` that you need to
    implement in order to get a functioning entrypoint. Inside this method you
    need to return a string which **has** to be the same for all steps that are
    executed as part of the same pipeline run.

    Passing additional arguments to the entrypoint:
        If you need to pass additional arguments to the entrypoint, there are
        two methods that you need to implement:
            * `get_entrypoint_options()`: This method should return all
                the options that are required in the entrypoint. Make sure to
                include the result from the superclass method so the options
                are complete.

            * `get_entrypoint_arguments(...)`: This method should return
                a list of arguments that should be passed to the entrypoint.
                Make sure to include the result from the superclass method so
                the arguments are complete.

        You'll be able to access the argument values from `self.entrypoint_args`
        inside your `StepEntrypointConfiguration` subclass.

    How to use:
    -----------
    After you created your `StepEntrypointConfiguration` subclass, you only
    have to run the entrypoint somewhere. To do this, you should execute the
    command returned by the `get_entrypoint_command()` method with the
    arguments returned by the `get_entrypoint_arguments(...)` method.

    Example:
    ```python
    class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
        ...

    class MyOrchestrator(BaseOrchestrator):
        def prepare_or_run_pipeline(
            self,
            deployment: "PipelineDeployment",
            stack: "Stack",
        ) -> Any:
            ...

            cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
            for step_name, step in pipeline.steps.items():
                ...

                args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name
                )
                # Run the command and pass it the arguments. Our example
                # orchestrator here executes the entrypoint in a separate
                # process, but in a real-world scenario you would probably run
                # it inside a docker container or a different environment.
                import subprocess
                subprocess.check_call(cmd + args)
    ```
    """

    def post_run(
        self,
        pipeline_name: str,
        step_name: str,
        execution_info: Optional[data_types.ExecutionInfo] = None,
    ) -> None:
        """Does cleanup or post-processing after the step finished running.

        Subclasses should overwrite this method if they need to run any
        additional code after the step execution.

        Args:
            pipeline_name: Name of the parent pipeline of the step that was
                executed.
            step_name: Name of the step that was executed.
            execution_info: Info about the finished step execution.
        """

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            The superclass options as well as an option for the name of the
            step to run.
        """
        return super().get_entrypoint_options() | {STEP_NAME_OPTION}

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        The argument list should be something that
        `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
        `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
        It needs to provide values for all options returned by the
        `get_entrypoint_options()` method of this class.

        Args:
            **kwargs: Kwargs, must include the step name.

        Returns:
            The superclass arguments as well as arguments for the name of the
            step to run.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{STEP_NAME_OPTION}",
            kwargs[STEP_NAME_OPTION],
        ]

    def run(self) -> None:
        """Prepares the environment and runs the configured step."""
        deployment_config = self.load_deployment_config()

        step_name = self.entrypoint_args[STEP_NAME_OPTION]
        pipeline_name = deployment_config.pipeline.name

        # Activate all the integrations. This makes sure that all materializers
        # and stack component flavors are registered.
        integration_registry.activate_integrations()

        step = deployment_config.steps[step_name]
        entrypoint_utils.load_and_configure_step(step)
        execution_info = self._run_step(step, deployment=deployment_config)

        self.post_run(
            pipeline_name=pipeline_name,
            step_name=step_name,
            execution_info=execution_info,
        )

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeployment",
    ) -> Optional[data_types.ExecutionInfo]:
        """Runs a single step.

        Args:
            step: The step to run.
            deployment: The deployment configuration.

        Returns:
            Optional execution info of the run.
        """
        orchestrator = Client().active_stack.orchestrator
        orchestrator._prepare_run(deployment=deployment)
        return orchestrator.run_step(step=step)
get_entrypoint_arguments(**kwargs) classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step name.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the name of the step to run.

Source code in zenml/entrypoints/step_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The argument list should be something that
    `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
    It needs to provide values for all options returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Kwargs, must include the step name.

    Returns:
        The superclass arguments as well as arguments for the name of the
        step to run.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{STEP_NAME_OPTION}",
        kwargs[STEP_NAME_OPTION],
    ]
get_entrypoint_options() classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the name of the step to run.

Source code in zenml/entrypoints/step_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the name of the
        step to run.
    """
    return super().get_entrypoint_options() | {STEP_NAME_OPTION}
post_run(self, pipeline_name, step_name, execution_info=None)

Does cleanup or post-processing after the step finished running.

Subclasses should overwrite this method if they need to run any additional code after the step execution.

Parameters:

Name Type Description Default
pipeline_name str

Name of the parent pipeline of the step that was executed.

required
step_name str

Name of the step that was executed.

required
execution_info Optional[tfx.orchestration.portable.data_types.ExecutionInfo]

Info about the finished step execution.

None
Source code in zenml/entrypoints/step_entrypoint_configuration.py
def post_run(
    self,
    pipeline_name: str,
    step_name: str,
    execution_info: Optional[data_types.ExecutionInfo] = None,
) -> None:
    """Does cleanup or post-processing after the step finished running.

    Subclasses should overwrite this method if they need to run any
    additional code after the step execution.

    Args:
        pipeline_name: Name of the parent pipeline of the step that was
            executed.
        step_name: Name of the step that was executed.
        execution_info: Info about the finished step execution.
    """
run(self)

Prepares the environment and runs the configured step.

Source code in zenml/entrypoints/step_entrypoint_configuration.py
def run(self) -> None:
    """Prepares the environment and runs the configured step."""
    deployment_config = self.load_deployment_config()

    step_name = self.entrypoint_args[STEP_NAME_OPTION]
    pipeline_name = deployment_config.pipeline.name

    # Activate all the integrations. This makes sure that all materializers
    # and stack component flavors are registered.
    integration_registry.activate_integrations()

    step = deployment_config.steps[step_name]
    entrypoint_utils.load_and_configure_step(step)
    execution_info = self._run_step(step, deployment=deployment_config)

    self.post_run(
        pipeline_name=pipeline_name,
        step_name=step_name,
        execution_info=execution_info,
    )

utils

Utility functions for ZenML entrypoints.

load_and_configure_step(step)

Loads and configures a step.

Additionally, this function creates the executor class necessary for this step to be executed.

Parameters:

Name Type Description Default
step Step

The representation of the step to be loaded.

required

Returns:

Type Description
BaseStep

The configured step instance.

Source code in zenml/entrypoints/utils.py
def load_and_configure_step(step: "Step") -> "BaseStep":
    """Loads and configures a step.

    Additionally, this function creates the executor class necessary for
    this step to be executed.

    Args:
        step: The representation of the step to be loaded.

    Returns:
        The configured step instance.
    """
    step_class: Type[BaseStep] = source_utils.load_and_validate_class(
        step.spec.source, expected_class=BaseStep
    )

    step_instance = step_class()
    step_instance._configuration = step.config
    step_utils.create_executor_class(step=step_instance)

    return step_instance