Entrypoints
zenml.entrypoints
special
Initializations for ZenML entrypoints module.
base_entrypoint_configuration
Abstract base class for entrypoint configurations.
BaseEntrypointConfiguration (ABC)
Abstract base class for entrypoint configurations.
An entrypoint configuration specifies the arguments that should be passed to the entrypoint and what is running inside the entrypoint.
Attributes:
Name | Type | Description |
---|---|---|
entrypoint_args | Dict[str, Any] | The parsed arguments passed to the entrypoint. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
class BaseEntrypointConfiguration(ABC):
    """Abstract base class for entrypoint configurations.

    An entrypoint configuration specifies the arguments that should be passed
    to the entrypoint and what is running inside the entrypoint.

    Attributes:
        entrypoint_args: The parsed arguments passed to the entrypoint.
    """

    def __init__(self, arguments: List[str]):
        """Initializes the entrypoint configuration.

        Args:
            arguments: Command line arguments to configure this object.
        """
        self.entrypoint_args = self._parse_arguments(arguments)

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        This entrypoint module is responsible for running the entrypoint
        configuration when called. Defaults to running the
        `zenml.entrypoints.entrypoint` module.

        **Note**: This command won't work on its own but needs to be called with
            the arguments returned by the `get_entrypoint_arguments(...)`
            method of this class.

        Returns:
            A new list of strings with the command.
        """
        # Return a copy: the default command is a shared module-level object,
        # and a caller extending the command in place (e.g. `cmd += args`)
        # must not be able to modify it for everyone else.
        return list(DEFAULT_ENTRYPOINT_COMMAND)

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            A set of strings with all required options.
        """
        return {
            # Importable source pointing to the entrypoint configuration class
            # that should be used inside the entrypoint.
            ENTRYPOINT_CONFIG_SOURCE_OPTION,
            # ID of the pipeline deployment to use in this entrypoint
            DEPLOYMENT_ID_OPTION,
        }

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        The argument list should be something that
        `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
        `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
        It needs to provide values for all options returned by the
        `get_entrypoint_options()` method of this class.

        Args:
            **kwargs: Keyword args. The value stored under the key
                `DEPLOYMENT_ID_OPTION` must be a valid UUID.

        Returns:
            A list of strings with the arguments.

        Raises:
            ValueError: If no valid deployment ID is passed.
        """
        deployment_id = kwargs.get(DEPLOYMENT_ID_OPTION)
        if not uuid_utils.is_valid_uuid(deployment_id):
            raise ValueError(
                f"Missing or invalid deployment ID as argument for entrypoint "
                f"configuration. Please make sure to pass a valid UUID to "
                f"`{cls.__name__}.{cls.get_entrypoint_arguments.__name__}"
                f"({DEPLOYMENT_ID_OPTION}=<UUID>)`."
            )

        arguments = [
            f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
            source_utils.resolve(cls).import_path,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]

        return arguments

    @classmethod
    def _parse_arguments(cls, arguments: List[str]) -> Dict[str, Any]:
        """Parses command line arguments.

        This method will create an `argparse.ArgumentParser` and add required
        arguments for all the options specified in the
        `get_entrypoint_options()` method of this class.

        Args:
            arguments: Arguments to parse. The format should be something that
                `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
                `["--some_option", "some_value"]` or
                `["--some_option=some_value"]`).

        Returns:
            Dictionary of the parsed arguments.

        # noqa: DAR402
        Raises:
            ValueError: If the arguments are not valid.
        """

        class _CustomParser(argparse.ArgumentParser):
            """Argument parser subclass that suppresses some argparse logs.

            Also raises an exception instead of the `sys.exit()` call.
            """

            def error(self, message: str) -> NoReturn:
                """Raises a `ValueError` carrying the argparse error message.

                Args:
                    message: The error message produced by argparse.

                Raises:
                    ValueError: Always.
                """
                raise ValueError(
                    f"Failed to parse entrypoint arguments: {message}"
                )

        parser = _CustomParser()

        for option_name in cls.get_entrypoint_options():
            if option_name == ENTRYPOINT_CONFIG_SOURCE_OPTION:
                # This option is already used by
                # `zenml.entrypoints.entrypoint` to read which config
                # class to use
                continue
            parser.add_argument(f"--{option_name}", required=True)

        # Arguments that match no registered option are ignored: the list of
        # leftovers returned by `parse_known_args` is discarded.
        result, _ = parser.parse_known_args(arguments)
        return vars(result)

    def load_deployment(self) -> "PipelineDeploymentResponseModel":
        """Loads the deployment.

        The deployment ID is read from the parsed entrypoint arguments.

        Returns:
            The deployment.
        """
        deployment_id = UUID(self.entrypoint_args[DEPLOYMENT_ID_OPTION])
        return Client().zen_store.get_deployment(deployment_id=deployment_id)

    def download_code_if_necessary(
        self, deployment: "PipelineDeploymentResponseModel"
    ) -> None:
        """Downloads user code if necessary.

        Nothing happens unless
        `handle_bool_env_var(ENV_ZENML_REQUIRES_CODE_DOWNLOAD)` is truthy.
        Otherwise the code is downloaded below a `code` directory in the
        current working directory, registered as custom source root and
        local repository, and prepended to `sys.path`.

        Args:
            deployment: The deployment for which to download the code.

        Raises:
            RuntimeError: If the current environment requires code download
                but the deployment does not have an associated code reference.
        """
        requires_code_download = handle_bool_env_var(
            ENV_ZENML_REQUIRES_CODE_DOWNLOAD
        )

        if not requires_code_download:
            return

        code_reference = deployment.code_reference
        if not code_reference:
            raise RuntimeError(
                "Code download required but no code reference provided."
            )

        logger.info(
            "Downloading code from code repository `%s` (commit `%s`).",
            code_reference.code_repository.name,
            code_reference.commit,
        )

        model = Client().get_code_repository(code_reference.code_repository.id)
        repo = BaseCodeRepository.from_model(model)

        code_repo_root = os.path.abspath("code")
        download_dir = os.path.join(
            code_repo_root, code_reference.subdirectory
        )
        # Tolerate an already existing directory so that running the
        # entrypoint again from the same working directory does not fail
        # with a `FileExistsError`.
        os.makedirs(download_dir, exist_ok=True)
        repo.download_files(
            commit=code_reference.commit,
            directory=download_dir,
            repo_sub_directory=code_reference.subdirectory,
        )
        source_utils.set_custom_source_root(download_dir)
        code_repository_utils.set_custom_local_repository(
            root=code_repo_root, commit=code_reference.commit, repo=repo
        )
        # Add downloaded file directory to python path
        sys.path.insert(0, download_dir)

        logger.info("Code download finished.")

    @abstractmethod
    def run(self) -> None:
        """Runs the entrypoint configuration."""
__init__(self, arguments)
special
Initializes the entrypoint configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arguments | List[str] | Command line arguments to configure this object. | required |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Creates the configuration from raw command line arguments.

    The arguments are parsed once and the result is stored in
    `self.entrypoint_args`.

    Args:
        arguments: Command line arguments to configure this object.
    """
    parsed_arguments = self._parse_arguments(arguments)
    self.entrypoint_args = parsed_arguments
download_code_if_necessary(self, deployment)
Downloads user code if necessary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeploymentResponseModel | The deployment for which to download the code. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the current environment requires code download but the deployment does not have an associated code reference. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
def download_code_if_necessary(
    self, deployment: "PipelineDeploymentResponseModel"
) -> None:
    """Downloads user code if necessary.

    Nothing happens unless
    `handle_bool_env_var(ENV_ZENML_REQUIRES_CODE_DOWNLOAD)` is truthy.
    Otherwise the code is downloaded below a `code` directory in the
    current working directory, registered as custom source root and local
    repository, and prepended to `sys.path`.

    Args:
        deployment: The deployment for which to download the code.

    Raises:
        RuntimeError: If the current environment requires code download
            but the deployment does not have an associated code reference.
    """
    requires_code_download = handle_bool_env_var(
        ENV_ZENML_REQUIRES_CODE_DOWNLOAD
    )

    if not requires_code_download:
        return

    code_reference = deployment.code_reference
    if not code_reference:
        raise RuntimeError(
            "Code download required but no code reference provided."
        )

    logger.info(
        "Downloading code from code repository `%s` (commit `%s`).",
        code_reference.code_repository.name,
        code_reference.commit,
    )

    model = Client().get_code_repository(code_reference.code_repository.id)
    repo = BaseCodeRepository.from_model(model)

    code_repo_root = os.path.abspath("code")
    download_dir = os.path.join(
        code_repo_root, code_reference.subdirectory
    )
    # Tolerate an already existing directory so that running the entrypoint
    # again from the same working directory does not fail with a
    # `FileExistsError`.
    os.makedirs(download_dir, exist_ok=True)
    repo.download_files(
        commit=code_reference.commit,
        directory=download_dir,
        repo_sub_directory=code_reference.subdirectory,
    )
    source_utils.set_custom_source_root(download_dir)
    code_repository_utils.set_custom_local_repository(
        root=code_repo_root, commit=code_reference.commit, repo=repo
    )
    # Add downloaded file directory to python path
    sys.path.insert(0, download_dir)

    logger.info("Code download finished.")
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
The argument list should be something that
`argparse.ArgumentParser.parse_args(...)` can handle (e.g.
`["--some_option", "some_value"]` or `["--some_option=some_value"]`).
It needs to provide values for all options returned by the
`get_entrypoint_options()` method of this class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs | Any | Keyword args. | {} |
Returns:
Type | Description |
---|---|
List[str] | A list of strings with the arguments. |
Exceptions:
Type | Description |
---|---|
ValueError | If no valid deployment ID is passed. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Builds the argument list for the entrypoint command.

    The result is in a format that
    `argparse.ArgumentParser.parse_args(...)` understands (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and has to supply a value for every option returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Keyword args. The value stored under the key
            `DEPLOYMENT_ID_OPTION` must be a valid UUID.

    Returns:
        A list of strings with the arguments.

    Raises:
        ValueError: If no valid deployment ID is passed.
    """
    deployment_id = kwargs.get(DEPLOYMENT_ID_OPTION)

    if uuid_utils.is_valid_uuid(deployment_id):
        # Config source first, then the deployment ID, each as an
        # `--option value` pair.
        return [
            f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}",
            source_utils.resolve(cls).import_path,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
        ]

    raise ValueError(
        f"Missing or invalid deployment ID as argument for entrypoint "
        f"configuration. Please make sure to pass a valid UUID to "
        f"`{cls.__name__}.{cls.get_entrypoint_arguments.__name__}"
        f"({DEPLOYMENT_ID_OPTION}=<UUID>)`."
    )
