Pipelines
zenml.pipelines
special
build_utils
Pipeline build utilities.
build_required(deployment)
Checks whether a build is required for the deployment and active stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment for which to check. |
required |
Returns:
Type | Description |
---|---|
bool |
If a build is required. |
Source code in zenml/pipelines/build_utils.py
def build_required(deployment: "PipelineDeploymentBase") -> bool:
    """Tell whether the active stack needs a Docker build for a deployment.

    Args:
        deployment: The deployment to inspect.

    Returns:
        True if the active stack reports at least one Docker build for the
        deployment, False otherwise.
    """
    active_stack = Client().active_stack
    docker_builds = active_stack.get_docker_builds(deployment=deployment)
    return True if docker_builds else False
code_download_possible(deployment, code_repository=None)
Checks whether code download is possible for the deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment. |
required |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository can be used to download the code inside the container images. |
None |
Returns:
Type | Description |
---|---|
bool |
Whether code download is possible for the deployment. |
Source code in zenml/pipelines/build_utils.py
def code_download_possible(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Decide whether every step of the deployment can download its code.

    A step can download code if its Docker settings allow downloading from
    the artifact store, or if they allow downloading from a code repository
    and a code repository was actually passed in.

    Args:
        deployment: The deployment whose steps are inspected.
        code_repository: Code repository that could be used to download the
            code inside the container images, if any.

    Returns:
        True if all steps can download code (vacuously True for a deployment
        without steps), False as soon as one step cannot.
    """
    return all(
        settings.allow_download_from_artifact_store
        or (settings.allow_download_from_code_repository and code_repository)
        for settings in (
            step.config.docker_settings
            for step in deployment.step_configurations.values()
        )
    )
compute_build_checksum(items, stack, code_repository=None)
Compute an overall checksum for a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items |
List[BuildConfiguration] |
Items of the build. |
required |
stack |
Stack |
The stack associated with the build. Will be used to gather its requirements. |
required |
code_repository |
Optional[BaseCodeRepository] |
The code repository that will be used to download files inside the build. Will be used for its dependency specification. |
None |
Returns:
Type | Description |
---|---|
str |
The build checksum. |
Source code in zenml/pipelines/build_utils.py
def compute_build_checksum(
    items: List["BuildConfiguration"],
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Fold the image keys and settings checksums of a build into one hash.

    Args:
        items: Build configurations that make up the build.
        stack: The stack associated with the build; forwarded to each item
            when computing its settings checksum.
        code_repository: The code repository that will be used to download
            files inside the build; forwarded to each item as well.

    Returns:
        The hexadecimal MD5 digest over all items, in the order given.
    """
    digest = hashlib.md5()  # nosec

    for build_item in items:
        image_key = PipelineBuildBase.get_image_key(
            component_key=build_item.key, step=build_item.step_name
        )
        item_checksum = build_item.compute_settings_checksum(
            stack=stack,
            code_repository=code_repository,
        )
        # Feed the key first, then the settings checksum of the same item.
        for part in (image_key, item_checksum):
            digest.update(part.encode())

    return digest.hexdigest()
compute_stack_checksum(stack)
Compute a stack checksum.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
StackResponse |
The stack for which to compute the checksum. |
required |
Returns:
Type | Description |
---|---|
str |
The checksum. |
Source code in zenml/pipelines/build_utils.py
def compute_stack_checksum(stack: StackResponse) -> str:
    """Hash the set of integrations that the components of a stack need.

    Args:
        stack: The stack for which to compute the checksum.

    Returns:
        The hexadecimal MD5 digest over the sorted, de-duplicated integration
        names of all stack components.
    """
    # This checksum is used to see if the stack has been updated since a build
    # was created for it. We create this checksum not with specific requirements
    # as these might change with new ZenML releases, but they don't actually
    # invalidate those Docker images.
    integration_names = set()
    for component_list in stack.components.values():
        for component in component_list:
            name = component.integration
            # Components without an integration, or with the "built-in" one,
            # do not contribute to the checksum.
            if name and name != "built-in":
                integration_names.add(name)

    digest = hashlib.md5()  # nosec
    for name in sorted(integration_names):
        digest.update(name.encode())
    return digest.hexdigest()
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)
Builds images and registers the output in the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment. |
required |
pipeline_id |
Optional[uuid.UUID] |
The ID of the pipeline. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository will be used to download code inside the build images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build output. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If multiple builds with the same key but different settings were specified. |
Source code in zenml/pipelines/build_utils.py
def create_pipeline_build(
    deployment: "PipelineDeploymentBase",
    pipeline_id: Optional[UUID] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds images and registers the output in the server.

    Args:
        deployment: The pipeline deployment.
        pipeline_id: The ID of the pipeline.
        code_repository: If provided, this code repository will be used to
            download code inside the build images.

    Returns:
        The build output, or None if the active stack requires no Docker
        builds for the deployment.

    Raises:
        RuntimeError: If multiple builds with the same key but different
            settings were specified.
    """
    client = Client()
    stack_model = client.active_stack_model
    stack = client.active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)

    if not required_builds:
        logger.debug("No docker builds required.")
        return None

    logger.info(
        "Building Docker image(s) for pipeline `%s`.",
        deployment.pipeline_configuration.name,
    )

    docker_image_builder = PipelineDockerImageBuilder()
    images: Dict[str, BuildItem] = {}
    # Maps a settings checksum to the key of an image built with it, so that
    # identical configurations are only built once.
    checksums: Dict[str, str] = {}

    for build_config in required_builds:
        combined_key = PipelineBuildBase.get_image_key(
            component_key=build_config.key, step=build_config.step_name
        )
        checksum = build_config.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )

        if combined_key in images:
            previous_checksum = images[combined_key].settings_checksum

            if previous_checksum != checksum:
                raise RuntimeError(
                    f"Trying to build image for key `{combined_key}` but "
                    "an image for this key was already built with a "
                    "different configuration. This happens if multiple "
                    "stack components specified Docker builds for the same "
                    "key in the `StackComponent.get_docker_builds(...)` "
                    "method. If you're using custom components, make sure "
                    "to provide unique keys when returning your build "
                    "configurations to avoid this error."
                )
            else:
                # Same key and same settings: nothing new to build.
                continue

        if checksum in checksums:
            # An image with identical settings already exists under another
            # key. Reuse every property of that item, including whether it
            # requires code download, instead of relying on variables left
            # over from an unrelated earlier iteration.
            reused_item = images[checksums[checksum]]
            image_name_or_digest = reused_item.image
            contains_code = reused_item.contains_code
            requires_code_download = reused_item.requires_code_download
            dockerfile = reused_item.dockerfile
            requirements = reused_item.requirements
        else:
            tag = deployment.pipeline_configuration.name
            if build_config.step_name:
                tag += f"-{build_config.step_name}"
            tag += f"-{build_config.key}"

            include_files = build_config.should_include_files(
                code_repository=code_repository,
            )
            download_files = build_config.should_download_files(
                code_repository=code_repository,
            )
            pass_code_repo = (
                build_config.should_download_files_from_code_repository(
                    code_repository=code_repository
                )
            )

            (
                image_name_or_digest,
                dockerfile,
                requirements,
            ) = docker_image_builder.build_docker_image(
                docker_settings=build_config.settings,
                tag=tag,
                stack=stack,
                include_files=include_files,
                download_files=download_files,
                entrypoint=build_config.entrypoint,
                extra_files=build_config.extra_files,
                code_repository=code_repository if pass_code_repo else None,
            )
            contains_code = include_files
            requires_code_download = download_files

        images[combined_key] = BuildItem(
            image=image_name_or_digest,
            dockerfile=dockerfile,
            requirements=requirements,
            settings_checksum=checksum,
            contains_code=contains_code,
            requires_code_download=requires_code_download,
        )
        checksums[checksum] = combined_key

    logger.info("Finished building Docker image(s).")

    # Without a container registry the images only exist locally.
    is_local = stack.container_registry is None
    contains_code = any(item.contains_code for item in images.values())
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )
    stack_checksum = compute_stack_checksum(stack=stack_model)
    build_request = PipelineBuildRequest(
        user=client.active_user.id,
        workspace=client.active_workspace.id,
        stack=stack_model.id,
        pipeline=pipeline_id,
        is_local=is_local,
        contains_code=contains_code,
        images=images,
        zenml_version=zenml.__version__,
        python_version=platform.python_version(),
        checksum=build_checksum,
        stack_checksum=stack_checksum,
    )
    return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository=None)
Find an existing build for a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment for which to find an existing build. |
required |
code_repository |
Optional[BaseCodeRepository] |
The code repository that will be used to download files in the images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The existing build to reuse if found. |
Source code in zenml/pipelines/build_utils.py
def find_existing_build(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Look up a previously registered build that the deployment can reuse.

    Args:
        deployment: The deployment for which to find an existing build.
        code_repository: The code repository that will be used to download
            files in the images.

    Returns:
        The most recently created matching build, or None if the active
        stack has no container registry, no builds are required, or no
        registered build matches.
    """
    client = Client()
    active_stack = client.active_stack

    if not active_stack.container_registry:
        # There can be no non-local builds that we can reuse if there is no
        # container registry in the stack.
        return None

    needed_builds = active_stack.get_docker_builds(deployment=deployment)
    if not needed_builds:
        return None

    major_minor = ".".join(platform.python_version_tuple()[:2])
    expected_checksum = compute_build_checksum(
        needed_builds, stack=active_stack, code_repository=code_repository
    )

    candidates = client.list_builds(
        sort_by="desc:created",
        size=1,
        stack_id=active_stack.id,
        # Until we implement stack versioning, users can still update their
        # stack to update/remove the container registry. In that case, we might
        # try to pull an image from a container registry that we don't have
        # access to. This is why we add an additional check for the container
        # registry ID here. (This is still not perfect as users can update the
        # container registry URI or config, but the best we can do)
        container_registry_id=active_stack.container_registry.id,
        # The build is local and it's not clear whether the images
        # exist on the current machine or if they've been overwritten.
        # TODO: Should we support this by storing the unique Docker ID for
        # the image and checking if an image with that ID exists locally?
        is_local=False,
        # The build contains some code which might be different from the
        # local code the user is expecting to run
        contains_code=False,
        zenml_version=zenml.__version__,
        # Match all patch versions of the same Python major + minor
        python_version=f"startswith:{major_minor}",
        checksum=expected_checksum,
    )

    return candidates[0] if candidates.items else None
requires_download_from_code_repository(deployment)
Checks whether the deployment needs to download code from a repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment. |
required |
Returns:
Type | Description |
---|---|
bool |
If the deployment needs to download code from a code repository. |
Source code in zenml/pipelines/build_utils.py
def requires_download_from_code_repository(
    deployment: "PipelineDeploymentBase",
) -> bool:
    """Tell whether the deployment can only get its code from a repository.

    The steps are inspected in order and the first step that enables any of
    the three code-provisioning options decides the result.

    Args:
        deployment: The deployment.

    Returns:
        True if the deciding step only allows download from a code
        repository, False otherwise (including when no step enables any
        option).
    """
    for step_configuration in deployment.step_configurations.values():
        settings = step_configuration.config.docker_settings

        if (
            settings.allow_download_from_artifact_store
            or settings.allow_including_files_in_images
        ):
            return False

        if settings.allow_download_from_code_repository:
            # The other two options are false, which means download from a
            # code repo is required.
            return True

    return False
requires_included_code(deployment, code_repository=None)
Checks whether the deployment requires included code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment. |
required |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository can be used to download the code inside the container images. |
None |
Returns:
Type | Description |
---|---|
bool |
If the deployment requires code included in the container images. |
Source code in zenml/pipelines/build_utils.py
def requires_included_code(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Tell whether the deployment needs code baked into its images.

    Args:
        deployment: The deployment.
        code_repository: Code repository that could be used to download the
            code inside the container images, if any.

    Returns:
        True if a step has to rely on files included in the image, False
        otherwise.
    """
    for step_configuration in deployment.step_configurations.values():
        settings = step_configuration.config.docker_settings

        if settings.allow_download_from_artifact_store:
            return False

        # A usable code repository covers this step; look at the next one.
        if settings.allow_download_from_code_repository and code_repository:
            continue

        if settings.allow_including_files_in_images:
            return True

    return False
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)
Loads or creates a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment for which to load or create the build. |
required |
allow_build_reuse |
bool |
If True, the build is allowed to reuse an existing build. |
required |
pipeline_id |
Optional[uuid.UUID] |
Optional ID of the pipeline to reference in the build. |
None |
build |
Union[UUID, PipelineBuildBase] |
Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository can be used to download code inside the container images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build response. |
Source code in zenml/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
    deployment: "PipelineDeploymentBase",
    allow_build_reuse: bool,
    pipeline_id: Optional[UUID] = None,
    build: Union["UUID", "PipelineBuildBase", None] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Resolve the build to use for a deployment.

    Args:
        deployment: The pipeline deployment for which to load or create the
            build.
        allow_build_reuse: If True, the build is allowed to reuse an
            existing build.
        pipeline_id: Optional ID of the pipeline to reference in the build.
        build: Optional existing build. If given, the build will be fetched
            (or registered) in the database. If not given, a new build will
            be created.
        code_repository: If provided, this code repository can be used to
            download code inside the container images.

    Returns:
        The build response.
    """
    if build:
        # A custom build was specified: fetch it by ID or register it, then
        # verify that it can be used for this deployment.
        if isinstance(build, UUID):
            custom_build = Client().zen_store.get_build(build_id=build)
        else:
            request = PipelineBuildRequest(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=Client().active_stack_model.id,
                pipeline=pipeline_id,
                **build.model_dump(),
            )
            custom_build = Client().zen_store.create_build(build=request)

        verify_custom_build(
            build=custom_build,
            deployment=deployment,
            code_repository=code_repository,
        )
        return custom_build

    may_reuse = (
        allow_build_reuse
        and not deployment.should_prevent_build_reuse
        and not requires_included_code(
            deployment=deployment, code_repository=code_repository
        )
        and build_required(deployment=deployment)
    )
    if may_reuse:
        reusable_build = find_existing_build(
            deployment=deployment, code_repository=code_repository
        )
        if reusable_build:
            logger.info(
                "Reusing existing build `%s` for stack `%s`.",
                reusable_build.id,
                Client().active_stack.name,
            )
            return reusable_build

        logger.info(
            "Unable to find a build to reuse. A previous build can be "
            "reused when the following conditions are met:\n"
            "  * The existing build was created for the same stack, "
            "ZenML version and Python version\n"
            "  * The stack contains a container registry\n"
            "  * The Docker settings of the pipeline and all its steps "
            "are the same as for the existing build."
        )

    return create_pipeline_build(
        deployment=deployment,
        pipeline_id=pipeline_id,
        code_repository=code_repository,
    )
should_upload_code(deployment, build, code_reference)
Checks whether the current code should be uploaded for the deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment. |
required |
build |
Optional[zenml.models.v2.core.pipeline_build.PipelineBuildResponse] |
The build for the deployment. |
required |
code_reference |
Optional[zenml.models.v2.core.code_reference.CodeReferenceRequest] |
The code reference for the deployment. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the current code should be uploaded for the deployment. |
Source code in zenml/pipelines/build_utils.py
def should_upload_code(
    deployment: PipelineDeploymentBase,
    build: Optional[PipelineBuildResponse],
    code_reference: Optional[CodeReferenceRequest],
) -> bool:
    """Decide whether the current code has to be uploaded for the deployment.

    Args:
        deployment: The deployment.
        build: The build for the deployment.
        code_reference: The code reference for the deployment.

    Returns:
        True if a step that is not already served by the code reference
        allows downloading code from the artifact store, False otherwise.
    """
    if not build:
        # No build means we don't need to download code into a Docker container
        # for step execution. In other remote orchestrators that don't use
        # Docker containers but instead use e.g. Wheels to run, the code should
        # already be included.
        return False

    all_settings = (
        step.config.docker_settings
        for step in deployment.step_configurations.values()
    )
    return any(
        settings.allow_download_from_artifact_store
        for settings in all_settings
        # Steps that can use the code reference need no upload.
        if not (
            code_reference and settings.allow_download_from_code_repository
        )
    )
verify_custom_build(build, deployment, code_repository=None)
Verify a custom build for a pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build |
PipelineBuildResponse |
The build to verify. |
required |
deployment |
PipelineDeploymentBase |
The deployment for which to verify the build. |
required |
code_repository |
Optional[BaseCodeRepository] |
Code repository that will be used to download files for the deployment. |
None |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the build can't be used for the deployment. |
Source code in zenml/pipelines/build_utils.py
def verify_custom_build(
build: "PipelineBuildResponse",
deployment: "PipelineDeploymentBase",
code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
"""Verify a custom build for a pipeline deployment.
Problems that make the build unusable raise a `RuntimeError`; softer
mismatches (different stack, included code, differing settings checksums,
local build) are only logged as warnings.
Args:
build: The build to verify.
deployment: The deployment for which to verify the build.
code_repository: Code repository that will be used to download files
for the deployment.
Raises:
RuntimeError: If the build can't be used for the deployment.
"""
# Resolve the active stack and the Docker builds it requires for the
# deployment; both are compared against the given build below.
stack = Client().active_stack
required_builds = stack.get_docker_builds(deployment=deployment)
# A build created for a different stack is tolerated, but flagged.
if build.stack and build.stack.id != stack.id:
logger.warning(
"The stack `%s` used for the build `%s` is not the same as the "
"stack `%s` that the pipeline will run on. This could lead "
"to issues if the stacks have different build requirements.",
build.stack.name,
build.id,
stack.name,
)
# Code baked into the images may differ from the local client code.
if build.contains_code:
logger.warning(
"The build you specified for this run contains code and will run "
"with the step code that was included in the Docker images which "
"might differ from the local code in your client environment."
)
# Consistency checks between the build's code download requirement and the
# `DockerSettings` of the deployment.
# NOTE(review): block nesting is not recoverable from this flattened
# listing - presumably all three raising checks below are guarded by this
# condition; confirm against the properly indented source.
if build.requires_code_download:
if requires_included_code(
deployment=deployment, code_repository=code_repository
):
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be included in the Docker "
"image, but the build you "
"specified requires code download. Either update your "
"`DockerSettings` or specify a different build and try "
"again."
)
# Download from a code repository is required but none was passed in.
if (
requires_download_from_code_repository(deployment=deployment)
and not code_repository
):
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository but "
"there is no code repository active at your current source "
f"root `{source_utils.get_source_root()}`."
)
# At least one step has no permitted way of downloading code.
if not code_download_possible(
deployment=deployment, code_repository=code_repository
):
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code can not be downloaded from the "
"artifact store, but the build you specified requires code "
"download. Either update your `DockerSettings` or specify a "
"different build and try again."
)
# If the build carries an overall checksum, compare it with the checksum
# computed from the currently required builds.
if build.checksum:
build_checksum = compute_build_checksum(
required_builds, stack=stack, code_repository=code_repository
)
if build_checksum != build.checksum:
logger.warning(
"The Docker settings used for the build `%s` are "
"not the same as currently specified for your pipeline. "
"This means that the build you specified to run this "
"pipeline might be outdated and most likely contains "
"outdated requirements.",
build.id,
)
else:
# No checksum given for the entire build, we manually check that
# all the images exist and the setting match
for build_config in required_builds:
# A missing image is fatal: `get_image` signals it with a KeyError.
try:
image = build.get_image(
component_key=build_config.key,
step=build_config.step_name,
)
except KeyError:
raise RuntimeError(
"The build you specified is missing an image for key: "
f"{build_config.key}."
)
# Differing per-image settings checksums are only reported as a warning.
if build_config.compute_settings_checksum(
stack=stack, code_repository=code_repository
) != build.get_settings_checksum(
component_key=build_config.key, step=build_config.step_name
):
logger.warning(
"The Docker settings used to build the image `%s` are "
"not the same as currently specified for your pipeline. "
"This means that the build you specified to run this "
"pipeline might be outdated and most likely contains "
"outdated code or requirements.",
image,
)
# Local builds depend on images that may no longer exist on this machine.
if build.is_local:
logger.warning(
"You manually specified a local build to run your pipeline. "
"This might lead to errors if the images don't exist on "
"your local machine or the image tags have been "
"overwritten since the original build happened."
)
verify_local_repository_context(deployment, local_repo_context)
Verifies the local repository.
If the local repository exists and has no local changes, code download inside the images is possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment. |
required |
local_repo_context |
Optional[LocalRepositoryContext] |
The local repository active at the source root. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the deployment requires code download but code download is not possible. |
Returns:
Type | Description |
---|---|
Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository] |
The code repository from which to download files for the runs of the deployment, or None if code download is not possible. |
Source code in zenml/pipelines/build_utils.py
def verify_local_repository_context(
deployment: "PipelineDeploymentBase",
local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
"""Verifies the local repository.
If the local repository exists and has no local changes, code download
inside the images is possible.
Args:
deployment: The pipeline deployment.
local_repo_context: The local repository active at the source root.
Raises:
RuntimeError: If the deployment requires code download but code download
is not possible.
Returns:
The code repository from which to download files for the runs of the
deployment, or None if code download is not possible.
"""
# The strict checks only matter when the active stack needs a Docker build.
# NOTE(review): block nesting is not recoverable from this flattened
# listing - confirm whether the warning branch further down is also guarded
# by this condition.
if build_required(deployment=deployment):
# Downloading from a code repository is mandatory here, so a missing,
# dirty or unpushed repository is an error.
if requires_download_from_code_repository(deployment=deployment):
if not local_repo_context:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository, but "
"there is no code repository active at your current source "
f"root `{source_utils.get_source_root()}`."
)
elif local_repo_context.is_dirty:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository, but "
"the code repository active at your current source root "
f"`{source_utils.get_source_root()}` has uncommitted "
"changes."
)
elif local_repo_context.has_local_changes:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository, but "
"the code repository active at your current source root "
f"`{source_utils.get_source_root()}` has unpushed "
"changes."
)
# Download from the repository is optional here: only warn that the
# repository cannot be used in its current state.
if local_repo_context:
if local_repo_context.is_dirty:
logger.warning(
"Unable to use code repository to download code for this "
"run as there are uncommitted changes."
)
elif local_repo_context.has_local_changes:
logger.warning(
"Unable to use code repository to download code for this "
"run as there are unpushed changes."
)
# Only a repository context without local changes yields a code repository.
# NOTE(review): `is_dirty` is not re-checked here - presumably
# `has_local_changes` is also true for a dirty repository; verify against
# the `LocalRepositoryContext` contract.
code_repository = None
if local_repo_context and not local_repo_context.has_local_changes:
model = Client().get_code_repository(
local_repo_context.code_repository_id
)
code_repository = BaseCodeRepository.from_model(model)
return code_repository
pipeline_context
Pipeline context class.
PipelineContext
Provides pipeline configuration context.
Usage example:
from zenml import get_pipeline_context
...
@pipeline(
extra={
"complex_parameter": [
("sklearn.tree", "DecisionTreeClassifier"),
("sklearn.ensemble", "RandomForestClassifier"),
]
}
)
def my_pipeline():
context = get_pipeline_context()
after = []
search_steps_prefix = "hp_tuning_search_"
for i, model_search_configuration in enumerate(
context.extra["complex_parameter"]
):
step_name = f"{search_steps_prefix}{i}"
cross_validation(
model_package=model_search_configuration[0],
model_class=model_search_configuration[1],
id=step_name
)
after.append(step_name)
select_best_model(
search_steps_prefix=search_steps_prefix,
after=after,
)
Source code in zenml/pipelines/pipeline_context.py
class PipelineContext:
    """Read-only style view of the configuration of the current pipeline.

    Usage example:

    ```python
    from zenml import get_pipeline_context

    ...

    @pipeline(
        extra={
            "complex_parameter": [
                ("sklearn.tree", "DecisionTreeClassifier"),
                ("sklearn.ensemble", "RandomForestClassifier"),
            ]
        }
    )
    def my_pipeline():
        context = get_pipeline_context()

        after = []
        search_steps_prefix = "hp_tuning_search_"
        for i, model_search_configuration in enumerate(
            context.extra["complex_parameter"]
        ):
            step_name = f"{search_steps_prefix}{i}"
            cross_validation(
                model_package=model_search_configuration[0],
                model_class=model_search_configuration[1],
                id=step_name
            )
            after.append(step_name)
        select_best_model(
            search_steps_prefix=search_steps_prefix,
            after=after,
        )
    ```
    """

    def __init__(self, pipeline_configuration: "PipelineConfiguration"):
        """Copy the relevant fields of a pipeline configuration.

        Args:
            pipeline_configuration: The configuration of the pipeline derived
                from Pipeline class.
        """
        config = pipeline_configuration

        # Identity and feature toggles.
        self.name = config.name
        self.enable_cache = config.enable_cache
        self.enable_artifact_metadata = config.enable_artifact_metadata
        self.enable_artifact_visualization = (
            config.enable_artifact_visualization
        )
        self.enable_step_logs = config.enable_step_logs

        # Settings, free-form extras and the model configuration.
        self.settings = config.settings
        self.extra = config.extra
        self.model = config.model
__init__(self, pipeline_configuration)
special
Initialize the context of the current pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_configuration |
PipelineConfiguration |
The configuration of the pipeline derived from Pipeline class. |
required |
Source code in zenml/pipelines/pipeline_context.py
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
    """Copy the relevant fields of a pipeline configuration onto the context.

    Args:
        pipeline_configuration: The configuration of the pipeline derived
            from Pipeline class.
    """
    config = pipeline_configuration

    # Identity and feature toggles.
    self.name = config.name
    self.enable_cache = config.enable_cache
    self.enable_artifact_metadata = config.enable_artifact_metadata
    self.enable_artifact_visualization = config.enable_artifact_visualization
    self.enable_step_logs = config.enable_step_logs

    # Settings, free-form extras and the model configuration.
    self.settings = config.settings
    self.extra = config.extra
    self.model = config.model
get_pipeline_context()
Get the context of the current pipeline.
Returns:
Type | Description |
---|---|
PipelineContext |
The context of the current pipeline. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no active pipeline is found. |
RuntimeError |
If inside a running step. |
Source code in zenml/pipelines/pipeline_context.py
def get_pipeline_context() -> "PipelineContext":
    """Get the context of the current pipeline.

    Returns:
        The context of the current pipeline.

    Raises:
        RuntimeError: If no active pipeline is found.
        RuntimeError: If inside a running step.
    """
    # Imported lazily inside the function, as in the original listing.
    from zenml.pipelines.pipeline_definition import Pipeline

    if Pipeline.ACTIVE_PIPELINE is None:
        from zenml.steps.step_context import get_step_context

        # Probe for a step context only to pick the more helpful of the two
        # error messages below.
        try:
            get_step_context()
        except RuntimeError:
            # The probe failure is an implementation detail; suppress it so
            # the traceback only shows the error relevant to the caller.
            raise RuntimeError("No active pipeline found.") from None
        else:
            raise RuntimeError(
                "Inside a step use `from zenml import get_step_context` "
                "instead."
            )

    return PipelineContext(
        pipeline_configuration=Pipeline.ACTIVE_PIPELINE.configuration
    )
pipeline_decorator
ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, tags=None, extra=None, on_failure=None, on_success=None, model=None, substitutions=None)
Decorator to create a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Whether to use caching or not. |
None |
enable_artifact_metadata |
Optional[bool] |
Whether to enable artifact metadata or not. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
tags |
Optional[List[str]] |
Tags to apply to runs of the pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). |
None |
model |
Optional[Model] |
configuration of the model in the Model Control Plane. |
None |
substitutions |
Optional[Dict[str, str]] |
Extra placeholders to use in the name templates. |
None |
Returns:
Type | Description |
---|---|
Union[Pipeline, Callable[[F], Pipeline]] |
A pipeline instance. |
Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Turn a function into a ZenML pipeline.

    Works both as a bare decorator (`@pipeline`) and as a decorator factory
    (`@pipeline(name=...)`).

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of the pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
        substitutions: Extra placeholders to use in the name templates.

    Returns:
        A pipeline instance when used as a bare decorator, otherwise a
        decorator that produces one.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        # Imported lazily, only once a function is actually being wrapped.
        from zenml.pipelines.pipeline_definition import Pipeline

        pipeline_name = name if name else func.__name__
        wrapped = Pipeline(
            name=pipeline_name,
            entrypoint=func,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            settings=settings,
            tags=tags,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
            substitutions=substitutions,
        )
        # Keep the user's documentation visible on the pipeline object.
        wrapped.__doc__ = func.__doc__
        return wrapped

    if _func is None:
        # Called with arguments: hand back the actual decorator.
        return inner_decorator
    return inner_decorator(_func)
pipeline_definition
Definition of a ZenML pipeline.
Pipeline
ZenML pipeline class.
Source code in zenml/pipelines/pipeline_definition.py
class Pipeline:
"""ZenML pipeline class."""
# The active pipeline is the pipeline to which step invocations will be
# added when a step is called. It is set using a context manager when a
# pipeline is called (see Pipeline.__call__ for more context)
ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> None:
    """Create a pipeline and apply its initial configuration.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
        substitutions: Extra placeholders to use in the name templates.
    """
    self._invocations: Dict[str, StepInvocation] = {}
    self._run_args: Dict[str, Any] = {}
    self._configuration = PipelineConfiguration(name=name)
    self._from_config_file: Dict[str, Any] = {}

    initial_options = {
        "enable_cache": enable_cache,
        "enable_artifact_metadata": enable_artifact_metadata,
        "enable_artifact_visualization": enable_artifact_visualization,
        "enable_step_logs": enable_step_logs,
        "settings": settings,
        "tags": tags,
        "extra": extra,
        "on_failure": on_failure,
        "on_success": on_success,
        "model": model,
        "substitutions": substitutions,
    }
    # This is an internal `configure` call, so its warnings are suppressed.
    with self.__suppress_configure_warnings__():
        self.configure(**initial_options)

    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}
    self.__suppress_warnings_flag__ = False
@property
def name(self) -> str:
    """Name of this pipeline, as stored in its configuration.

    Returns:
        The `name` field of the pipeline configuration.
    """
    current_config = self.configuration
    return current_config.name
@property
def enable_cache(self) -> Optional[bool]:
    """Caching flag of this pipeline, as stored in its configuration.

    Returns:
        The `enable_cache` field of the pipeline configuration (may be
        `None`).
    """
    current_config = self.configuration
    return current_config.enable_cache
@property
def configuration(self) -> PipelineConfiguration:
    """Read-only access to the current pipeline configuration.

    Returns:
        The configuration object held by this pipeline instance.
    """
    current_config = self._configuration
    return current_config
@property
def invocations(self) -> Dict[str, StepInvocation]:
    """Step invocations recorded for this pipeline.

    This dictionary will only be populated once the pipeline has been
    called.

    Returns:
        The mapping of step invocations held by this pipeline instance.
    """
    recorded = self._invocations
    return recorded
def resolve(self) -> "Source":
    """Resolve the pipeline entrypoint into a source.

    Returns:
        The source of the pipeline entrypoint, resolved with validation
        skipped.
    """
    entrypoint = self.entrypoint
    return source_utils.resolve(entrypoint, skip_validation=True)
@property
def source_object(self) -> Any:
    """Object whose source represents this pipeline.

    Returns:
        The pipeline entrypoint.
    """
    entrypoint = self.entrypoint
    return entrypoint
@property
def source_code(self) -> str:
    """Source code text of this pipeline.

    Returns:
        The result of `inspect.getsource` on the pipeline's source object.
    """
    target = self.source_object
    return inspect.getsource(target)
@property
def model(self) -> "PipelineResponse":
    """Look up the registered pipeline model for this instance.

    Returns:
        The single registered pipeline whose name matches this pipeline.

    Raises:
        RuntimeError: If the lookup by name does not yield exactly one
            pipeline.
    """
    self._prepare_if_possible()

    matches = Client().list_pipelines(name=self.name)
    if len(matches) != 1:
        raise RuntimeError(
            f"Cannot get the model of pipeline '{self.name}' because it has "
            f"not been registered yet. Please ensure that the pipeline has "
            f"been run or built and try again."
        )
    return matches.items[0]
@contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
    """Context manager to suppress warnings in `Pipeline.configure(...)`.

    Used to suppress warnings when called from inner code and not
    user-facing code. The suppression flag is reset on exit even when the
    managed block raises, so a failed internal call cannot leave warnings
    disabled for later user-facing calls.

    Yields:
        Nothing.
    """
    self.__suppress_warnings_flag__ = True
    try:
        yield
    finally:
        # Always re-enable warnings, including on the error path.
        self.__suppress_warnings_flag__ = False
def configure(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    parameters: Optional[Dict[str, Any]] = None,
    merge: bool = True,
    substitutions: Optional[Dict[str, str]] = None,
) -> Self:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: configuration of the model version in the Model Control Plane.
        parameters: input parameters for the pipeline.
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        substitutions: Extra placeholders to use in the name templates.

    Returns:
        The pipeline instance that this method was called on.
    """
    # Hooks are stored in the configuration as resolved sources.
    failure_hook_source = (
        resolve_and_validate_hook(on_failure) if on_failure else None
    )
    success_hook_source = (
        resolve_and_validate_hook(on_success) if on_success else None
    )

    if merge and tags and self._configuration.tags:
        # Merge tags explicitly here as the recursive update later only
        # merges dicts
        tags = self._configuration.tags + tags

    # Only explicitly provided (non-None) options take part in the update.
    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "tags": tags,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model": model,
            "parameters": parameters,
            "substitutions": substitutions,
        }
    )

    if not self.__suppress_warnings_flag__:
        # Collect run-configuration options whose new value differs from the
        # value previously loaded from the configuration file.
        overridden = [
            (param, self._from_config_file[param], new_value)
            for param, new_value in values.items()
            if param in PipelineRunConfiguration.model_fields
            and param in self._from_config_file
            and new_value != self._from_config_file[param]
        ]
        if overridden:
            changes = "".join(
                f"The value of parameter '{param}' has changed from "
                f"'{file_value}' set in your configuration file to "
                f"'{new_value}'.\n"
                for param, file_value, new_value in overridden
            )
            logger.warning(
                changes
                + "Configuration file value will be used during pipeline "
                "run, so your change will not take effect. Consider "
                "updating your configuration file instead."
            )

    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
@property
def requires_parameters(self) -> bool:
    """If the pipeline entrypoint requires parameters.

    A parameter counts as required when it has no default value. Variadic
    parameters (`*args` / `**kwargs`) never have a default but can always be
    omitted by the caller, so they are not treated as required.

    Returns:
        If the pipeline entrypoint requires parameters.
    """
    signature = inspect.signature(self.entrypoint, follow_wrapped=True)
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    return any(
        parameter.default is inspect.Parameter.empty
        and parameter.kind not in variadic_kinds
        for parameter in signature.parameters.values()
    )
@property
def is_prepared(self) -> bool:
    """Whether this pipeline has been prepared.

    Prepared means that the pipeline entrypoint has been called and the
    pipeline is fully defined. This is detected by the presence of at least
    one recorded step invocation.

    Returns:
        `True` if any step invocation has been recorded, else `False`.
    """
    return bool(self.invocations)
def prepare(self, *args: Any, **kwargs: Any) -> None:
    """Prepares the pipeline.

    Resets previously collected parameters and invocations, fills in
    configured parameters that were not passed explicitly, and then calls
    the entrypoint with this pipeline as the active context.

    Args:
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.

    Raises:
        RuntimeError: If the pipeline has parameters configured differently in
            configuration file and code.
    """
    # Clear existing parameters and invocations
    self._parameters = {}
    self._invocations = {}

    # Parameters from the configuration, updated with those loaded from the
    # configuration file (if any).
    configured = (self.configuration.parameters or {}).copy()
    if from_file := self._from_config_file.get("parameters", None):
        configured = dict_utils.recursive_update(configured, from_file)

    # A keyword argument conflicts when it is also configured, with a
    # different value.
    conflicts = {
        key: (configured[key], runtime_value)
        for key, runtime_value in kwargs.items()
        if key in configured and configured[key] != runtime_value
    }
    if conflicts:
        several = len(conflicts) > 1
        noun = "parameters" if several else "parameter"
        verb = "conflict" if several else "conflicts"
        message_parts = [
            f"Configured {noun} for the pipeline `{self.name}` {verb} "
            f"with {noun} passed in runtime:\n"
        ]
        message_parts.extend(
            f"`{key}`: config=`{config_value}` | runtime=`{runtime_value}`\n"
            for key, (config_value, runtime_value) in conflicts.items()
        )
        message_parts.append(
            "This happens, if you define values for pipeline parameters "
            "in configuration file and pass same parameters from the "
            "code. Example:\n"
            "```\n"
            "# config.yaml\n"
            "parameters:\n"
            "    param_name: value1\n"
            "\n"
            "# pipeline.py\n"
            "@pipeline\n"
            "def pipeline_(param_name: str):\n"
            "    step_name()\n"
            "\n"
            'if __name__=="__main__":\n'
            '    pipeline_.with_options(config_file="config.yaml")'
            '(param_name="value2")\n'
            "```\n"
            "To avoid this consider setting pipeline parameters only in "
            "one place (config or code).\n"
        )
        raise RuntimeError("".join(message_parts))

    # Explicitly passed keyword arguments win; configured values only fill
    # the gaps.
    for key, config_value in configured.items():
        kwargs.setdefault(key, config_value)

    with self:
        # Enter the context manager, so we become the active pipeline. This
        # means that all steps that get called while the entrypoint function
        # is executed will be added as invocation to this pipeline instance.
        self._call_entrypoint(*args, **kwargs)
def register(self) -> "PipelineResponse":
    """Register the pipeline in the server.

    Logs a warning when the pipeline carries non-default configuration
    (other than its name), because that configuration is not registered
    along with the pipeline.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    custom_configuration = self.configuration.model_dump(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configuration:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    return self._register()
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds Docker images for the pipeline.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()

        compile_args = self._run_args.copy()
        # These run arguments are not part of the run configuration that
        # `_compile` validates.
        compile_args.pop("unlisted", None)
        compile_args.pop("prevent_build_reuse", None)

        # `_compile` validates its keyword arguments as
        # `PipelineRunConfiguration` fields, where step configurations live
        # under `steps` (the key `_create_deployment` uses as well).
        # Normalize the `step_configurations` key accordingly; an explicitly
        # passed argument takes precedence over the stored run arguments.
        stored_step_configurations = compile_args.pop(
            "step_configurations", None
        )
        steps = step_configurations or stored_step_configurations
        if steps:
            compile_args["steps"] = steps

        if config_path:
            compile_args["config_path"] = config_path
        if settings:
            compile_args["settings"] = settings

        deployment, _, _ = self._compile(**compile_args)
        pipeline_id = self._register().id

        local_repo = code_repository_utils.find_active_code_repository()
        code_repository = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=local_repo
        )

        return build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )
def _create_deployment(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    skip_schedule_registration: bool = False,
) -> PipelineDeploymentResponse:
    """Create a pipeline deployment.

    Compiles the pipeline, optionally registers the pipeline and a schedule,
    reuses or creates a build, uploads code if necessary and finally creates
    the deployment in the store.

    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline run.
        enable_step_logs: If step logs should be enabled for this pipeline.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        skip_schedule_registration: Whether to skip schedule registration.

    Returns:
        The pipeline deployment.

    Raises:
        ValueError: If the orchestrator doesn't support scheduling, but a
            schedule was given
    """
    # Step configurations are passed to the compiler under the name `steps`.
    deployment, schedule, build = self._compile(
        config_path=config_path,
        run_name=run_name,
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        steps=step_configurations,
        settings=settings,
        schedule=schedule,
        build=build,
        extra=extra,
    )

    skip_pipeline_registration = constants.handle_bool_env_var(
        constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
        default=False,
    )
    register_pipeline = not (skip_pipeline_registration or unlisted)

    pipeline_id = None
    if register_pipeline:
        pipeline_id = self._register().id
    else:
        logger.debug(f"Pipeline {self.name} is unlisted.")

    # The active stack is fetched and validated once and reused for the
    # remainder of this method.
    stack = Client().active_stack
    stack.validate()

    schedule_id = None
    if schedule and not skip_schedule_registration:
        if not stack.orchestrator.config.is_schedulable:
            raise ValueError(
                f"Stack {stack.name} does not support scheduling. "
                "Not all orchestrator types support scheduling, "
                "kindly consult with "
                "https://docs.zenml.io/how-to/build-pipelines/schedule-a-pipeline "
                "for details."
            )

        # Fall back to the formatted run name template when the schedule
        # has no explicit name.
        if schedule.name:
            schedule_name = schedule.name
        else:
            schedule_name = format_name_template(
                deployment.run_name_template,
                substitutions=deployment.pipeline_configuration.substitutions,
            )

        components = Client().active_stack_model.components
        orchestrator = components[StackComponentType.ORCHESTRATOR][0]
        schedule_model = ScheduleRequest(
            workspace=Client().active_workspace.id,
            user=Client().active_user.id,
            pipeline_id=pipeline_id,
            orchestrator_id=orchestrator.id,
            name=schedule_name,
            active=True,
            cron_expression=schedule.cron_expression,
            start_time=schedule.start_time,
            end_time=schedule.end_time,
            interval_second=schedule.interval_second,
            catchup=schedule.catchup,
            run_once_start_time=schedule.run_once_start_time,
        )
        schedule_id = Client().zen_store.create_schedule(schedule_model).id
        logger.info(
            f"Created schedule `{schedule_name}` for pipeline "
            f"`{deployment.pipeline_configuration.name}`."
        )

    upload_notebook_cell_code_if_necessary(
        deployment=deployment, stack=stack
    )

    local_repo_context = (
        code_repository_utils.find_active_code_repository()
    )
    code_repository = build_utils.verify_local_repository_context(
        deployment=deployment, local_repo_context=local_repo_context
    )

    if prevent_build_reuse:
        logger.warning(
            "Passing `prevent_build_reuse=True` to "
            "`pipeline.with_options(...)` is deprecated. Use "
            "`DockerSettings.prevent_build_reuse` instead."
        )

    build_model = build_utils.reuse_or_create_pipeline_build(
        deployment=deployment,
        pipeline_id=pipeline_id,
        allow_build_reuse=not prevent_build_reuse,
        build=build,
        code_repository=code_repository,
    )
    build_id = build_model.id if build_model else None

    # A code reference is only recorded for a clean local repository.
    code_reference = None
    if local_repo_context and not local_repo_context.is_dirty:
        source_root = source_utils.get_source_root()
        subdirectory = (
            Path(source_root)
            .resolve()
            .relative_to(local_repo_context.root)
        )
        code_reference = CodeReferenceRequest(
            commit=local_repo_context.current_commit,
            subdirectory=subdirectory.as_posix(),
            code_repository=local_repo_context.code_repository_id,
        )

    code_path = None
    if build_utils.should_upload_code(
        deployment=deployment,
        build=build_model,
        code_reference=code_reference,
    ):
        code_archive = code_utils.CodeArchive(
            root=source_utils.get_source_root()
        )
        logger.info("Archiving pipeline code...")
        code_path = code_utils.upload_code_if_necessary(code_archive)

    request = PipelineDeploymentRequest(
        user=Client().active_user.id,
        workspace=Client().active_workspace.id,
        stack=stack.id,
        pipeline=pipeline_id,
        build=build_id,
        schedule=schedule_id,
        code_reference=code_reference,
        code_path=code_path,
        **deployment.model_dump(),
    )
    return Client().zen_store.create_deployment(deployment=request)
def _run(
    self,
) -> Optional[PipelineRunResponse]:
    """Run the pipeline on the active stack.

    Returns:
        The pipeline run, or `None` when pipeline execution is prevented or
        no placeholder run was created.
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls when importing modules needed to run a step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return None

    logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")

    with track_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
        stack = Client().active_stack
        deployment = self._create_deployment(**self._run_args)
        self.log_pipeline_deployment_metadata(deployment)

        placeholder_run = create_placeholder_run(deployment=deployment)
        placeholder_run_id = placeholder_run.id if placeholder_run else None
        analytics_handler.metadata = self._get_pipeline_analytics_metadata(
            deployment=deployment,
            stack=stack,
            run_id=placeholder_run_id,
        )

        if placeholder_run:
            run_url = dashboard_utils.get_run_url(placeholder_run)
            if not run_url:
                logger.info(
                    "You can visualize your pipeline runs in the `ZenML "
                    "Dashboard`. In order to try it locally, please run "
                    "`zenml login --local`."
                )
            else:
                logger.info(f"Dashboard URL for Pipeline Run: {run_url}")

        deploy_pipeline(
            deployment=deployment,
            stack=stack,
            placeholder_run=placeholder_run,
        )

        if not placeholder_run:
            return None
        # Re-fetch the run after deploying the pipeline.
        return Client().get_pipeline_run(placeholder_run.id)
@staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponse,
) -> None:
    """Log details of a deployment when a pipeline run is started.

    Logging is best-effort: any exception raised while collecting the
    information is caught and reported at debug level only.

    Args:
        deployment_model: The model for the pipeline deployment
    """
    try:
        pipeline_configuration = deployment_model.pipeline_configuration

        # Log about the caching status
        if pipeline_configuration.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{deployment_model.pipeline_configuration.name}`."
            )

        # Log about the used builds
        build = deployment_model.build
        if build:
            image_names = ", ".join(
                [image.image for image in build.images.values()]
            )
            logger.info("Using a build:")
            logger.info(" Image(s): " f"{image_names}")

            # Log about version mismatches between local and build
            from zenml import __version__

            if build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{build.zenml_version}"
                )

            import platform

            if build.python_version != platform.python_version():
                logger.info(
                    f"Python version (different than the local version): "
                    f"{build.python_version}"
                )

        # Log about the user, stack and components
        user = deployment_model.user
        if user is not None:
            logger.info(f"Using user: `{user.name}`")

        stack = deployment_model.stack
        if stack is not None:
            logger.info(f"Using stack: `{stack.name}`")
            for component_type, component_models in stack.components.items():
                logger.info(
                    f" {component_type.value}: `{component_models[0].name}`"
                )
    except Exception as e:
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Write a commented-out run configuration yaml template to a file.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    stack = stack or Client().active_stack
    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    # Sort the generated setting templates by the level(s) they apply to; a
    # setting class may appear at both levels.
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        template_fields = pydantic_utils.TemplateGenerator(
            setting_class
        ).run()
        levels = setting_class.LEVEL
        if ConfigurationLevel.PIPELINE in levels:
            pipeline_settings[key] = template_fields
        if ConfigurationLevel.STEP in levels:
            step_settings[key] = template_fields

    # One template entry per step invocation, listing its outputs.
    steps = {
        step_name: StepConfigurationUpdate(
            parameters={},
            settings=step_settings,
            outputs={
                output_name: PartialArtifactConfiguration()
                for output_name in invocation.step.entrypoint_definition.outputs
            },
        )
        for step_name, invocation in self.invocations.items()
    }

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(template))

    with open(path, "w") as template_file:
        template_file.write(commented_yaml)
def _apply_configuration(
    self,
    config: PipelineConfigurationUpdate,
    merge: bool = True,
) -> None:
    """Validate a configuration update and apply it to this pipeline.

    Args:
        config: The configuration update.
        merge: Whether to merge the updates with the existing configuration
            or not. See the `BasePipeline.configure(...)` method for a
            detailed explanation.
    """
    self._validate_configuration(config)

    updated_configuration = pydantic_utils.update_model(
        self._configuration, update=config, recursive=merge
    )
    self._configuration = updated_configuration

    logger.debug("Updated pipeline configuration:")
    logger.debug(self._configuration)
@staticmethod
def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
    """Check the setting keys of a configuration update.

    Args:
        config: The configuration update to validate.
    """
    setting_keys = list(config.settings)
    settings_utils.validate_setting_keys(setting_keys)
def _get_pipeline_analytics_metadata(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    run_id: Optional[UUID] = None,
) -> Dict[str, Any]:
    """Collect analytics metadata for a pipeline deployment.

    Args:
        deployment: The pipeline deployment to track.
        stack: The stack on which the pipeline will be deployed.
        run_id: The ID of the pipeline run.

    Returns:
        the metadata about the pipeline deployment
    """
    # A list (not a generator) is built so that every materializer source
    # is visited, exactly like an explicit nested loop would.
    custom_materializer = any(
        [
            not source.is_internal
            for step in deployment.step_configurations.values()
            for output in step.config.outputs.values()
            for source in output.materializer_source
        ]
    )

    stack_creator = Client().get_stack(stack.id).user
    active_user = Client().active_user
    own_stack = stack_creator and stack_creator.id == active_user.id

    metadata: Dict[str, Any] = {"store_type": Client().zen_store.type.value}
    # One entry per stack component type, holding the component's flavor.
    for component_type, component in stack.components.items():
        metadata[component_type.value] = component.flavor
    metadata.update(
        {
            "total_steps": len(self.invocations),
            "schedule": bool(deployment.schedule),
            "custom_materializer": custom_materializer,
            "own_stack": own_stack,
            "pipeline_run_id": str(run_id) if run_id else None,
        }
    )
    return metadata
def _compile(
    self, config_path: Optional[str] = None, **run_configuration_args: Any
) -> Tuple[
    "PipelineDeploymentBase",
    Optional["Schedule"],
    Union["PipelineBuildBase", UUID, None],
]:
    """Compile the pipeline into a deployment for the active stack.

    The run configuration is assembled from the configuration file first
    and then updated with the non-None values passed in code.

    Args:
        config_path: Path to a config file.
        **run_configuration_args: Configurations for the pipeline run.

    Returns:
        A tuple containing the deployment, schedule and build of
        the compiled pipeline.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    run_config_fields = list(PipelineRunConfiguration.model_fields.keys())
    file_values = self._parse_config_file(
        config_path=config_path,
        matcher=run_config_fields,
    )
    self._reconfigure_from_file_with_overrides(config_path=config_path)

    file_run_config = PipelineRunConfiguration(**file_values)
    code_values = dict_utils.remove_none_values(run_configuration_args)
    code_update = PipelineRunConfiguration.model_validate(code_values)

    # Update with the values in code so they take precedence
    merged_run_config = pydantic_utils.update_model(
        file_run_config, update=code_update
    )
    merged_run_config = env_utils.substitute_env_variable_placeholders(
        merged_run_config
    )

    compiled = Compiler().compile(
        pipeline=self,
        stack=Client().active_stack,
        run_configuration=merged_run_config,
    )
    compiled = env_utils.substitute_env_variable_placeholders(compiled)

    return compiled, merged_run_config.schedule, merged_run_config.build
def _register(self) -> "PipelineResponse":
    """Fetch the pipeline from the server, creating it if it is missing.

    Returns:
        The registered pipeline model.
    """
    client = Client()

    def _fetch_latest() -> PipelineResponse:
        # Only the most recently created pipeline with this name is needed.
        page = client.list_pipelines(
            name=self.name,
            size=1,
            sort_by="desc:created",
        )
        if not page.total:
            raise RuntimeError("No matching pipelines found.")
        return page.items[0]

    try:
        return _fetch_latest()
    except RuntimeError:
        # Nothing registered under this name yet: create the pipeline.
        creation_request = PipelineRequest(
            workspace=client.active_workspace.id,
            user=client.active_user.id,
            name=self.name,
        )
        try:
            created = client.zen_store.create_pipeline(
                pipeline=creation_request
            )
        except EntityExistsError:
            # The pipeline exists after all (creation reported a
            # duplicate), so look it up again.
            return _fetch_latest()

        logger.info(
            "Registered new pipeline: `%s`.",
            created.name,
        )
        return created
def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
    """Compute an MD5-based identifier from the pipeline spec and sources.

    The hash covers the spec's JSON representation, the pipeline source code
    (for spec versions >= 0.4) and the source code of every step in the
    spec, in spec order.

    Args:
        pipeline_spec: Compiled spec of the pipeline.

    Returns:
        The unique identifier of the pipeline.
    """
    from packaging import version

    digest = hashlib.md5()  # nosec
    digest.update(pipeline_spec.json_with_string_sources.encode())

    spec_version = version.parse(pipeline_spec.version)
    if spec_version >= version.parse("0.4"):
        # Only add this for newer versions to keep backwards compatibility
        digest.update(self.source_code.encode())

    for step_spec in pipeline_spec.steps:
        step = self.invocations[step_spec.pipeline_parameter_name].step
        digest.update(step.source_code.encode())

    return digest.hexdigest()
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[
        str, Union["ExternalArtifact", "ArtifactVersionResponse"]
    ],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Record a new invocation of a step on this pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        model_artifacts_or_metadata: The model artifacts or metadata for
            the invocation.
        client_lazy_loaders: The client lazy loaders for the invocation.
        parameters: The parameters for the invocation.
        default_parameters: The default parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    # Every input artifact must originate from a step of this very pipeline.
    foreign_artifact = next(
        (
            candidate
            for candidate in input_artifacts.values()
            if candidate.pipeline is not self
        ),
        None,
    )
    if foreign_artifact is not None:
        raise RuntimeError(
            "Got invalid input artifact for invocation of step "
            f"{step.name}: The input artifact was produced by a step "
            f"inside a different pipeline {foreign_artifact.pipeline.name}."
        )

    new_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    self._invocations[new_id] = StepInvocation(
        id=new_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        model_artifacts_or_metadata=model_artifacts_or_metadata,
        client_lazy_loaders=client_lazy_loaders,
        parameters=parameters,
        default_parameters=default_parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    return new_id
def _compute_invocation_id(
    self,
    step: "BaseStep",
    custom_id: Optional[str] = None,
    allow_suffix: bool = True,
) -> str:
    """Pick an invocation ID that is not yet used in this pipeline.

    Args:
        step: The step for which to compute the ID.
        custom_id: Custom ID to use for the invocation.
        allow_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If no ID suffix is allowed and an invocation for the
            same ID already exists.
        RuntimeError: If no unique invocation ID can be found.

    Returns:
        The invocation ID.
    """
    prefix = custom_id or step.name
    taken = self.invocations

    if prefix not in taken:
        return prefix

    if not allow_suffix:
        raise RuntimeError("Duplicate step ID")

    # Suffixes run from `_2` up to `_9999`; the first free one wins.
    suffixed_ids = (f"{prefix}_{number}" for number in range(2, 10000))
    free_id = next(
        (candidate for candidate in suffixed_ids if candidate not in taken),
        None,
    )
    if free_id is None:
        raise RuntimeError("Unable to find step ID")
    return free_id
def __enter__(self) -> Self:
    """Make this pipeline the active pipeline context.

    Raises:
        RuntimeError: If a different pipeline is already active.

    Returns:
        The pipeline instance.
    """
    currently_active = Pipeline.ACTIVE_PIPELINE
    if currently_active:
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{currently_active.name} is already active."
        )

    Pipeline.ACTIVE_PIPELINE = self
    return self
def __exit__(self, *args: Any) -> None:
    """Leave the pipeline context by clearing the active pipeline.

    Args:
        *args: The arguments passed to the context exit handler.
    """
    # The exit arguments are ignored; the class-level marker is always reset.
    Pipeline.ACTIVE_PIPELINE = None
def _parse_config_file(
    self, config_path: Optional[str], matcher: List[str]
) -> Dict[str, Any]:
    """Parses the given configuration file and returns the matched values.

    This method does not assign `self._from_config_file`; it only reads it
    to reuse a `model` entry that is already stored there.

    Args:
        config_path: Path to a yaml configuration file. If falsy, an empty
            dictionary is returned.
        matcher: List of keys to match in the configuration file.

    Returns:
        Parsed config file according to matcher settings.
    """
    if not config_path:
        return {}

    with open(config_path, "r") as f:
        # An empty (or comment-only) YAML document loads as `None`; fall
        # back to an empty mapping instead of failing on `.items()` below.
        loaded = yaml.load(f, Loader=yaml.SafeLoader) or {}

    parsed: Dict[str, Any] = dict_utils.remove_none_values(
        {k: v for k, v in loaded.items() if k in matcher}
    )

    if "model" in parsed:
        if "model" in self._from_config_file:
            # Reuse the model entry already stored on the instance.
            parsed["model"] = self._from_config_file["model"]
        else:
            from zenml.model.model import Model

            parsed["model"] = Model.model_validate(parsed["model"])

    return parsed
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Returns a configured copy of the pipeline; the original is untouched.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        steps: Configurations for steps of the pipeline. This is equivalent
            to `step_configurations`, and will be ignored if
            `step_configurations` is set as well.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    if step_configurations and steps:
        logger.warning(
            "Step configurations were passed using both the "
            "`step_configurations` and `steps` keywords, ignoring the "
            "values passed using the `steps` keyword."
        )

    configured_copy = self.copy()
    configured_copy._reconfigure_from_file_with_overrides(
        config_path=config_path, **kwargs
    )

    # `step_configurations` wins over `steps` when both are provided.
    candidate_run_args = {
        "run_name": run_name,
        "schedule": schedule,
        "build": build,
        "step_configurations": step_configurations or steps,
        "config_path": config_path,
        "unlisted": unlisted,
        "prevent_build_reuse": prevent_build_reuse,
    }
    configured_copy._run_args.update(
        dict_utils.remove_none_values(candidate_run_args)
    )
    return configured_copy
def copy(self) -> "Pipeline":
    """Creates a deep copy of this pipeline.

    Returns:
        The pipeline copy.
    """
    duplicate = copy.deepcopy(self)
    return duplicate
def __call__(
    self, *args: Any, **kwargs: Any
) -> Optional[PipelineRunResponse]:
    """Handle a call of the pipeline.

    Depending on whether a pipeline context is active, this either:
    * calls the entrypoint function directly, so that the step invocations
      are added to the already active pipeline, or
    * prepares this pipeline with the given arguments and runs it.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        If called within another pipeline, returns the outputs of the
        `entrypoint` method. Otherwise, returns the pipeline run or `None`
        if running with a schedule.
    """
    if not Pipeline.ACTIVE_PIPELINE:
        self.prepare(*args, **kwargs)
        return self._run()

    # Calling a pipeline inside a pipeline, we return the potential
    # outputs of the entrypoint function
    # TODO: This currently ignores the configuration of the pipeline
    #   and instead applies the configuration of the previously active
    #   pipeline. Is this what we want?
    return self.entrypoint(*args, **kwargs)
def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
    """Calls the pipeline entrypoint function with the given arguments.

    The arguments are validated against the entrypoint signature first. The
    validated values are stored in `self._parameters` and then passed to the
    entrypoint as keyword arguments.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Raises:
        ValueError: If an input argument is missing or not JSON
            serializable.
    """
    try:
        validated_args = pydantic_utils.validate_function_args(
            self.entrypoint,
            ConfigDict(arbitrary_types_allowed=False),
            *args,
            **kwargs,
        )
    except ValidationError as e:
        # Each fragment ends with a space so the adjacent string literals
        # do not run together in the final message.
        raise ValueError(
            "Invalid or missing pipeline function entrypoint arguments. "
            "Only JSON serializable inputs are allowed as pipeline inputs. "
            "Check out the pydantic error above for more details."
        ) from e

    self._parameters = validated_args
    self.entrypoint(**validated_args)
def _prepare_if_possible(self) -> None:
    """Prepares the pipeline unless that would need entrypoint arguments.

    Raises:
        RuntimeError: If the pipeline is not prepared and the preparation
            requires parameters.
    """
    if self.is_prepared:
        return

    if not self.requires_parameters:
        # No arguments are needed, so the pipeline can be prepared here.
        self.prepare()
        return

    raise RuntimeError(
        f"Failed while trying to prepare pipeline {self.name}. "
        "The entrypoint function of the pipeline requires "
        "arguments. Please prepare the pipeline by calling "
        "`pipeline_instance.prepare(...)` and try again."
    )
def create_run_template(
    self, name: str, **kwargs: Any
) -> RunTemplateResponse:
    """Creates a run template backed by a fresh deployment of the pipeline.

    Args:
        name: The name of the run template.
        **kwargs: Keyword arguments for the client method to create a run
            template.

    Returns:
        The created run template.
    """
    self._prepare_if_possible()

    # The deployment is created from the stored run arguments, with
    # schedule registration skipped.
    template_deployment = self._create_deployment(
        **self._run_args, skip_schedule_registration=True
    )

    zen_client = Client()
    return zen_client.create_run_template(
        name=name, deployment_id=template_deployment.id, **kwargs
    )
def _reconfigure_from_file_with_overrides(
    self,
    config_path: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Re-applies the pipeline configuration from a config file and kwargs.

    The values remembered from a previous config file are discarded first.

    Args:
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.
    """
    self._from_config_file = {}

    if config_path:
        # Only keys that `configure` accepts as arguments are read from the
        # file.
        configurable_keys = inspect.getfullargspec(self.configure)[0]
        self._from_config_file = self._parse_config_file(
            config_path=config_path,
            matcher=configurable_keys,
        )

    merged_options = dict_utils.recursive_update(
        self._from_config_file, kwargs
    )

    # This is an internal call, so `configure` should not emit its
    # user-facing warnings.
    with self.__suppress_configure_warnings__():
        self.configure(**merged_options)
configuration: PipelineConfiguration
property
readonly
The configuration of the pipeline.
Returns:
Type | Description |
---|---|
PipelineConfiguration |
The configuration of the pipeline. |
enable_cache: Optional[bool]
property
readonly
If caching is enabled for the pipeline.
Returns:
Type | Description |
---|---|
Optional[bool] |
If caching is enabled for the pipeline. |
invocations: Dict[str, zenml.steps.step_invocation.StepInvocation]
property
readonly
Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been called.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.step_invocation.StepInvocation] |
The step invocations. |
is_prepared: bool
property
readonly
If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.
Returns:
Type | Description |
---|---|
bool |
If the pipeline is prepared. |
model: PipelineResponse
property
readonly
Gets the registered pipeline model for this instance.
Returns:
Type | Description |
---|---|
PipelineResponse |
The registered pipeline model. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the pipeline has not been registered yet. |
name: str
property
readonly
The name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
requires_parameters: bool
property
readonly
If the pipeline entrypoint requires parameters.
Returns:
Type | Description |
---|---|
bool |
If the pipeline entrypoint requires parameters. |
source_code: str
property
readonly
The source code of this pipeline.
Returns:
Type | Description |
---|---|
str |
The source code of this pipeline. |
source_object: Any
property
readonly
The source object of this pipeline.
Returns:
Type | Description |
---|---|
Any |
The source object of this pipeline. |
__call__(self, *args, **kwargs)
special
Handle a call of the pipeline.
This method does one of two things:
* If there is an active pipeline context, it calls the pipeline entrypoint function within that context and the step invocations will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling the entrypoint function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Entrypoint function arguments. |
() |
**kwargs |
Any |
Entrypoint function keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.core.pipeline_run.PipelineRunResponse] |
If called within another pipeline, returns the outputs of the `entrypoint` method. Otherwise, returns the pipeline run or `None` if running with a schedule. |
Source code in zenml/pipelines/pipeline_definition.py
def __call__(
    self, *args: Any, **kwargs: Any
) -> Optional[PipelineRunResponse]:
    """Handle a call of the pipeline.

    Depending on whether a pipeline context is active, this either:
    * calls the entrypoint function directly, so that the step invocations
      are added to the already active pipeline, or
    * prepares this pipeline with the given arguments and runs it.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        If called within another pipeline, returns the outputs of the
        `entrypoint` method. Otherwise, returns the pipeline run or `None`
        if running with a schedule.
    """
    if not Pipeline.ACTIVE_PIPELINE:
        self.prepare(*args, **kwargs)
        return self._run()

    # Calling a pipeline inside a pipeline, we return the potential
    # outputs of the entrypoint function
    # TODO: This currently ignores the configuration of the pipeline
    #   and instead applies the configuration of the previously active
    #   pipeline. Is this what we want?
    return self.entrypoint(*args, **kwargs)
__enter__(self)
special
Activate the pipeline context.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a different pipeline is already active. |
Returns:
Type | Description |
---|---|
Self |
The pipeline instance. |
Source code in zenml/pipelines/pipeline_definition.py
def __enter__(self) -> Self:
    """Make this pipeline the active pipeline context.

    Raises:
        RuntimeError: If a different pipeline is already active.

    Returns:
        The pipeline instance.
    """
    currently_active = Pipeline.ACTIVE_PIPELINE
    if currently_active:
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{currently_active.name} is already active."
        )

    Pipeline.ACTIVE_PIPELINE = self
    return self
__exit__(self, *args)
special
Deactivates the pipeline context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
The arguments passed to the context exit handler. |
() |
Source code in zenml/pipelines/pipeline_definition.py
def __exit__(self, *args: Any) -> None:
    """Leave the pipeline context by clearing the active pipeline.

    Args:
        *args: The arguments passed to the context exit handler.
    """
    # The exit arguments are ignored; the class-level marker is always reset.
    Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, tags=None, extra=None, on_failure=None, on_success=None, model=None, substitutions=None)
special
Initializes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the pipeline. |
required |
entrypoint |
~F |
The entrypoint function of the pipeline. |
required |
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
tags |
Optional[List[str]] |
Tags to apply to runs of this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). |
None |
model |
Optional[Model] |
configuration of the model in the Model Control Plane. |
None |
substitutions |
Optional[Dict[str, str]] |
Extra placeholders to use in the name templates. |
None |
Source code in zenml/pipelines/pipeline_definition.py
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    substitutions: Optional[Dict[str, str]] = None,
) -> None:
    """Sets up the pipeline state and applies the initial configuration.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
        substitutions: Extra placeholders to use in the name templates.
    """
    # Internal state that has to exist before `configure` is called.
    self._invocations: Dict[str, StepInvocation] = {}
    self._run_args: Dict[str, Any] = {}
    self._configuration = PipelineConfiguration(name=name)
    self._from_config_file: Dict[str, Any] = {}

    initial_options = dict(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        settings=settings,
        tags=tags,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model=model,
        substitutions=substitutions,
    )
    # This is an internal call, so `configure` should not emit its
    # user-facing warnings.
    with self.__suppress_configure_warnings__():
        self.configure(**initial_options)

    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}
    self.__suppress_warnings_flag__ = False
__suppress_configure_warnings__(self)
special
Context manager to suppress warnings in `Pipeline.configure(...)`.
Used to suppress warnings when called from inner code and not user-facing code.
Yields:
Type | Description |
---|---|
Iterator[Any] |
Nothing. |
Source code in zenml/pipelines/pipeline_definition.py
@contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
    """Context manager to suppress warnings in `Pipeline.configure(...)`.

    Used to suppress warnings when called from inner code and not
    user-facing code. The suppression flag is reset when the context exits,
    including when the wrapped code raises.

    Yields:
        Nothing.
    """
    self.__suppress_warnings_flag__ = True
    try:
        yield
    finally:
        # Without `finally`, an exception inside the `with` body would
        # leave the flag set and silence all later warnings.
        self.__suppress_warnings_flag__ = False
add_step_invocation(self, step, input_artifacts, external_artifacts, model_artifacts_or_metadata, client_lazy_loaders, parameters, default_parameters, upstream_steps, custom_id=None, allow_id_suffix=True)
Adds a step invocation to the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
BaseStep |
The step for which to add an invocation. |
required |
input_artifacts |
Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact] |
The input artifacts for the invocation. |
required |
external_artifacts |
Dict[str, Union[ExternalArtifact, ArtifactVersionResponse]] |
The external artifacts for the invocation. |
required |
model_artifacts_or_metadata |
Dict[str, ModelVersionDataLazyLoader] |
The model artifacts or metadata for the invocation. |
required |
client_lazy_loaders |
Dict[str, ClientLazyLoader] |
The client lazy loaders for the invocation. |
required |
parameters |
Dict[str, Any] |
The parameters for the invocation. |
required |
default_parameters |
Dict[str, Any] |
The default parameters for the invocation. |
required |
upstream_steps |
Set[str] |
The upstream steps for the invocation. |
required |
custom_id |
Optional[str] |
Custom ID to use for the invocation. |
None |
allow_id_suffix |
bool |
Whether a suffix can be appended to the invocation ID. |
True |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the method is called on an inactive pipeline. |
RuntimeError |
If the invocation was called with an artifact from a different pipeline. |
Returns:
Type | Description |
---|---|
str |
The step invocation ID. |
Source code in zenml/pipelines/pipeline_definition.py
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[
        str, Union["ExternalArtifact", "ArtifactVersionResponse"]
    ],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Record a new invocation of a step on this pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        model_artifacts_or_metadata: The model artifacts or metadata for
            the invocation.
        client_lazy_loaders: The client lazy loaders for the invocation.
        parameters: The parameters for the invocation.
        default_parameters: The default parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    # Every input artifact must originate from a step of this very pipeline.
    foreign_artifact = next(
        (
            candidate
            for candidate in input_artifacts.values()
            if candidate.pipeline is not self
        ),
        None,
    )
    if foreign_artifact is not None:
        raise RuntimeError(
            "Got invalid input artifact for invocation of step "
            f"{step.name}: The input artifact was produced by a step "
            f"inside a different pipeline {foreign_artifact.pipeline.name}."
        )

    new_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    self._invocations[new_id] = StepInvocation(
        id=new_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        model_artifacts_or_metadata=model_artifacts_or_metadata,
        client_lazy_loaders=client_lazy_loaders,
        parameters=parameters,
        default_parameters=default_parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    return new_id
build(self, settings=None, step_configurations=None, config_path=None)
Builds Docker images for the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings |
Optional[Mapping[str, SettingsOrDict]] |
Settings for the pipeline. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
config_path |
Optional[str] |
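Path to a yaml configuration file. This file will be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration` object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method. |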
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build output. |
Source code in zenml/pipelines/pipeline_definition.py
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
    """Compiles and registers the pipeline, then builds its Docker images.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()

        # Start from the stored run arguments, minus the two that are not
        # forwarded to compilation.
        not_forwarded = ("unlisted", "prevent_build_reuse")
        compile_args = {
            key: value
            for key, value in self._run_args.items()
            if key not in not_forwarded
        }

        # Truthy arguments of this call replace the stored ones.
        explicit_overrides = {
            "config_path": config_path,
            "step_configurations": step_configurations,
            "settings": settings,
        }
        compile_args.update(
            {key: value for key, value in explicit_overrides.items() if value}
        )

        deployment, _, _ = self._compile(**compile_args)
        registered_pipeline_id = self._register().id

        active_repo = code_repository_utils.find_active_code_repository()
        verified_repo = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=active_repo
        )

        return build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=registered_pipeline_id,
            code_repository=verified_repo,
        )
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, tags=None, extra=None, on_failure=None, on_success=None, model=None, parameters=None, merge=True, substitutions=None)
Configures the pipeline.
Configuration merging example:
* `merge==True`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* `merge==False`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
tags |
Optional[List[str]] |
Tags to apply to runs of this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). |
None |
merge |
bool |
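If `True`, will merge the given dictionary configurations like `extra` and `settings` with existing configurations. If `False` the given configurations will overwrite all existing ones. See the general description of this method for an example. |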
True |
model |
Optional[Model] |
configuration of the model version in the Model Control Plane. |
None |
parameters |
Optional[Dict[str, Any]] |
input parameters for the pipeline. |
None |
substitutions |
Optional[Dict[str, str]] |
Extra placeholders to use in the name templates. |
None |
Returns:
Type | Description |
---|---|
Self |
The pipeline instance that this method was called on. |
Source code in zenml/pipelines/pipeline_definition.py
def configure(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    tags: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    parameters: Optional[Dict[str, Any]] = None,
    merge: bool = True,
    substitutions: Optional[Dict[str, str]] = None,
) -> Self:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Unless warnings are suppressed, a warning is logged for every given
    value that differs from the value of the same option loaded from a
    configuration file.

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        tags: Tags to apply to runs of this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        model: configuration of the model version in the Model Control Plane.
        parameters: input parameters for the pipeline.
        substitutions: Extra placeholders to use in the name templates.

    Returns:
        The pipeline instance that this method was called on.
    """
    failure_hook_source = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        failure_hook_source = resolve_and_validate_hook(on_failure)

    success_hook_source = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        success_hook_source = resolve_and_validate_hook(on_success)

    if merge and tags and self._configuration.tags:
        # Merge tags explicitly here as the recursive update later only
        # merges dicts
        tags = self._configuration.tags + tags

    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "tags": tags,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model": model,
            "parameters": parameters,
            "substitutions": substitutions,
        }
    )

    if not self.__suppress_warnings_flag__:
        # Collect (name, file value, new value) for each option that is
        # being set to something other than its config-file value.
        to_be_reapplied = [
            (param_, self._from_config_file[param_], value_)
            for param_, value_ in values.items()
            if (
                param_ in PipelineRunConfiguration.model_fields
                and param_ in self._from_config_file
                and value_ != self._from_config_file[param_]
            )
        ]
        if to_be_reapplied:
            # `file_value` is the one that came from the configuration
            # file, so the wording attaches the file to that value.
            reapply_during_run_warning = (
                "The value of parameter '{name}' has changed from "
                "'{file_value}' set in your configuration file to "
                "'{new_value}'.\n"
            )
            msg = "".join(
                reapply_during_run_warning.format(
                    name=name, file_value=file_value, new_value=new_value
                )
                for name, file_value, new_value in to_be_reapplied
            )
            msg += (
                "Configuration file value will be used during pipeline "
                "run, so your change will not be effective. Consider "
                "updating your configuration file instead."
            )
            logger.warning(msg)

    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
copy(self)
Copies the pipeline.
Returns:
Type | Description |
---|---|
Pipeline |
The pipeline copy. |
Source code in zenml/pipelines/pipeline_definition.py
def copy(self) -> "Pipeline":
    """Create a deep copy of this pipeline.

    Returns:
        A new object produced by deep-copying this instance.
    """
    # Imported under a distinct local name so the call does not depend on
    # how the name `copy` resolves next to this method.
    from copy import deepcopy

    duplicate = deepcopy(self)
    return duplicate
create_run_template(self, name, **kwargs)
Create a run template for the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the run template. |
required |
**kwargs |
Any |
Keyword arguments for the client method to create a run template. |
{} |
Returns:
Type | Description |
---|---|
RunTemplateResponse |
The created run template. |
Source code in zenml/pipelines/pipeline_definition.py
def create_run_template(
    self, name: str, **kwargs: Any
) -> RunTemplateResponse:
    """Create a run template backed by a fresh deployment of this pipeline.

    Args:
        name: The name of the run template.
        **kwargs: Keyword arguments forwarded to the client method that
            creates the run template.

    Returns:
        The created run template.
    """
    self._prepare_if_possible()

    # The template references a deployment, so one is created first from the
    # stored run arguments, with schedule registration skipped.
    deployment = self._create_deployment(
        skip_schedule_registration=True, **self._run_args
    )

    client = Client()
    return client.create_run_template(
        name=name, deployment_id=deployment.id, **kwargs
    )
log_pipeline_deployment_metadata(deployment_model)
staticmethod
Displays logs based on the deployment model upon running a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_model |
PipelineDeploymentResponse |
The model for the pipeline deployment |
required |
Source code in zenml/pipelines/pipeline_definition.py
@staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponse,
) -> None:
    """Log a summary of the deployment when a pipeline is run.

    Logging is best-effort: any exception raised while collecting or
    formatting the information is caught and reported at debug level.

    Args:
        deployment_model: The model for the pipeline deployment.
    """
    try:
        # Caching status
        pipeline_config = deployment_model.pipeline_configuration
        if pipeline_config.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{pipeline_config.name}`."
            )

        # Build that is being used, if any
        build = deployment_model.build
        if build:
            logger.info("Using a build:")
            image_names = ", ".join(
                [item.image for item in build.images.values()]
            )
            logger.info(f" Image(s): {image_names}")

            # Version mismatches between the local environment and the build
            from zenml import __version__

            if build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{build.zenml_version}"
                )

            import platform

            if build.python_version != platform.python_version():
                logger.info(
                    f"Python version (different than the local version): "
                    f"{build.python_version}"
                )

        # User, stack and stack components
        user = deployment_model.user
        if user is not None:
            logger.info(f"Using user: `{user.name}`")

        stack = deployment_model.stack
        if stack is not None:
            logger.info(f"Using stack: `{stack.name}`")
            for component_type, component_models in stack.components.items():
                # Only the first component of each type is reported.
                first_component = component_models[0]
                logger.info(
                    f" {component_type.value}: `{first_component.name}`"
                )
    except Exception as e:
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
prepare(self, *args, **kwargs)
Prepares the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Pipeline entrypoint input arguments. |
() |
**kwargs |
Any |
Pipeline entrypoint input keyword arguments. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the pipeline has parameters configured differently in configuration file and code. |
Source code in zenml/pipelines/pipeline_definition.py
def prepare(self, *args: Any, **kwargs: Any) -> None:
    """Prepares the pipeline.

    Resets the recorded parameters and invocations, merges the parameters
    configured on the pipeline (and in the configuration file, if any) into
    the entrypoint keyword arguments and then calls the entrypoint while this
    pipeline is the active pipeline.

    Args:
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.

    Raises:
        RuntimeError: If the pipeline has parameters configured differently in
            configuration file and code.
    """
    # Clear existing parameters and invocations
    self._parameters = {}
    self._invocations = {}

    # Configured parameters; values from the configuration file are merged
    # on top of the ones set on the pipeline configuration.
    parameters_ = (self.configuration.parameters or {}).copy()
    if from_file_ := self._from_config_file.get("parameters", None):
        parameters_ = dict_utils.recursive_update(parameters_, from_file_)

    # A parameter conflicts when it is both configured and passed at runtime
    # with a different value.
    conflicting_parameters = {
        k: (parameters_[k], v_runtime)
        for k, v_runtime in kwargs.items()
        if k in parameters_ and parameters_[k] != v_runtime
    }

    if conflicting_parameters:
        plural = len(conflicting_parameters) > 1
        noun = "parameters" if plural else "parameter"
        # Subject-verb agreement: "parameter conflicts", "parameters conflict"
        verb = "conflict" if plural else "conflicts"
        msg = (
            f"Configured {noun} for the pipeline `{self.name}` {verb} "
            f"with {noun} passed in runtime:\n"
        )
        for key, values in conflicting_parameters.items():
            msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
        msg += (
            "This happens, if you define values for pipeline parameters in "
            "configuration file and pass same parameters from the code. "
            "Example:\n"
            "```\n"
            "# config.yaml\n"
            "parameters:\n"
            "    param_name: value1\n"
            "\n"
            "# pipeline.py\n"
            "@pipeline\n"
            "def pipeline_(param_name: str):\n"
            "    step_name()\n"
            "\n"
            'if __name__=="__main__":\n'
            '    pipeline_.with_options(config_path="config.yaml")'
            '(param_name="value2")\n'
            "```\n"
            "To avoid this consider setting pipeline parameters only in one "
            "place (config or code).\n"
        )
        raise RuntimeError(msg)

    # Configured parameters that were not passed at runtime are supplied to
    # the entrypoint as keyword arguments.
    for k, v_config in parameters_.items():
        if k not in kwargs:
            kwargs[k] = v_config

    with self:
        # Enter the context manager, so we become the active pipeline. This
        # means that all steps that get called while the entrypoint function
        # is executed will be added as invocation to this pipeline instance.
        self._call_entrypoint(*args, **kwargs)
register(self)
Register the pipeline in the server.
Returns:
Type | Description |
---|---|
PipelineResponse |
The registered pipeline model. |
Source code in zenml/pipelines/pipeline_definition.py
def register(self) -> "PipelineResponse":
    """Register the pipeline in the server.

    A warning is logged when the pipeline carries non-default configuration
    (other than its name), because that configuration is not registered with
    the pipeline.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    custom_configuration = self.configuration.model_dump(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configuration:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    return self._register()
resolve(self)
Resolves the pipeline.
Returns:
Type | Description |
---|---|
Source |
The pipeline source. |
Source code in zenml/pipelines/pipeline_definition.py
def resolve(self) -> "Source":
    """Resolve the source of the pipeline entrypoint.

    Returns:
        The pipeline source.
    """
    entrypoint_source = source_utils.resolve(
        self.entrypoint, skip_validation=True
    )
    return entrypoint_source
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, steps=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)
Copies the pipeline and applies the given configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
schedule |
Optional[zenml.config.schedule.Schedule] |
Optional schedule to use for the run. |
None |
build |
Union[str, UUID, PipelineBuildBase] |
Optional build to use for the run. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
steps |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. This is equivalent
to `step_configurations`, and will be ignored if `step_configurations` is set as well. |
None |
config_path |
Optional[str] |
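Path to a yaml configuration file. This file will
be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration` object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.
|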
None |
unlisted |
bool |
Whether the pipeline run should be unlisted (not assigned to any pipeline). |
False |
prevent_build_reuse |
bool |
DEPRECATED: Use `DockerSettings.prevent_build_reuse` instead.
|
False |
**kwargs |
Any |
Pipeline configuration options. These will be passed
to the `pipeline.configure(...)` method. |
{} |
Returns:
Type | Description |
---|---|
Pipeline |
The copied pipeline instance. |
Source code in zenml/pipelines/pipeline_definition.py
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Return a configured copy of the pipeline.

    The pipeline itself is left untouched: it is copied, the copy is
    reconfigured from the configuration file and the given overrides, and
    the run arguments are stored on the copy.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        steps: Configurations for steps of the pipeline. This is equivalent
            to `step_configurations`, and will be ignored if
            `step_configurations` is set as well.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    if step_configurations and steps:
        logger.warning(
            "Step configurations were passed using both the "
            "`step_configurations` and `steps` keywords, ignoring the "
            "values passed using the `steps` keyword."
        )

    configured_pipeline = self.copy()
    configured_pipeline._reconfigure_from_file_with_overrides(
        config_path=config_path, **kwargs
    )

    # `step_configurations` takes precedence over its alias `steps`.
    requested_run_args = {
        "run_name": run_name,
        "schedule": schedule,
        "build": build,
        "step_configurations": step_configurations or steps,
        "config_path": config_path,
        "unlisted": unlisted,
        "prevent_build_reuse": prevent_build_reuse,
    }
    configured_pipeline._run_args.update(
        dict_utils.remove_none_values(requested_run_args)
    )
    return configured_pipeline
write_run_configuration_template(self, path, stack=None)
Writes a run configuration yaml template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path where the template will be written. |
required |
stack |
Optional[Stack] |
The stack for which the template should be generated. If not given, the active stack will be used. |
None |
Source code in zenml/pipelines/pipeline_definition.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Write a commented-out run configuration yaml template to a file.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    stack = stack or Client().active_stack

    # Stack-specific setting classes plus the general ones.
    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    # A setting class can apply to the pipeline level, the step level, or
    # both, so its template fields may land in both dictionaries.
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        levels = setting_class.LEVEL
        if ConfigurationLevel.PIPELINE in levels:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in levels:
            step_settings[key] = fields

    # One step template per invocation, with an empty artifact configuration
    # for each output of the step's entrypoint.
    steps = {
        step_name: StepConfigurationUpdate(
            parameters={},
            settings=step_settings,
            outputs={
                name: PartialArtifactConfiguration()
                for name in invocation.step.entrypoint_definition.outputs
            },
        )
        for step_name, invocation in self.invocations.items()
    }

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(template))

    with open(path, "w") as template_file:
        template_file.write(commented_yaml)
run_utils
Utility functions for running pipelines.
create_placeholder_run(deployment)
Create a placeholder run for the deployment.
If the deployment contains a schedule, no placeholder run will be created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The deployment for which to create the placeholder run. |
required |
Returns:
Type | Description |
---|---|
Optional[PipelineRunResponse] |
The placeholder run or `None` if no run was created. |
Source code in zenml/pipelines/run_utils.py
def create_placeholder_run(
    deployment: "PipelineDeploymentResponse",
) -> Optional["PipelineRunResponse"]:
    """Create a placeholder run for the deployment.

    No placeholder run is created when the deployment contains a schedule.

    Args:
        deployment: The deployment for which to create the placeholder run.

    Returns:
        The placeholder run or `None` if no run was created.
    """
    assert deployment.user

    if deployment.schedule:
        return None

    start_time = datetime.utcnow()
    substitutions = deployment.pipeline_configuration._get_full_substitutions(
        start_time
    )
    run_name = string_utils.format_name_template(
        name_template=deployment.run_name_template,
        substitutions=substitutions,
    )
    pipeline_id = deployment.pipeline.id if deployment.pipeline else None

    run_request = PipelineRunRequest(
        name=run_name,
        # We set the start time on the placeholder run already to
        # make it consistent with the {time} placeholder in the
        # run name. This means the placeholder run will usually
        # have longer durations than scheduled runs, as for them
        # the start_time is only set once the first step starts
        # running.
        start_time=start_time,
        orchestrator_run_id=None,
        user=deployment.user.id,
        workspace=deployment.workspace.id,
        deployment=deployment.id,
        pipeline=pipeline_id,
        status=ExecutionStatus.INITIALIZING,
        tags=deployment.pipeline_configuration.tags,
    )
    return Client().zen_store.create_run(run_request)
deploy_pipeline(deployment, stack, placeholder_run=None)
Run a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The deployment to run. |
required |
stack |
Stack |
The stack on which to run the deployment. |
required |
placeholder_run |
Optional[PipelineRunResponse] |
An optional placeholder run for the deployment. |
None |
Exceptions:
Type | Description |
---|---|
Exception |
Any exception that happened while deploying or running (in case it happens synchronously) the pipeline. |
Source code in zenml/pipelines/run_utils.py
def deploy_pipeline(
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Run a deployment on a stack.

    Args:
        deployment: The deployment to run.
        stack: The stack on which to run the deployment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        Exception: Any exception that happened while deploying or running
            (in case it happens synchronously) the pipeline.
    """
    # Prevent execution of nested pipelines which might lead to
    # unexpected behavior. The previous flag value is restored afterwards.
    flag_before = constants.SHOULD_PREVENT_PIPELINE_EXECUTION
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
    try:
        stack.prepare_pipeline_deployment(deployment=deployment)
        stack.deploy_pipeline(
            deployment=deployment,
            placeholder_run=placeholder_run,
        )
    except Exception as e:
        if placeholder_run:
            current_status = (
                Client()
                .get_pipeline_run(placeholder_run.id, hydrate=False)
                .status
            )
            if current_status == ExecutionStatus.INITIALIZING:
                # The run failed during the initialization phase -> We
                # change its status to `Failed`
                publish_failed_pipeline_run(placeholder_run.id)
        raise e
    finally:
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = flag_before
get_all_sources_from_value(value)
Get all source objects from a value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Any |
The value from which to get all the source objects. |
required |
Returns:
Type | Description |
---|---|
List[zenml.config.source.Source] |
List of source objects for the given value. |
Source code in zenml/pipelines/run_utils.py
def get_all_sources_from_value(value: Any) -> List[Source]:
    """Recursively collect all source objects contained in a value.

    Args:
        value: The value from which to get all the source objects.

    Returns:
        List of source objects for the given value.
    """
    if isinstance(value, Source):
        return [value]

    # Pick the child values to descend into; anything that is not a model,
    # dict, list, set or tuple contains no sources.
    if isinstance(value, BaseModel):
        children = value.__dict__.values()
    elif isinstance(value, Dict):
        children = value.values()
    elif isinstance(value, (List, Set, tuple)):
        children = value
    else:
        return []

    return [
        source
        for child in children
        for source in get_all_sources_from_value(child)
    ]
get_default_run_name(pipeline_name)
Gets the default name for a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
Name of the pipeline which will be run. |
required |
Returns:
Type | Description |
---|---|
str |
Run name. |
Source code in zenml/pipelines/run_utils.py
def get_default_run_name(pipeline_name: str) -> str:
    """Build the default run name template for a pipeline.

    Args:
        pipeline_name: Name of the pipeline which will be run.

    Returns:
        The pipeline name followed by literal `{date}` and `{time}`
        placeholders, separated by dashes.
    """
    # Doubled braces keep `{date}` and `{time}` as literal placeholders.
    template = "{}-{{date}}-{{time}}"
    return template.format(pipeline_name)
get_placeholder_run(deployment_id)
Get the placeholder run for a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_id |
UUID |
ID of the deployment for which to get the placeholder run. |
required |
Returns:
Type | Description |
---|---|
Optional[PipelineRunResponse] |
The placeholder run or `None` if there exists no placeholder run for the deployment. |
Source code in zenml/pipelines/run_utils.py
def get_placeholder_run(
    deployment_id: UUID,
) -> Optional["PipelineRunResponse"]:
    """Look up the placeholder run of a deployment.

    Args:
        deployment_id: ID of the deployment for which to get the placeholder
            run.

    Returns:
        The placeholder run or `None` if there exists no placeholder run for the
        deployment.
    """
    # Request a single run of the deployment that is still initializing,
    # sorted ascending by creation.
    page = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )

    if len(page.items) > 0:
        candidate = page.items[0]
        # Only a run without an orchestrator run ID is returned.
        if candidate.orchestrator_run_id is None:
            return candidate

    return None
upload_notebook_cell_code_if_necessary(deployment, stack)
Upload notebook cell code if necessary.
This function checks if any of the steps of the pipeline that will be executed in a different process are defined in a notebook. If that is the case, it will extract that notebook cell code into python files and upload an archive of all the necessary files to the artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment. |
required |
stack |
Stack |
The stack on which the deployment will happen. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the code for one of the steps that will run out of process cannot be extracted into a python file. |
Source code in zenml/pipelines/run_utils.py
def upload_notebook_cell_code_if_necessary(
    deployment: "PipelineDeploymentBase", stack: "Stack"
) -> None:
    """Upload notebook cell code if necessary.

    This function checks if any of the steps of the pipeline that will be
    executed in a different process are defined in a notebook. If that is the
    case, it will extract that notebook cell code into python files and upload
    an archive of all the necessary files to the artifact store.

    Args:
        deployment: The deployment.
        stack: The stack on which the deployment will happen.

    Raises:
        RuntimeError: If the code for one of the steps that will run out of
            process cannot be extracted into a python file.
    """
    resolved_notebook_sources = source_utils.get_resolved_notebook_sources()
    should_upload = False

    for step in deployment.step_configurations.values():
        source = step.spec.source
        if source.type != SourceType.NOTEBOOK:
            continue

        runs_out_of_process = (
            stack.orchestrator.flavor != "local" or step.config.step_operator
        )
        if not runs_out_of_process:
            continue

        should_upload = True
        cell_code = resolved_notebook_sources.get(source.import_path, None)

        # Code does not run in-process, which means we need to
        # extract the step code into a python file
        if not cell_code:
            raise RuntimeError(
                f"Unable to run step {step.config.name}. This step is "
                "defined in a notebook and you're trying to run it "
                "in a remote environment, but ZenML was not able to "
                "detect the step code in the notebook. To fix "
                "this error, define your step in a python file instead "
                "of a notebook."
            )

    if not should_upload:
        return

    logger.info("Uploading notebook code...")

    # Every resolved notebook cell is uploaded as its own python file.
    for cell_code in resolved_notebook_sources.values():
        notebook_utils.warn_about_notebook_cell_magic_commands(
            cell_code=cell_code
        )
        module_name = notebook_utils.compute_cell_replacement_module_name(
            cell_code=cell_code
        )
        code_utils.upload_notebook_code(
            artifact_store=stack.artifact_store,
            cell_code=cell_code,
            file_name=f"{module_name}.py",
        )

    # Record the artifact store ID on every notebook source found in the
    # deployment.
    for source in get_all_sources_from_value(deployment):
        if source.type == SourceType.NOTEBOOK:
            setattr(source, "artifact_store_id", stack.artifact_store.id)

    logger.info("Upload finished.")
validate_run_config_is_runnable_from_server(run_configuration)
Validates that the run configuration can be used to run from the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_configuration |
PipelineRunConfiguration |
The run configuration to validate. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If there are values in the run configuration that are not allowed when running a pipeline from the server. |
Source code in zenml/pipelines/run_utils.py
def validate_run_config_is_runnable_from_server(
    run_configuration: "PipelineRunConfiguration",
) -> None:
    """Reject run configuration values that cannot be used from the server.

    Args:
        run_configuration: The run configuration to validate.

    Raises:
        ValueError: If there are values in the run configuration that are not
            allowed when running a pipeline from the server.
    """
    # These attributes must all be unset; they are checked in this order and
    # the first one that is set is reported.
    for forbidden in ("parameters", "build", "schedule"):
        if getattr(run_configuration, forbidden):
            raise ValueError(
                f"Can't set {forbidden} when running pipeline via Rest API."
            )

    docker_error = (
        "Can't set DockerSettings when running pipeline via Rest API."
    )

    # Docker settings are rejected on the pipeline level ...
    if run_configuration.settings.get("docker"):
        raise ValueError(docker_error)

    # ... as well as on any individual step.
    if any(
        step_update.settings.get("docker")
        for step_update in run_configuration.steps.values()
    ):
        raise ValueError(docker_error)
validate_stack_is_runnable_from_server(zen_store, stack)
Validate if a stack model is runnable from the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
zen_store |
BaseZenStore |
ZenStore to use for listing flavors. |
required |
stack |
StackResponse |
The stack to validate. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the stack has components of a custom flavor or local components. |
Source code in zenml/pipelines/run_utils.py
def validate_stack_is_runnable_from_server(
    zen_store: BaseZenStore, stack: StackResponse
) -> None:
    """Check that a stack can be used to run pipelines from the server.

    Args:
        zen_store: ZenStore to use for listing flavors.
        stack: The stack to validate.

    Raises:
        ValueError: If the stack has components of a custom flavor or local
            components.
    """
    for components_of_type in stack.components.values():
        # Exactly one component per type is expected.
        assert len(components_of_type) == 1
        component = components_of_type[0]

        flavor_filter = FlavorFilter(
            name=component.flavor_name, type=component.type
        )
        matching_flavors = zen_store.list_flavors(flavor_filter)
        assert len(matching_flavors) == 1
        flavor_model = matching_flavors[0]

        # A flavor that has a workspace set is treated as a custom flavor.
        if flavor_model.workspace is not None:
            raise ValueError("No custom stack component flavors allowed.")

        config_class = Flavor.from_model(flavor_model).config_class
        component_config = config_class(**component.configuration)
        if component_config.is_local:
            raise ValueError("No local stack components allowed.")
wait_for_pipeline_run_to_finish(run_id)
Waits until a pipeline run is finished.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the run for which to wait. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponse |
Model of the finished run. |
Source code in zenml/pipelines/run_utils.py
def wait_for_pipeline_run_to_finish(run_id: UUID) -> "PipelineRunResponse":
    """Poll a pipeline run until it is finished.

    The wait between two polls starts at one second and doubles after every
    poll until it reaches 64 seconds.

    Args:
        run_id: ID of the run for which to wait.

    Returns:
        Model of the finished run.
    """
    max_delay = 64
    delay = 1

    while not (run := Client().get_pipeline_run(run_id)).status.is_finished:
        logger.info(
            "Waiting for pipeline run with ID %s to finish (current status: %s)",
            run_id,
            run.status,
        )
        time.sleep(delay)
        delay = min(2 * delay, max_delay)

    return run