get_entrypoint_command()
classmethod
Returns a command that runs the entrypoint module.
This entrypoint module is responsible for running the entrypoint
configuration when called. Defaults to running the
`zenml.entrypoints.entrypoint` module.
Note: This command won't work on its own but needs to be called with
the arguments returned by the `get_entrypoint_arguments(...)`
method of this class.
Returns:
Type | Description |
---|---|
List[str] | A list of strings with the command. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    This entrypoint module is responsible for running the entrypoint
    configuration when called. Defaults to running the
    `zenml.entrypoints.entrypoint` module.

    **Note**: This command won't work on its own but needs to be called with
        the arguments returned by the `get_entrypoint_arguments(...)`
        method of this class.

    Returns:
        A new list of strings with the command.
    """
    # Return a copy: the default command is a shared module-level object,
    # and a caller extending the command in place (e.g. `cmd += args`) must
    # not be able to modify it for everyone else.
    return list(DEFAULT_ENTRYPOINT_COMMAND)
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
Returns:
Type | Description |
---|---|
Set[str] | A set of strings with all required options. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Lists the options that this configuration needs in order to run.

    Returns:
        A set of strings with all required options.
    """
    # ENTRYPOINT_CONFIG_SOURCE_OPTION: importable source pointing to the
    #   entrypoint configuration class that should be used inside the
    #   entrypoint.
    # DEPLOYMENT_ID_OPTION: ID of the pipeline deployment to use in this
    #   entrypoint.
    required_options = {ENTRYPOINT_CONFIG_SOURCE_OPTION, DEPLOYMENT_ID_OPTION}
    return required_options
load_deployment(self)
Loads the deployment.
Returns:
Type | Description |
---|---|
PipelineDeploymentResponseModel | The deployment. |
Source code in zenml/entrypoints/base_entrypoint_configuration.py
def load_deployment(self) -> "PipelineDeploymentResponseModel":
    """Fetches the deployment referenced by the entrypoint arguments.

    Returns:
        The deployment.
    """
    raw_deployment_id = self.entrypoint_args[DEPLOYMENT_ID_OPTION]
    # Convert the ID before touching the client so that the order of
    # operations stays: validate the ID first, then query the store.
    deployment_uuid = UUID(raw_deployment_id)
    store = Client().zen_store
    return store.get_deployment(deployment_id=deployment_uuid)
run(self)
Runs the entrypoint configuration.
Source code in zenml/entrypoints/base_entrypoint_configuration.py
@abstractmethod
def run(self) -> None:
    """Executes whatever this entrypoint configuration is meant to run.

    Declared abstract: concrete subclasses provide the implementation.
    """
entrypoint
Functionality to run ZenML steps or pipelines.
main()
Runs the entrypoint configuration given by the command line arguments.
Source code in zenml/entrypoints/entrypoint.py
def main() -> None:
    """Loads the entrypoint configuration named on the command line and runs it."""
    _setup_logging()

    # Importing user modules must never start an entire pipeline run. That
    # could otherwise happen when a `pipeline.run()` call is not wrapped in a
    # function or an `if __name__ == "__main__":` check.
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True

    # Only the option holding the source of the entrypoint configuration
    # class is consumed here; all remaining arguments are handed to that
    # class untouched.
    source_parser = argparse.ArgumentParser()
    source_parser.add_argument(
        f"--{ENTRYPOINT_CONFIG_SOURCE_OPTION}", required=True
    )
    known_args, passthrough_args = source_parser.parse_known_args()

    config_class = source_utils.load_and_validate_class(
        known_args.entrypoint_config_source,
        expected_class=BaseEntrypointConfiguration,
    )
    config_class(arguments=passthrough_args).run()
pipeline_entrypoint_configuration
Abstract base class for entrypoint configurations that run a pipeline.
PipelineEntrypointConfiguration (BaseEntrypointConfiguration)
Base class for entrypoint configurations that run an entire pipeline.
Source code in zenml/entrypoints/pipeline_entrypoint_configuration.py
class PipelineEntrypointConfiguration(BaseEntrypointConfiguration):
    """Entrypoint configuration base class that executes a whole pipeline."""

    def run(self) -> None:
        """Sets up the environment, then runs every step of the deployment."""
        pipeline_deployment = self.load_deployment()

        # Integrations must be activated so that all materializers and stack
        # component flavors are registered.
        integration_registry.activate_integrations()

        self.download_code_if_necessary(deployment=pipeline_deployment)

        active_orchestrator = Client().active_stack.orchestrator
        active_orchestrator._prepare_run(deployment=pipeline_deployment)

        # Steps run one after another, in the iteration order of the
        # deployment's step configurations.
        step_configurations = pipeline_deployment.step_configurations
        for step_configuration in step_configurations.values():
            active_orchestrator.run_step(step_configuration)
run(self)
Prepares the environment and runs the configured pipeline.
Source code in zenml/entrypoints/pipeline_entrypoint_configuration.py
def run(self) -> None:
    """Sets up the environment, then runs every step of the deployment."""
    pipeline_deployment = self.load_deployment()

    # Integrations must be activated so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    self.download_code_if_necessary(deployment=pipeline_deployment)

    active_orchestrator = Client().active_stack.orchestrator
    active_orchestrator._prepare_run(deployment=pipeline_deployment)

    # Steps run one after another, in the iteration order of the
    # deployment's step configurations.
    step_configurations = pipeline_deployment.step_configurations
    for step_configuration in step_configurations.values():
        active_orchestrator.run_step(step_configuration)
step_entrypoint_configuration
Base class for entrypoint configurations that run a single step.
StepEntrypointConfiguration (BaseEntrypointConfiguration)
Base class for entrypoint configurations that run a single step.
If an orchestrator needs to run steps in a separate process or environment (e.g. a docker container), this class can either be used directly or subclassed if custom behavior is necessary.
How to subclass:
Passing additional arguments to the entrypoint:
If you need to pass additional arguments to the entrypoint, there are
two methods that you need to implement:
* `get_entrypoint_options()`: This method should return all
the options that are required in the entrypoint. Make sure to
include the result from the superclass method so the options
are complete.
* `get_entrypoint_arguments(...)`: This method should return
a list of arguments that should be passed to the entrypoint.
Make sure to include the result from the superclass method so
the arguments are complete.
You'll be able to access the argument values from `self.entrypoint_args`
inside your `StepEntrypointConfiguration` subclass.
How to use:
After you have created your `StepEntrypointConfiguration` subclass, you only
have to run the entrypoint somewhere. To do this, you should execute the
command returned by the `get_entrypoint_command()` method with the
arguments returned by the `get_entrypoint_arguments(...)` method.
Examples:
class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
    ...


class MyOrchestrator(BaseOrchestrator):
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        ...

        cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
        for step_name, step in deployment.step_configurations.items():
            ...

            # The keyword names must match `STEP_NAME_OPTION` and
            # `DEPLOYMENT_ID_OPTION`. The deployment ID is mandatory: a
            # `ValueError` is raised if no valid UUID is passed for it.
            # NOTE: assumes `deployment.id` holds the deployment UUID.
            args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            )
            # Run the command and pass it the arguments. Our example
            # orchestrator here executes the entrypoint in a separate
            # process, but in a real-world scenario you would probably run
            # it inside a docker container or a different environment.
            import subprocess

            subprocess.check_call(cmd + args)
Source code in zenml/entrypoints/step_entrypoint_configuration.py
class StepEntrypointConfiguration(BaseEntrypointConfiguration):
    """Base class for entrypoint configurations that run a single step.

    If an orchestrator needs to run steps in a separate process or environment
    (e.g. a docker container), this class can either be used directly or
    subclassed if custom behavior is necessary.

    How to subclass:
    ----------------
    Passing additional arguments to the entrypoint:
        If you need to pass additional arguments to the entrypoint, there are
        two methods that you need to implement:
            * `get_entrypoint_options()`: This method should return all
                the options that are required in the entrypoint. Make sure to
                include the result from the superclass method so the options
                are complete.

            * `get_entrypoint_arguments(...)`: This method should return
                a list of arguments that should be passed to the entrypoint.
                Make sure to include the result from the superclass method so
                the arguments are complete.

        You'll be able to access the argument values from `self.entrypoint_args`
        inside your `StepEntrypointConfiguration` subclass.

    How to use:
    -----------
    After you created your `StepEntrypointConfiguration` subclass, you only
    have to run the entrypoint somewhere. To do this, you should execute the
    command returned by the `get_entrypoint_command()` method with the
    arguments returned by the `get_entrypoint_arguments(...)` method.

    Example:
    ```python
    class MyStepEntrypointConfiguration(StepEntrypointConfiguration):
        ...

    class MyOrchestrator(BaseOrchestrator):
        def prepare_or_run_pipeline(
            self,
            deployment: "PipelineDeployment",
            stack: "Stack",
        ) -> Any:
            ...

            cmd = MyStepEntrypointConfiguration.get_entrypoint_command()
            for step_name, step in deployment.step_configurations.items():
                ...

                # The keyword names must match `STEP_NAME_OPTION` and
                # `DEPLOYMENT_ID_OPTION`. The deployment ID is mandatory: a
                # `ValueError` is raised if no valid UUID is passed for it.
                # NOTE: assumes `deployment.id` holds the deployment UUID.
                args = MyStepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name, deployment_id=deployment.id
                )
                # Run the command and pass it the arguments. Our example
                # orchestrator here executes the entrypoint in a separate
                # process, but in a real-world scenario you would probably run
                # it inside a docker container or a different environment.
                import subprocess

                subprocess.check_call(cmd + args)
    ```
    """

    def post_run(
        self,
        pipeline_name: str,
        step_name: str,
    ) -> None:
        """Does cleanup or post-processing after the step finished running.

        Subclasses should overwrite this method if they need to run any
        additional code after the step execution.

        Args:
            pipeline_name: Name of the parent pipeline of the step that was
                executed.
            step_name: Name of the step that was executed.
        """

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            The superclass options as well as an option for the name of the
            step to run.
        """
        return super().get_entrypoint_options() | {STEP_NAME_OPTION}

    @classmethod
    def get_entrypoint_arguments(
        cls,
        **kwargs: Any,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        The argument list should be something that
        `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
        `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
        It needs to provide values for all options returned by the
        `get_entrypoint_options()` method of this class.

        Args:
            **kwargs: Kwargs, must include the step name (under the key
                `STEP_NAME_OPTION`) in addition to everything the superclass
                requires.

        Returns:
            The superclass arguments as well as arguments for the name of the
            step to run.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{STEP_NAME_OPTION}",
            kwargs[STEP_NAME_OPTION],
        ]

    def run(self) -> None:
        """Prepares the environment and runs the configured step."""
        deployment = self.load_deployment()

        # Activate all the integrations. This makes sure that all materializers
        # and stack component flavors are registered.
        integration_registry.activate_integrations()

        self.download_code_if_necessary(deployment=deployment)

        step_name = self.entrypoint_args[STEP_NAME_OPTION]
        pipeline_name = deployment.pipeline_configuration.name

        step = deployment.step_configurations[step_name]
        self._run_step(step, deployment=deployment)

        # Only reached when the step ran without raising.
        self.post_run(
            pipeline_name=pipeline_name,
            step_name=step_name,
        )

    def _run_step(
        self,
        step: "Step",
        deployment: "PipelineDeploymentResponseModel",
    ) -> None:
        """Runs a single step.

        Args:
            step: The step to run.
            deployment: The deployment configuration.
        """
        orchestrator = Client().active_stack.orchestrator
        orchestrator._prepare_run(deployment=deployment)
        orchestrator.run_step(step=step)
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
The argument list should be something that
`argparse.ArgumentParser.parse_args(...)` can handle (e.g.
`["--some_option", "some_value"]` or `["--some_option=some_value"]`).
It needs to provide values for all options returned by the
`get_entrypoint_options()` method of this class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs | Any | Kwargs, must include the step name. | {} |
Returns:
Type | Description |
---|---|
List[str] | The superclass arguments as well as arguments for the name of the step to run. |
Source code in zenml/entrypoints/step_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Builds the argument list for the entrypoint command of a step.

    The result is in a format that
    `argparse.ArgumentParser.parse_args(...)` understands (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`)
    and has to supply a value for every option returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Kwargs, must include the step name.

    Returns:
        The superclass arguments as well as arguments for the name of the
        step to run.
    """
    # The superclass arguments are built first, so its validation runs
    # before the step name is looked up.
    inherited_arguments = super().get_entrypoint_arguments(**kwargs)
    step_arguments = [f"--{STEP_NAME_OPTION}", kwargs[STEP_NAME_OPTION]]
    return inherited_arguments + step_arguments
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
Returns:
Type | Description |
---|---|
Set[str] | The superclass options as well as an option for the name of the step to run. |
Source code in zenml/entrypoints/step_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Lists the options that this configuration needs in order to run.

    Returns:
        The superclass options as well as an option for the name of the
        step to run.
    """
    inherited_options = super().get_entrypoint_options()
    step_options = {STEP_NAME_OPTION}
    return inherited_options | step_options
post_run(self, pipeline_name, step_name)
Does cleanup or post-processing after the step finished running.
Subclasses should overwrite this method if they need to run any additional code after the step execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the parent pipeline of the step that was executed. | required |
step_name | str | Name of the step that was executed. | required |
Source code in zenml/entrypoints/step_entrypoint_configuration.py
def post_run(
    self,
    pipeline_name: str,
    step_name: str,
) -> None:
    """Hook for cleanup or post-processing once the step has finished.

    The default implementation does nothing. Subclasses should overwrite
    this method if they need to run any additional code after the step
    execution.

    Args:
        pipeline_name: Name of the parent pipeline of the step that was
            executed.
        step_name: Name of the step that was executed.
    """
run(self)
Prepares the environment and runs the configured step.
Source code in zenml/entrypoints/step_entrypoint_configuration.py
def run(self) -> None:
    """Sets up the environment, runs the configured step and post-processes."""
    pipeline_deployment = self.load_deployment()

    # Integrations must be activated so that all materializers and stack
    # component flavors are registered.
    integration_registry.activate_integrations()

    self.download_code_if_necessary(deployment=pipeline_deployment)

    name_of_step = self.entrypoint_args[STEP_NAME_OPTION]
    name_of_pipeline = pipeline_deployment.pipeline_configuration.name

    self._run_step(
        pipeline_deployment.step_configurations[name_of_step],
        deployment=pipeline_deployment,
    )

    # Only reached when the step ran without raising.
    self.post_run(pipeline_name=name_of_pipeline, step_name=name_of_step)