New
zenml.new
special
pipelines
special
build_utils
Pipeline build utilities.
compute_build_checksum(items, stack, code_repository=None)
Compute an overall checksum for a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items |
List[BuildConfiguration] |
Items of the build. |
required |
stack |
Stack |
The stack associated with the build. Will be used to gather its requirements. |
required |
code_repository |
Optional[BaseCodeRepository] |
The code repository that will be used to download files inside the build. Will be used for its dependency specification. |
None |
Returns:
Type | Description |
---|---|
str |
The build checksum. |
Source code in zenml/new/pipelines/build_utils.py
def compute_build_checksum(
items: List["BuildConfiguration"],
stack: "Stack",
code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
"""Compute an overall checksum for a pipeline build.
Args:
items: Items of the build.
stack: The stack associated with the build. Will be used to gather
its requirements.
code_repository: The code repository that will be used to download
files inside the build. Will be used for its dependency
specification.
Returns:
The build checksum.
"""
hash_ = hashlib.md5() # nosec
for item in items:
key = PipelineBuildBase.get_image_key(
component_key=item.key, step=item.step_name
)
settings_checksum = item.compute_settings_checksum(
stack=stack,
code_repository=code_repository,
)
hash_.update(key.encode())
hash_.update(settings_checksum.encode())
return hash_.hexdigest()
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)
Builds images and registers the output in the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment. |
required |
pipeline_id |
Optional[uuid.UUID] |
The ID of the pipeline. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository will be used to download inside the build images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build output. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If multiple builds with the same key but different settings were specified. |
Source code in zenml/new/pipelines/build_utils.py
def create_pipeline_build(
deployment: "PipelineDeploymentBase",
pipeline_id: Optional[UUID] = None,
code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
"""Builds images and registers the output in the server.
Args:
deployment: The pipeline deployment.
pipeline_id: The ID of the pipeline.
code_repository: If provided, this code repository will be used to
download inside the build images.
Returns:
The build output.
Raises:
RuntimeError: If multiple builds with the same key but different
settings were specified.
"""
client = Client()
stack = client.active_stack
required_builds = stack.get_docker_builds(deployment=deployment)
if not required_builds:
logger.debug("No docker builds required.")
return None
logger.info(
"Building Docker image(s) for pipeline `%s`.",
deployment.pipeline_configuration.name,
)
docker_image_builder = PipelineDockerImageBuilder()
images: Dict[str, BuildItem] = {}
checksums: Dict[str, str] = {}
for build_config in required_builds:
combined_key = PipelineBuildBase.get_image_key(
component_key=build_config.key, step=build_config.step_name
)
checksum = build_config.compute_settings_checksum(
stack=stack, code_repository=code_repository
)
if combined_key in images:
previous_checksum = images[combined_key].settings_checksum
if previous_checksum != checksum:
raise RuntimeError(
f"Trying to build image for key `{combined_key}` but "
"an image for this key was already built with a "
"different configuration. This happens if multiple "
"stack components specified Docker builds for the same "
"key in the `StackComponent.get_docker_builds(...)` "
"method. If you're using custom components, make sure "
"to provide unique keys when returning your build "
"configurations to avoid this error."
)
else:
continue
if checksum in checksums:
item_key = checksums[checksum]
image_name_or_digest = images[item_key].image
contains_code = images[item_key].contains_code
dockerfile = images[item_key].dockerfile
requirements = images[item_key].requirements
else:
tag = deployment.pipeline_configuration.name
if build_config.step_name:
tag += f"-{build_config.step_name}"
tag += f"-{build_config.key}"
include_files = build_config.should_include_files(
code_repository=code_repository,
)
download_files = build_config.should_download_files(
code_repository=code_repository,
)
(
image_name_or_digest,
dockerfile,
requirements,
) = docker_image_builder.build_docker_image(
docker_settings=build_config.settings,
tag=tag,
stack=stack,
include_files=include_files,
download_files=download_files,
entrypoint=build_config.entrypoint,
extra_files=build_config.extra_files,
code_repository=code_repository,
)
contains_code = include_files
images[combined_key] = BuildItem(
image=image_name_or_digest,
dockerfile=dockerfile,
requirements=requirements,
settings_checksum=checksum,
contains_code=contains_code,
requires_code_download=download_files,
)
checksums[checksum] = combined_key
logger.info("Finished building Docker image(s).")
is_local = stack.container_registry is None
contains_code = any(item.contains_code for item in images.values())
build_checksum = compute_build_checksum(
required_builds, stack=stack, code_repository=code_repository
)
build_request = PipelineBuildRequest(
user=client.active_user.id,
workspace=client.active_workspace.id,
stack=client.active_stack_model.id,
pipeline=pipeline_id,
is_local=is_local,
contains_code=contains_code,
images=images,
zenml_version=zenml.__version__,
python_version=platform.python_version(),
checksum=build_checksum,
)
return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository)
Find an existing build for a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The deployment for which to find an existing build. |
required |
code_repository |
BaseCodeRepository |
The code repository that will be used to download files in the images. |
required |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The existing build to reuse if found. |
Source code in zenml/new/pipelines/build_utils.py
def find_existing_build(
deployment: "PipelineDeploymentBase",
code_repository: "BaseCodeRepository",
) -> Optional["PipelineBuildResponse"]:
"""Find an existing build for a deployment.
Args:
deployment: The deployment for which to find an existing build.
code_repository: The code repository that will be used to download
files in the images.
Returns:
The existing build to reuse if found.
"""
client = Client()
stack = client.active_stack
python_version_prefix = ".".join(platform.python_version_tuple()[:2])
required_builds = stack.get_docker_builds(deployment=deployment)
if not required_builds:
return None
build_checksum = compute_build_checksum(
required_builds, stack=stack, code_repository=code_repository
)
matches = client.list_builds(
sort_by="desc:created",
size=1,
stack_id=stack.id,
# The build is local and it's not clear whether the images
# exist on the current machine or if they've been overwritten.
# TODO: Should we support this by storing the unique Docker ID for
# the image and checking if an image with that ID exists locally?
is_local=False,
# The build contains some code which might be different from the
# local code the user is expecting to run
contains_code=False,
zenml_version=zenml.__version__,
# Match all patch versions of the same Python major + minor
python_version=f"startswith:{python_version_prefix}",
checksum=build_checksum,
)
if not matches.items:
return None
return matches[0]
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)
Loads or creates a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment for which to load or create the build. |
required |
allow_build_reuse |
bool |
If True, the build is allowed to reuse an existing build. |
required |
pipeline_id |
Optional[uuid.UUID] |
Optional ID of the pipeline to reference in the build. |
None |
build |
Union[UUID, PipelineBuildBase] |
Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository will be used to download inside the build images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build response. |
Source code in zenml/new/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
deployment: "PipelineDeploymentBase",
allow_build_reuse: bool,
pipeline_id: Optional[UUID] = None,
build: Union["UUID", "PipelineBuildBase", None] = None,
code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
"""Loads or creates a pipeline build.
Args:
deployment: The pipeline deployment for which to load or create the
build.
allow_build_reuse: If True, the build is allowed to reuse an
existing build.
pipeline_id: Optional ID of the pipeline to reference in the build.
build: Optional existing build. If given, the build will be fetched
(or registered) in the database. If not given, a new build will
be created.
code_repository: If provided, this code repository will be used to
download inside the build images.
Returns:
The build response.
"""
if not build:
if (
allow_build_reuse
and code_repository
and not deployment.requires_included_files
):
existing_build = find_existing_build(
deployment=deployment, code_repository=code_repository
)
if existing_build:
logger.info(
"Reusing existing build `%s` for stack `%s`.",
existing_build.id,
Client().active_stack.name,
)
return existing_build
return create_pipeline_build(
deployment=deployment,
pipeline_id=pipeline_id,
code_repository=code_repository,
)
if isinstance(build, UUID):
build_model = Client().zen_store.get_build(build_id=build)
else:
build_request = PipelineBuildRequest(
user=Client().active_user.id,
workspace=Client().active_workspace.id,
stack=Client().active_stack_model.id,
pipeline=pipeline_id,
**build.dict(),
)
build_model = Client().zen_store.create_build(build=build_request)
verify_custom_build(
build=build_model,
deployment=deployment,
code_repository=code_repository,
)
return build_model
verify_custom_build(build, deployment, code_repository=None)
Verify a custom build for a pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build |
PipelineBuildResponse |
The build to verify. |
required |
deployment |
PipelineDeploymentBase |
The deployment for which to verify the build. |
required |
code_repository |
Optional[BaseCodeRepository] |
Code repository that will be used to download files for the deployment. |
None |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the build can't be used for the deployment. |
Source code in zenml/new/pipelines/build_utils.py
def verify_custom_build(
build: "PipelineBuildResponse",
deployment: "PipelineDeploymentBase",
code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
"""Verify a custom build for a pipeline deployment.
Args:
build: The build to verify.
deployment: The deployment for which to verify the build.
code_repository: Code repository that will be used to download files
for the deployment.
Raises:
RuntimeError: If the build can't be used for the deployment.
"""
stack = Client().active_stack
required_builds = stack.get_docker_builds(deployment=deployment)
if build.stack and build.stack.id != stack.id:
logger.warning(
"The stack `%s` used for the build `%s` is not the same as the "
"stack `%s` that the pipeline will run on. This could lead "
"to issues if the stacks have different build requirements.",
build.stack.name,
build.id,
stack.name,
)
if build.contains_code:
logger.warning(
"The build you specified for this run contains code and will run "
"with the step code that was included in the Docker images which "
"might differ from the local code in your client environment."
)
if build.requires_code_download and not code_repository:
raise RuntimeError(
"The build you specified does not include code but code download "
"not possible. This might be because you don't have a code "
"repository registered or the code repository contains local "
"changes."
)
if build.checksum:
build_checksum = compute_build_checksum(
required_builds, stack=stack, code_repository=code_repository
)
if build_checksum != build.checksum:
logger.warning(
"The Docker settings used for the build `%s` are "
"not the same as currently specified for your pipeline. "
"This means that the build you specified to run this "
"pipeline might be outdated and most likely contains "
"outdated requirements.",
build.id,
)
else:
# No checksum given for the entire build, we manually check that
# all the images exist and the setting match
for build_config in required_builds:
try:
image = build.get_image(
component_key=build_config.key,
step=build_config.step_name,
)
except KeyError:
raise RuntimeError(
"The build you specified is missing an image for key: "
f"{build_config.key}."
)
if build_config.compute_settings_checksum(
stack=stack, code_repository=code_repository
) != build.get_settings_checksum(
component_key=build_config.key, step=build_config.step_name
):
logger.warning(
"The Docker settings used to build the image `%s` are "
"not the same as currently specified for your pipeline. "
"This means that the build you specified to run this "
"pipeline might be outdated and most likely contains "
"outdated code or requirements.",
image,
)
if build.is_local:
logger.warning(
"You manually specified a local build to run your pipeline. "
"This might lead to errors if the images don't exist on "
"your local machine or the image tags have been "
"overwritten since the original build happened."
)
verify_local_repository_context(deployment, local_repo_context)
Verifies the local repository.
If the local repository exists and has no local changes, code download inside the images is possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment. |
required |
local_repo_context |
Optional[LocalRepositoryContext] |
The local repository active at the source root. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the deployment requires code download but code download is not possible. |
Returns:
Type | Description |
---|---|
Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository] |
The code repository from which to download files for the runs of the deployment, or None if code download is not possible. |
Source code in zenml/new/pipelines/build_utils.py
def verify_local_repository_context(
deployment: "PipelineDeploymentBase",
local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
"""Verifies the local repository.
If the local repository exists and has no local changes, code download
inside the images is possible.
Args:
deployment: The pipeline deployment.
local_repo_context: The local repository active at the source root.
Raises:
RuntimeError: If the deployment requires code download but code download
is not possible.
Returns:
The code repository from which to download files for the runs of the
deployment, or None if code download is not possible.
"""
if deployment.requires_code_download:
if not local_repo_context:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be included in the "
"Docker image (`source_files='download'`), but there is no "
"code repository active at your current source root "
f"`{source_utils.get_source_root()}`."
)
elif local_repo_context.is_dirty:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be included in the "
"Docker image (`source_files='download'`), but the code "
"repository active at your current source root "
f"`{source_utils.get_source_root()}` has uncommitted "
"changes."
)
elif local_repo_context.has_local_changes:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be included in the "
"Docker image (`source_files='download'`), but the code "
"repository active at your current source root "
f"`{source_utils.get_source_root()}` has unpushed "
"changes."
)
code_repository = None
if local_repo_context and not local_repo_context.has_local_changes:
model = Client().get_code_repository(
local_repo_context.code_repository_id
)
code_repository = BaseCodeRepository.from_model(model)
return code_repository
deserialization_utils
Pipeline deserialization utilities.
load_pipeline(model)
Load a pipeline from a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponse |
The pipeline model to load. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the pipeline can't be loaded due to an old model spec (version <0.2). |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline(model: "PipelineResponse") -> "Pipeline":
"""Load a pipeline from a model.
Args:
model: The pipeline model to load.
Raises:
ValueError: If the pipeline can't be loaded due to an old model spec
(version <0.2).
Returns:
The loaded pipeline.
"""
model_version = version.parse(model.spec.version)
if model_version < version.parse("0.2"):
raise ValueError(
"Loading a pipeline is only possible for pipeline specs with "
"version 0.2 or higher."
)
elif model_version == version.parse("0.2"):
pipeline_instance = load_pipeline_v_0_2(model=model)
elif model_version == version.parse("0.3"):
pipeline_instance = load_pipeline_v_0_3(model=model)
else:
pipeline_instance = load_pipeline_v_0_4(model=model)
version_hash = pipeline_instance._compute_unique_identifier(
pipeline_spec=model.spec
)
if version_hash != model.version_hash:
logger.warning(
"Trying to load pipeline version `%s`, but the local step code "
"changed since this pipeline version was registered. Using "
"this pipeline instance will result in a different pipeline "
"version being registered or reused.",
model.version,
)
return pipeline_instance
load_pipeline_v_0_2(model)
Load a pipeline from a model with spec version 0.2.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponse |
The pipeline model to load. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_2(model: "PipelineResponse") -> "Pipeline":
"""Load a pipeline from a model with spec version 0.2.
Args:
model: The pipeline model to load.
Returns:
The loaded pipeline.
"""
return _load_legacy_pipeline(
model=model, use_pipeline_parameter_name=False
)
load_pipeline_v_0_3(model)
Load a pipeline from a model with spec version 0.3.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponse |
The pipeline model to load. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_3(model: "PipelineResponse") -> "Pipeline":
"""Load a pipeline from a model with spec version 0.3.
Args:
model: The pipeline model to load.
Returns:
The loaded pipeline.
"""
return _load_legacy_pipeline(model=model, use_pipeline_parameter_name=True)
load_pipeline_v_0_4(model)
Load a pipeline from a model with spec version 0.4.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponse |
The pipeline model to load. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the pipeline source does not refer to a pipeline instance. |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_4(model: "PipelineResponse") -> "Pipeline":
"""Load a pipeline from a model with spec version 0.4.
Args:
model: The pipeline model to load.
Raises:
TypeError: If the pipeline source does not refer to a pipeline instance.
Returns:
The loaded pipeline.
"""
pipeline_source = model.spec.source
assert pipeline_source
pipeline = source_utils.load(pipeline_source)
if not isinstance(pipeline, Pipeline):
raise TypeError("Not a pipeline")
pipeline.prepare(**model.spec.parameters)
return pipeline
model_utils
Pipeline utilities to support Model Control Plane.
NewModelVersionRequest (BaseModel)
pydantic-model
Request to create a new model version.
Source code in zenml/new/pipelines/model_utils.py
class NewModelVersionRequest(BaseModel):
"""Request to create a new model version."""
class Requester(BaseModel):
"""Requester of a new model version."""
source: str
name: str
def __repr__(self) -> str:
"""Return a string representation of the requester.
Returns:
A string representation of the requester.
"""
return f"{self.source}::{self.name}"
requesters: List[Requester] = []
_model_version: Optional[ModelVersion] = PrivateAttr(default=None)
@property
def model_version(self) -> ModelVersion:
"""Model version getter.
Returns:
The model version.
Raises:
RuntimeError: If the model version is not set.
"""
if self._model_version is None:
raise RuntimeError("Model version is not set.")
return self._model_version
def update_request(
self,
model_version: ModelVersion,
requester: "NewModelVersionRequest.Requester",
) -> None:
"""Update from `ModelVersion` in place.
Args:
model_version: `ModelVersion` to use.
requester: Requester of a new model version.
"""
self.requesters.append(requester)
if self._model_version is None:
self._model_version = model_version
self._model_version._merge(model_version)
model_version: ModelVersion
property
readonly
Model version getter.
Returns:
Type | Description |
---|---|
ModelVersion |
The model version. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the model version is not set. |
Requester (BaseModel)
pydantic-model
Requester of a new model version.
Source code in zenml/new/pipelines/model_utils.py
class Requester(BaseModel):
"""Requester of a new model version."""
source: str
name: str
def __repr__(self) -> str:
"""Return a string representation of the requester.
Returns:
A string representation of the requester.
"""
return f"{self.source}::{self.name}"
__repr__(self)
special
Return a string representation of the requester.
Returns:
Type | Description |
---|---|
str |
A string representation of the requester. |
Source code in zenml/new/pipelines/model_utils.py
def __repr__(self) -> str:
"""Return a string representation of the requester.
Returns:
A string representation of the requester.
"""
return f"{self.source}::{self.name}"
update_request(self, model_version, requester)
Update from ModelVersion
in place.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_version |
ModelVersion |
|
required |
requester |
NewModelVersionRequest.Requester |
Requester of a new model version. |
required |
Source code in zenml/new/pipelines/model_utils.py
def update_request(
self,
model_version: ModelVersion,
requester: "NewModelVersionRequest.Requester",
) -> None:
"""Update from `ModelVersion` in place.
Args:
model_version: `ModelVersion` to use.
requester: Requester of a new model version.
"""
self.requesters.append(requester)
if self._model_version is None:
self._model_version = model_version
self._model_version._merge(model_version)
pipeline
Definition of a ZenML pipeline.
Pipeline
ZenML pipeline class.
Source code in zenml/new/pipelines/pipeline.py
class Pipeline:
"""ZenML pipeline class."""
# The active pipeline is the pipeline to which step invocations will be
# added when a step is called. It is set using a context manager when a
# pipeline is called (see Pipeline.__call__ for more context)
ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None
def __init__(
self,
name: str,
entrypoint: F,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
) -> None:
"""Initializes a pipeline.
Args:
name: The name of the pipeline.
entrypoint: The entrypoint function of the pipeline.
enable_cache: If caching should be enabled for this pipeline.
enable_artifact_metadata: If artifact metadata should be enabled for
this pipeline.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can
be a function with a single argument of type `BaseException`, or
a source path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
model_version: configuration of the model version in the Model Control Plane.
"""
self._invocations: Dict[str, StepInvocation] = {}
self._run_args: Dict[str, Any] = {}
self._configuration = PipelineConfiguration(
name=name,
)
self._from_config_file: Dict[str, Any] = {}
with self.__suppress_configure_warnings__():
self.configure(
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_artifact_visualization=enable_artifact_visualization,
enable_step_logs=enable_step_logs,
settings=settings,
extra=extra,
on_failure=on_failure,
on_success=on_success,
model_version=model_version,
)
self.entrypoint = entrypoint
self._parameters: Dict[str, Any] = {}
self.__suppress_warnings_flag__ = False
self.__new_unnamed_model_versions_in_current_run__: Dict[str, int] = {}
@property
def name(self) -> str:
"""The name of the pipeline.
Returns:
The name of the pipeline.
"""
return self.configuration.name
@property
def enable_cache(self) -> Optional[bool]:
"""If caching is enabled for the pipeline.
Returns:
If caching is enabled for the pipeline.
"""
return self.configuration.enable_cache
@property
def configuration(self) -> PipelineConfiguration:
"""The configuration of the pipeline.
Returns:
The configuration of the pipeline.
"""
return self._configuration
@property
def invocations(self) -> Dict[str, StepInvocation]:
"""Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been
called.
Returns:
The step invocations.
"""
return self._invocations
def resolve(self) -> "Source":
"""Resolves the pipeline.
Returns:
The pipeline source.
"""
return source_utils.resolve(self.entrypoint, skip_validation=True)
@property
def source_object(self) -> Any:
"""The source object of this pipeline.
Returns:
The source object of this pipeline.
"""
return self.entrypoint
@property
def source_code(self) -> str:
"""The source code of this pipeline.
Returns:
The source code of this pipeline.
"""
return inspect.getsource(self.source_object)
@classmethod
def from_model(cls, model: "PipelineResponse") -> "Pipeline":
"""Creates a pipeline instance from a model.
Args:
model: The model to load the pipeline instance from.
Returns:
The pipeline instance.
"""
from zenml.new.pipelines.deserialization_utils import load_pipeline
return load_pipeline(model=model)
@property
def model(self) -> "PipelineResponse":
"""Gets the registered pipeline model for this instance.
Returns:
The registered pipeline model.
Raises:
RuntimeError: If the pipeline has not been registered yet.
"""
self._prepare_if_possible()
pipeline_spec = Compiler().compile_spec(self)
version_hash = self._compute_unique_identifier(
pipeline_spec=pipeline_spec
)
pipelines = Client().list_pipelines(
name=self.name, version_hash=version_hash
)
if len(pipelines) == 1:
return pipelines.items[0]
raise RuntimeError(
f"Cannot get the model of pipeline '{self.name}' because it has "
f"not been registered yet. Please ensure that the pipeline has "
f"been run and that at least one step has been executed."
)
@contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
"""Context manager to suppress warnings in `Pipeline.configure(...)`.
Used to suppress warnings when called from inner code and not user-facing code.
Yields:
Nothing.
"""
self.__suppress_warnings_flag__ = True
yield
self.__suppress_warnings_flag__ = False
def configure(
self: T,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
parameters: Optional[Dict[str, Any]] = None,
merge: bool = True,
) -> T:
"""Configures the pipeline.
Configuration merging example:
* `merge==True`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* `merge==False`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Args:
enable_cache: If caching should be enabled for this pipeline.
enable_artifact_metadata: If artifact metadata should be enabled for
this pipeline.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can
be a function with a single argument of type `BaseException`, or
a source path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
merge: If `True`, will merge the given dictionary configurations
like `extra` and `settings` with existing
configurations. If `False` the given configurations will
overwrite all existing ones. See the general description of this
method for an example.
model_version: configuration of the model version in the Model Control Plane.
parameters: input parameters for the pipeline.
Returns:
The pipeline instance that this method was called on.
"""
failure_hook_source = None
if on_failure:
# string of on_failure hook function to be used for this pipeline
failure_hook_source = resolve_and_validate_hook(on_failure)
success_hook_source = None
if on_success:
# string of on_success hook function to be used for this pipeline
success_hook_source = resolve_and_validate_hook(on_success)
values = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"enable_artifact_metadata": enable_artifact_metadata,
"enable_artifact_visualization": enable_artifact_visualization,
"enable_step_logs": enable_step_logs,
"settings": settings,
"extra": extra,
"failure_hook_source": failure_hook_source,
"success_hook_source": success_hook_source,
"model_version": model_version,
"parameters": parameters,
}
)
if not self.__suppress_warnings_flag__:
to_be_reapplied = []
for param_, value_ in values.items():
if (
param_ in PipelineRunConfiguration.__fields__
and param_ in self._from_config_file
and value_ != self._from_config_file[param_]
):
to_be_reapplied.append(
(param_, self._from_config_file[param_], value_)
)
if to_be_reapplied:
msg = ""
reapply_during_run_warning = (
"The value of parameter '{name}' has changed from "
"'{file_value}' to '{new_value}' set in your configuration file.\n"
)
for name, file_value, new_value in to_be_reapplied:
msg += reapply_during_run_warning.format(
name=name, file_value=file_value, new_value=new_value
)
msg += (
"Configuration file value will be used during pipeline run, "
"so you change will not be efficient. Consider updating your "
"configuration file instead."
)
logger.warning(msg)
config = PipelineConfigurationUpdate(**values)
self._apply_configuration(config, merge=merge)
return self
@property
def requires_parameters(self) -> bool:
"""If the pipeline entrypoint requires parameters.
Returns:
If the pipeline entrypoint requires parameters.
"""
signature = inspect.signature(self.entrypoint, follow_wrapped=True)
return any(
parameter.default is inspect.Parameter.empty
for parameter in signature.parameters.values()
)
@property
def is_prepared(self) -> bool:
"""If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the
pipeline is fully defined.
Returns:
If the pipeline is prepared.
"""
return len(self.invocations) > 0
def prepare(self, *args: Any, **kwargs: Any) -> None:
"""Prepares the pipeline.
Args:
*args: Pipeline entrypoint input arguments.
**kwargs: Pipeline entrypoint input keyword arguments.
Raises:
RuntimeError: If the pipeline has parameters configured differently in
configuration file and code.
"""
# Clear existing parameters and invocations
self._parameters = {}
self._invocations = {}
conflicting_parameters = {}
parameters_ = (self.configuration.parameters or {}).copy()
if from_file_ := self._from_config_file.get("parameters", None):
parameters_ = dict_utils.recursive_update(parameters_, from_file_)
if parameters_:
for k, v_runtime in kwargs.items():
if k in parameters_:
v_config = parameters_[k]
if v_config != v_runtime:
conflicting_parameters[k] = (v_config, v_runtime)
if conflicting_parameters:
is_plural = "s" if len(conflicting_parameters) > 1 else ""
msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
for key, values in conflicting_parameters.items():
msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
parameters:
param_name: value1
# pipeline.py
@pipeline
def pipeline_(param_name: str):
step_name()
if __name__=="__main__":
pipeline_.with_options(config_file="config.yaml")(param_name="value2")
```
To avoid this consider setting pipeline parameters only in one place (config or code).
"""
raise RuntimeError(msg)
for k, v_config in parameters_.items():
if k not in kwargs:
kwargs[k] = v_config
with self:
# Enter the context manager, so we become the active pipeline. This
# means that all steps that get called while the entrypoint function
# is executed will be added as invocation to this pipeline instance.
self._call_entrypoint(*args, **kwargs)
def register(self) -> "PipelineResponse":
"""Register the pipeline in the server.
Returns:
The registered pipeline model.
"""
# Activating the built-in integrations to load all materializers
from zenml.integrations.registry import integration_registry
self._prepare_if_possible()
integration_registry.activate_integrations()
if self.configuration.dict(exclude_defaults=True, exclude={"name"}):
logger.warning(
f"The pipeline `{self.name}` that you're registering has "
"custom configurations applied to it. These will not be "
"registered with the pipeline and won't be set when you build "
"images or run the pipeline from the CLI. To provide these "
"configurations, use the `--config` option of the `zenml "
"pipeline build/run` commands."
)
pipeline_spec = Compiler().compile_spec(self)
return self._register(pipeline_spec=pipeline_spec)
def build(
self,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
"""Builds Docker images for the pipeline.
Args:
settings: Settings for the pipeline.
step_configurations: Configurations for steps of the pipeline.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
Returns:
The build output.
"""
with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
self._prepare_if_possible()
deployment, pipeline_spec, _, _ = self._compile(
config_path=config_path,
steps=step_configurations,
settings=settings,
)
pipeline_id = self._register(pipeline_spec=pipeline_spec).id
local_repo = code_repository_utils.find_active_code_repository()
code_repository = build_utils.verify_local_repository_context(
deployment=deployment, local_repo_context=local_repo
)
return build_utils.create_pipeline_build(
deployment=deployment,
pipeline_id=pipeline_id,
code_repository=code_repository,
)
def _run(
self,
*,
run_name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
schedule: Optional[Schedule] = None,
build: Union[str, "UUID", "PipelineBuildBase", None] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
extra: Optional[Dict[str, Any]] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
) -> None:
"""Runs the pipeline on the active stack.
Args:
run_name: Name of the pipeline run.
enable_cache: If caching should be enabled for this pipeline run.
enable_artifact_metadata: If artifact metadata should be enabled
for this pipeline run.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline run.
enable_step_logs: If step logs should be enabled for this pipeline.
schedule: Optional schedule to use for the run.
build: Optional build to use for the run.
settings: Settings for this pipeline run.
step_configurations: Configurations for steps of the pipeline.
extra: Extra configurations for this pipeline run.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
prevent_build_reuse: Whether to prevent the reuse of a build.
Raises:
Exception: bypass any exception from pipeline up.
"""
if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
# An environment variable was set to stop the execution of
# pipelines. This is done to prevent execution of module-level
# pipeline.run() calls when importing modules needed to run a step.
logger.info(
"Preventing execution of pipeline '%s'. If this is not "
"intended behavior, make sure to unset the environment "
"variable '%s'.",
self.name,
constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
)
return
logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")
with track_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
deployment, pipeline_spec, schedule, build = self._compile(
config_path=config_path,
run_name=run_name,
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_artifact_visualization=enable_artifact_visualization,
enable_step_logs=enable_step_logs,
steps=step_configurations,
settings=settings,
schedule=schedule,
build=build,
extra=extra,
)
skip_pipeline_registration = constants.handle_bool_env_var(
constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
default=False,
)
register_pipeline = not (skip_pipeline_registration or unlisted)
pipeline_id = None
if register_pipeline:
pipeline_id = self._register(pipeline_spec=pipeline_spec).id
else:
logger.debug(f"Pipeline {self.name} is unlisted.")
# TODO: check whether orchestrator even support scheduling before
# registering the schedule
schedule_id = None
if schedule:
if schedule.name:
schedule_name = schedule.name
else:
date = datetime.utcnow().strftime("%Y_%m_%d")
time = datetime.utcnow().strftime("%H_%M_%S_%f")
schedule_name = deployment.run_name_template.format(
date=date, time=time
)
components = Client().active_stack_model.components
orchestrator = components[StackComponentType.ORCHESTRATOR][0]
schedule_model = ScheduleRequest(
workspace=Client().active_workspace.id,
user=Client().active_user.id,
pipeline_id=pipeline_id,
orchestrator_id=orchestrator.id,
name=schedule_name,
active=True,
cron_expression=schedule.cron_expression,
start_time=schedule.start_time,
end_time=schedule.end_time,
interval_second=schedule.interval_second,
catchup=schedule.catchup,
)
schedule_id = (
Client().zen_store.create_schedule(schedule_model).id
)
logger.info(
f"Created schedule `{schedule_name}` for pipeline "
f"`{deployment.pipeline_configuration.name}`."
)
stack = Client().active_stack
stack.validate()
self.prepare_model_versions(deployment)
local_repo_context = (
code_repository_utils.find_active_code_repository()
)
code_repository = build_utils.verify_local_repository_context(
deployment=deployment, local_repo_context=local_repo_context
)
build_model = build_utils.reuse_or_create_pipeline_build(
deployment=deployment,
pipeline_id=pipeline_id,
allow_build_reuse=not prevent_build_reuse,
build=build,
code_repository=code_repository,
)
build_id = build_model.id if build_model else None
code_reference = None
if local_repo_context and not local_repo_context.is_dirty:
source_root = source_utils.get_source_root()
subdirectory = (
Path(source_root)
.resolve()
.relative_to(local_repo_context.root)
)
code_reference = CodeReferenceRequest(
commit=local_repo_context.current_commit,
subdirectory=subdirectory.as_posix(),
code_repository=local_repo_context.code_repository_id,
)
deployment_request = PipelineDeploymentRequest(
user=Client().active_user.id,
workspace=Client().active_workspace.id,
stack=stack.id,
pipeline=pipeline_id,
build=build_id,
schedule=schedule_id,
code_reference=code_reference,
**deployment.dict(),
)
deployment_model = Client().zen_store.create_deployment(
deployment=deployment_request
)
analytics_handler.metadata = self._get_pipeline_analytics_metadata(
deployment=deployment_model, stack=stack
)
stack.prepare_pipeline_deployment(deployment=deployment_model)
self.log_pipeline_deployment_metadata(deployment_model)
# Prevent execution of nested pipelines which might lead to
# unexpected behavior
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
try:
stack.deploy_pipeline(deployment=deployment_model)
except Exception as e:
raise e
finally:
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
runs = Client().list_pipeline_runs(
deployment_id=deployment_model.id,
sort_by="desc:start_time",
size=1,
)
if runs.items:
run_url = dashboard_utils.get_run_url(runs[0])
if run_url:
logger.info(f"Dashboard URL: {run_url}")
else:
logger.info(
"You can visualize your pipeline runs in the `ZenML "
"Dashboard`. In order to try it locally, please run "
"`zenml up`."
)
else:
logger.warning(
f"Your orchestrator '{stack.orchestrator.name}' is "
f"running remotely. Note that the pipeline run will "
f"only show up on the ZenML dashboard once the first "
f"step has started executing on the remote "
f"infrastructure.",
)
@staticmethod
def log_pipeline_deployment_metadata(
deployment_model: PipelineDeploymentResponse,
) -> None:
"""Displays logs based on the deployment model upon running a pipeline.
Args:
deployment_model: The model for the pipeline deployment
"""
try:
# Log about the schedule/run
if deployment_model.schedule:
logger.info(
"Scheduling a run with the schedule: "
f"`{deployment_model.schedule.name}`"
)
else:
logger.info("Executing a new run.")
# Log about the caching status
if deployment_model.pipeline_configuration.enable_cache is False:
logger.info(
f"Caching is disabled by default for "
f"`{deployment_model.pipeline_configuration.name}`."
)
# Log about the used builds
if deployment_model.build:
logger.info("Using a build:")
logger.info(
" Image(s): "
f"{', '.join([i.image for i in deployment_model.build.images.values()])}"
)
# Log about version mismatches between local and build
from zenml import __version__
if deployment_model.build.zenml_version != __version__:
logger.info(
f"ZenML version (different than the local version): "
f"{deployment_model.build.zenml_version}"
)
import platform
if (
deployment_model.build.python_version
!= platform.python_version()
):
logger.info(
f"Python version (different than the local version): "
f"{deployment_model.build.python_version}"
)
# Log about the user, stack and components
if deployment_model.user is not None:
logger.info(f"Using user: `{deployment_model.user.name}`")
if deployment_model.stack is not None:
logger.info(f"Using stack: `{deployment_model.stack.name}`")
for (
component_type,
component_models,
) in deployment_model.stack.components.items():
logger.info(
f" {component_type.value}: `{component_models[0].name}`"
)
except Exception as e:
logger.debug(f"Logging pipeline deployment metadata failed: {e}")
def _update_new_requesters(
self,
requester_name: str,
model_version: "ModelVersion",
new_versions_requested: Dict[
Tuple[str, Optional[str]], NewModelVersionRequest
],
other_model_versions: Set["ModelVersion"],
) -> None:
key = (
model_version.name,
str(model_version.version) if model_version.version else None,
)
if model_version.version is None:
version_existed = False
else:
try:
model_version._get_model_version()
version_existed = key not in new_versions_requested
except KeyError:
version_existed = False
if not version_existed:
model_version.was_created_in_this_run = True
new_versions_requested[key].update_request(
model_version,
NewModelVersionRequest.Requester(
source="step", name=requester_name
),
)
else:
other_model_versions.add(model_version)
def prepare_model_versions(
self, deployment: "PipelineDeploymentBase"
) -> None:
"""Create model versions which are missing and validate existing ones that are used in the pipeline run.
Args:
deployment: The pipeline deployment configuration.
"""
new_versions_requested: Dict[
Tuple[str, Optional[str]], NewModelVersionRequest
] = defaultdict(NewModelVersionRequest)
other_model_versions: Set["ModelVersion"] = set()
all_steps_have_own_config = True
for step in deployment.step_configurations.values():
step_model_version = step.config.model_version
all_steps_have_own_config = (
all_steps_have_own_config
and step.config.model_version is not None
)
if step_model_version:
self._update_new_requesters(
model_version=step_model_version,
requester_name=step.config.name,
new_versions_requested=new_versions_requested,
other_model_versions=other_model_versions,
)
if not all_steps_have_own_config:
pipeline_model_version = (
deployment.pipeline_configuration.model_version
)
if pipeline_model_version:
self._update_new_requesters(
model_version=pipeline_model_version,
requester_name=self.name,
new_versions_requested=new_versions_requested,
other_model_versions=other_model_versions,
)
elif deployment.pipeline_configuration.model_version is not None:
logger.warning(
f"ModelConfig of pipeline `{self.name}` is overridden in all "
f"steps. "
)
self._validate_new_version_requests(new_versions_requested)
for other_model_version in other_model_versions:
other_model_version._validate_config_in_runtime()
def _validate_new_version_requests(
self,
new_versions_requested: Dict[
Tuple[str, Optional[str]], NewModelVersionRequest
],
) -> None:
"""Validate the model version that are used in the pipeline run.
Args:
new_versions_requested: A dict of new model version request objects.
"""
for key, data in new_versions_requested.items():
model_name, model_version = key
if len(data.requesters) > 1:
logger.warning(
f"New version of model version `{model_name}::{model_version or 'NEW'}` "
f"requested in multiple decorators:\n{data.requesters}\n We recommend "
"that `ModelVersion` requesting new version is configured only in one "
"place of the pipeline."
)
data.model_version._validate_config_in_runtime()
self.__new_unnamed_model_versions_in_current_run__[
data.model_version.name
] = data.model_version.number
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
"""(Deprecated) Get runs of this pipeline.
Args:
**kwargs: Further arguments for filtering or pagination that are
passed to `client.list_pipeline_runs()`.
Returns:
List of runs of this pipeline.
"""
logger.warning(
"`Pipeline.get_runs()` is deprecated and will be removed in a "
"future version. Please use `Pipeline.model.get_runs()` instead."
)
return self.model.get_runs(**kwargs)
def write_run_configuration_template(
self, path: str, stack: Optional["Stack"] = None
) -> None:
"""Writes a run configuration yaml template.
Args:
path: The path where the template will be written.
stack: The stack for which the template should be generated. If
not given, the active stack will be used.
"""
from zenml.config.base_settings import ConfigurationLevel
from zenml.config.step_configurations import (
PartialArtifactConfiguration,
)
self._prepare_if_possible()
stack = stack or Client().active_stack
setting_classes = stack.setting_classes
setting_classes.update(settings_utils.get_general_settings())
pipeline_settings = {}
step_settings = {}
for key, setting_class in setting_classes.items():
fields = pydantic_utils.TemplateGenerator(setting_class).run()
if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
pipeline_settings[key] = fields
if ConfigurationLevel.STEP in setting_class.LEVEL:
step_settings[key] = fields
steps = {}
for step_name, invocation in self.invocations.items():
step = invocation.step
parameters = (
pydantic_utils.TemplateGenerator(
step.entrypoint_definition.legacy_params.annotation
).run()
if step.entrypoint_definition.legacy_params
else {}
)
outputs = {
name: PartialArtifactConfiguration()
for name in step.entrypoint_definition.outputs
}
step_template = StepConfigurationUpdate(
parameters=parameters,
settings=step_settings,
outputs=outputs,
)
steps[step_name] = step_template
run_config = PipelineRunConfiguration(
settings=pipeline_settings, steps=steps
)
template = pydantic_utils.TemplateGenerator(run_config).run()
yaml_string = yaml.dump(template)
yaml_string = yaml_utils.comment_out_yaml(yaml_string)
with open(path, "w") as f:
f.write(yaml_string)
def _apply_configuration(
self,
config: PipelineConfigurationUpdate,
merge: bool = True,
) -> None:
"""Applies an update to the pipeline configuration.
Args:
config: The configuration update.
merge: Whether to merge the updates with the existing configuration
or not. See the `BasePipeline.configure(...)` method for a
detailed explanation.
"""
self._validate_configuration(config)
self._configuration = pydantic_utils.update_model(
self._configuration, update=config, recursive=merge
)
logger.debug("Updated pipeline configuration:")
logger.debug(self._configuration)
@staticmethod
def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
"""Validates a configuration update.
Args:
config: The configuration update to validate.
"""
settings_utils.validate_setting_keys(list(config.settings))
def _get_pipeline_analytics_metadata(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
) -> Dict[str, Any]:
"""Returns the pipeline deployment metadata.
Args:
deployment: The pipeline deployment to track.
stack: The stack on which the pipeline will be deployed.
Returns:
the metadata about the pipeline deployment
"""
custom_materializer = False
for step in deployment.step_configurations.values():
for output in step.config.outputs.values():
for source in output.materializer_source:
if not source.is_internal:
custom_materializer = True
stack_creator = Client().get_stack(stack.id).user
active_user = Client().active_user
own_stack = stack_creator and stack_creator.id == active_user.id
stack_metadata = {
component_type.value: component.flavor
for component_type, component in stack.components.items()
}
return {
"store_type": Client().zen_store.type.value,
**stack_metadata,
"total_steps": len(self.invocations),
"schedule": bool(deployment.schedule),
"custom_materializer": custom_materializer,
"own_stack": own_stack,
}
def _compile(
self, config_path: Optional[str] = None, **run_configuration_args: Any
) -> Tuple[
"PipelineDeploymentBase",
"PipelineSpec",
Optional["Schedule"],
Union["PipelineBuildBase", UUID, None],
]:
"""Compiles the pipeline.
Args:
config_path: Path to a config file.
**run_configuration_args: Configurations for the pipeline run.
Returns:
A tuple containing the deployment, spec, schedule and build of
the compiled pipeline.
"""
# Activating the built-in integrations to load all materializers
from zenml.integrations.registry import integration_registry
integration_registry.activate_integrations()
self._parse_config_file(
config_path=config_path,
matcher=list(PipelineRunConfiguration.__fields__.keys()),
)
run_config = PipelineRunConfiguration(**self._from_config_file)
new_values = dict_utils.remove_none_values(run_configuration_args)
update = PipelineRunConfiguration.parse_obj(new_values)
# Update with the values in code so they take precedence
run_config = pydantic_utils.update_model(run_config, update=update)
deployment, pipeline_spec = Compiler().compile(
pipeline=self,
stack=Client().active_stack,
run_configuration=run_config,
)
return deployment, pipeline_spec, run_config.schedule, run_config.build
def _register(self, pipeline_spec: "PipelineSpec") -> "PipelineResponse":
"""Register the pipeline in the server.
Args:
pipeline_spec: The pipeline spec to register.
Returns:
The registered pipeline model.
"""
version_hash = self._compute_unique_identifier(
pipeline_spec=pipeline_spec
)
client = Client()
matching_pipelines = client.list_pipelines(
name=self.name,
version_hash=version_hash,
size=1,
sort_by="desc:created",
)
if matching_pipelines.total:
registered_pipeline = matching_pipelines.items[0]
logger.info(
"Reusing registered version: `(version: %s)`.",
registered_pipeline.version,
)
return registered_pipeline
latest_version = self._get_latest_version() or 0
version = str(latest_version + 1)
request = PipelineRequest(
workspace=client.active_workspace.id,
user=client.active_user.id,
name=self.name,
version=version,
version_hash=version_hash,
spec=pipeline_spec,
docstring=self.__doc__,
)
registered_pipeline = client.zen_store.create_pipeline(
pipeline=request
)
logger.info(
"Registered new version: `(version %s)`.",
registered_pipeline.version,
)
return registered_pipeline
def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
"""Computes a unique identifier from the pipeline spec and steps.
Args:
pipeline_spec: Compiled spec of the pipeline.
Returns:
The unique identifier of the pipeline.
"""
from packaging import version
hash_ = hashlib.md5() # nosec
hash_.update(pipeline_spec.json_with_string_sources.encode())
if version.parse(pipeline_spec.version) >= version.parse("0.4"):
# Only add this for newer versions to keep backwards compatibility
hash_.update(self.source_code.encode())
for step_spec in pipeline_spec.steps:
invocation = self.invocations[step_spec.pipeline_parameter_name]
step_source = invocation.step.source_code
hash_.update(step_source.encode())
return hash_.hexdigest()
def _get_latest_version(self) -> Optional[int]:
"""Gets the latest version of this pipeline.
Returns:
The latest version or `None` if no version exists.
"""
all_pipelines = Client().list_pipelines(
name=self.name, sort_by="desc:created", size=1
)
if not all_pipelines.total:
return None
pipeline = all_pipelines.items[0]
if pipeline.version == "UNVERSIONED":
return None
return int(all_pipelines.items[0].version)
def add_step_invocation(
self,
step: "BaseStep",
input_artifacts: Dict[str, StepArtifact],
external_artifacts: Dict[str, "ExternalArtifact"],
parameters: Dict[str, Any],
upstream_steps: Set[str],
custom_id: Optional[str] = None,
allow_id_suffix: bool = True,
) -> str:
"""Adds a step invocation to the pipeline.
Args:
step: The step for which to add an invocation.
input_artifacts: The input artifacts for the invocation.
external_artifacts: The external artifacts for the invocation.
parameters: The parameters for the invocation.
upstream_steps: The upstream steps for the invocation.
custom_id: Custom ID to use for the invocation.
allow_id_suffix: Whether a suffix can be appended to the invocation
ID.
Raises:
RuntimeError: If the method is called on an inactive pipeline.
RuntimeError: If the invocation was called with an artifact from
a different pipeline.
Returns:
The step invocation ID.
"""
if Pipeline.ACTIVE_PIPELINE != self:
raise RuntimeError(
"A step invocation can only be added to an active pipeline."
)
for artifact in input_artifacts.values():
if artifact.pipeline is not self:
raise RuntimeError(
"Got invalid input artifact for invocation of step "
f"{step.name}: The input artifact was produced by a step "
f"inside a different pipeline {artifact.pipeline.name}."
)
invocation_id = self._compute_invocation_id(
step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
)
invocation = StepInvocation(
id=invocation_id,
step=step,
input_artifacts=input_artifacts,
external_artifacts=external_artifacts,
parameters=parameters,
upstream_steps=upstream_steps,
pipeline=self,
)
self._invocations[invocation_id] = invocation
return invocation_id
def _compute_invocation_id(
self,
step: "BaseStep",
custom_id: Optional[str] = None,
allow_suffix: bool = True,
) -> str:
"""Compute the invocation ID.
Args:
step: The step for which to compute the ID.
custom_id: Custom ID to use for the invocation.
allow_suffix: Whether a suffix can be appended to the invocation
ID.
Raises:
RuntimeError: If no ID suffix is allowed and an invocation for the
same ID already exists.
RuntimeError: If no unique invocation ID can be found.
Returns:
The invocation ID.
"""
base_id = id_ = custom_id or step.name
if id_ not in self.invocations:
return id_
if not allow_suffix:
raise RuntimeError("Duplicate step ID")
for index in range(2, 10000):
id_ = f"{base_id}_{index}"
if id_ not in self.invocations:
return id_
raise RuntimeError("Unable to find step ID")
def __enter__(self: T) -> T:
"""Activate the pipeline context.
Raises:
RuntimeError: If a different pipeline is already active.
Returns:
The pipeline instance.
"""
if Pipeline.ACTIVE_PIPELINE:
raise RuntimeError(
"Unable to enter pipeline context. A different pipeline "
f"{Pipeline.ACTIVE_PIPELINE.name} is already active."
)
Pipeline.ACTIVE_PIPELINE = self
return self
def __exit__(self, *args: Any) -> None:
"""Deactivates the pipeline context.
Args:
*args: The arguments passed to the context exit handler.
"""
Pipeline.ACTIVE_PIPELINE = None
def _parse_config_file(
self, config_path: Optional[str], matcher: List[str]
) -> None:
"""Parses the given configuration file and sets `self._from_config_file`.
Args:
config_path: Path to a yaml configuration file.
matcher: List of keys to match in the configuration file.
"""
_from_config_file: Dict[str, Any] = {}
if config_path:
with open(config_path, "r") as f:
_from_config_file = yaml.load(f, Loader=yaml.SafeLoader)
_from_config_file = dict_utils.remove_none_values(
{k: v for k, v in _from_config_file.items() if k in matcher}
)
if "model_version" in _from_config_file:
if "model_version" in self._from_config_file:
_from_config_file[
"model_version"
] = self._from_config_file["model_version"]
else:
from zenml.model.model_version import ModelVersion
_from_config_file[
"model_version"
] = ModelVersion.parse_obj(
_from_config_file["model_version"]
)
self._from_config_file = _from_config_file
def with_options(
self,
run_name: Optional[str] = None,
schedule: Optional[Schedule] = None,
build: Union[str, "UUID", "PipelineBuildBase", None] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
**kwargs: Any,
) -> "Pipeline":
"""Copies the pipeline and applies the given configurations.
Args:
run_name: Name of the pipeline run.
schedule: Optional schedule to use for the run.
build: Optional build to use for the run.
step_configurations: Configurations for steps of the pipeline.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
prevent_build_reuse: Whether to prevent the reuse of a build.
**kwargs: Pipeline configuration options. These will be passed
to the `pipeline.configure(...)` method.
Returns:
The copied pipeline instance.
"""
pipeline_copy = self.copy()
pipeline_copy._parse_config_file(
config_path=config_path,
matcher=inspect.getfullargspec(self.configure)[0],
)
pipeline_copy._from_config_file = dict_utils.recursive_update(
pipeline_copy._from_config_file, kwargs
)
with pipeline_copy.__suppress_configure_warnings__():
pipeline_copy.configure(**pipeline_copy._from_config_file)
run_args = dict_utils.remove_none_values(
{
"run_name": run_name,
"schedule": schedule,
"build": build,
"step_configurations": step_configurations,
"config_path": config_path,
"unlisted": unlisted,
"prevent_build_reuse": prevent_build_reuse,
}
)
pipeline_copy._run_args.update(run_args)
return pipeline_copy
def copy(self) -> "Pipeline":
"""Copies the pipeline.
Returns:
The pipeline copy.
"""
return copy.deepcopy(self)
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Handle a call of the pipeline.
This method does one of two things:
* If there is an active pipeline context, it calls the pipeline
entrypoint function within that context and the step invocations
will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling
the entrypoint function.
Args:
*args: Entrypoint function arguments.
**kwargs: Entrypoint function keyword arguments.
Returns:
The outputs of the entrypoint function call.
"""
if Pipeline.ACTIVE_PIPELINE:
# Calling a pipeline inside a pipeline, we return the potential
# outputs of the entrypoint function
# TODO: This currently ignores the configuration of the pipeline
# and instead applies the configuration of the previously active
# pipeline. Is this what we want?
return self.entrypoint(*args, **kwargs)
self.prepare(*args, **kwargs)
return self._run(**self._run_args)
def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
"""Calls the pipeline entrypoint function with the given arguments.
Args:
*args: Entrypoint function arguments.
**kwargs: Entrypoint function keyword arguments.
Raises:
ValueError: If an input argument is missing or not JSON
serializable.
"""
try:
validated_args = pydantic_utils.validate_function_args(
self.entrypoint,
{"arbitrary_types_allowed": False, "smart_union": True},
*args,
**kwargs,
)
except ValidationError as e:
raise ValueError(
"Invalid or missing inputs for pipeline entrypoint function. "
"Only JSON serializable inputs are allowed as pipeline inputs."
) from e
self._parameters = validated_args
self.entrypoint(**validated_args)
def _prepare_if_possible(self) -> None:
"""Prepares the pipeline if possible.
Raises:
RuntimeError: If the pipeline is not prepared and the preparation
requires parameters.
"""
if not self.is_prepared:
if self.requires_parameters:
raise RuntimeError(
f"Failed while trying to prepare pipeline {self.name}. "
"The entrypoint function of the pipeline requires "
"arguments. Please prepare the pipeline by calling "
"`pipeline_instance.prepare(...)` and try again."
)
else:
self.prepare()
configuration: PipelineConfiguration
property
readonly
The configuration of the pipeline.
Returns:
Type | Description |
---|---|
PipelineConfiguration |
The configuration of the pipeline. |
enable_cache: Optional[bool]
property
readonly
If caching is enabled for the pipeline.
Returns:
Type | Description |
---|---|
Optional[bool] |
If caching is enabled for the pipeline. |
invocations: Dict[str, zenml.steps.step_invocation.StepInvocation]
property
readonly
Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been called.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.step_invocation.StepInvocation] |
The step invocations. |
is_prepared: bool
property
readonly
If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.
Returns:
Type | Description |
---|---|
bool |
If the pipeline is prepared. |
model: PipelineResponse
property
readonly
Gets the registered pipeline model for this instance.
Returns:
Type | Description |
---|---|
PipelineResponse |
The registered pipeline model. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the pipeline has not been registered yet. |
name: str
property
readonly
The name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
requires_parameters: bool
property
readonly
If the pipeline entrypoint requires parameters.
Returns:
Type | Description |
---|---|
bool |
If the pipeline entrypoint requires parameters. |
source_code: str
property
readonly
The source code of this pipeline.
Returns:
Type | Description |
---|---|
str |
The source code of this pipeline. |
source_object: Any
property
readonly
The source object of this pipeline.
Returns:
Type | Description |
---|---|
Any |
The source object of this pipeline. |
__call__(self, *args, **kwargs)
special
Handle a call of the pipeline.
This method does one of two things: * If there is an active pipeline context, it calls the pipeline entrypoint function within that context and the step invocations will be added to the active pipeline. * If no pipeline is active, it activates this pipeline before calling the entrypoint function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Entrypoint function arguments. |
() |
**kwargs |
Any |
Entrypoint function keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
Any |
The outputs of the entrypoint function call. |
Source code in zenml/new/pipelines/pipeline.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Handle a call of the pipeline.
This method does one of two things:
* If there is an active pipeline context, it calls the pipeline
entrypoint function within that context and the step invocations
will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling
the entrypoint function.
Args:
*args: Entrypoint function arguments.
**kwargs: Entrypoint function keyword arguments.
Returns:
The outputs of the entrypoint function call.
"""
if Pipeline.ACTIVE_PIPELINE:
# Calling a pipeline inside a pipeline, we return the potential
# outputs of the entrypoint function
# TODO: This currently ignores the configuration of the pipeline
# and instead applies the configuration of the previously active
# pipeline. Is this what we want?
return self.entrypoint(*args, **kwargs)
self.prepare(*args, **kwargs)
return self._run(**self._run_args)
__enter__(self)
special
Activate the pipeline context.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a different pipeline is already active. |
Returns:
Type | Description |
---|---|
~T |
The pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
def __enter__(self: T) -> T:
"""Activate the pipeline context.
Raises:
RuntimeError: If a different pipeline is already active.
Returns:
The pipeline instance.
"""
if Pipeline.ACTIVE_PIPELINE:
raise RuntimeError(
"Unable to enter pipeline context. A different pipeline "
f"{Pipeline.ACTIVE_PIPELINE.name} is already active."
)
Pipeline.ACTIVE_PIPELINE = self
return self
__exit__(self, *args)
special
Deactivates the pipeline context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
The arguments passed to the context exit handler. |
() |
Source code in zenml/new/pipelines/pipeline.py
def __exit__(self, *args: Any) -> None:
"""Deactivates the pipeline context.
Args:
*args: The arguments passed to the context exit handler.
"""
Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_version=None)
special
Initializes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the pipeline. |
required |
entrypoint |
~F |
The entrypoint function of the pipeline. |
required |
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can
be a function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. |
None |
model_version |
Optional[ModelVersion] |
configuration of the model version in the Model Control Plane. |
None |
Source code in zenml/new/pipelines/pipeline.py
def __init__(
self,
name: str,
entrypoint: F,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
) -> None:
"""Initializes a pipeline.
Args:
name: The name of the pipeline.
entrypoint: The entrypoint function of the pipeline.
enable_cache: If caching should be enabled for this pipeline.
enable_artifact_metadata: If artifact metadata should be enabled for
this pipeline.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can
be a function with a single argument of type `BaseException`, or
a source path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
model_version: configuration of the model version in the Model Control Plane.
"""
self._invocations: Dict[str, StepInvocation] = {}
self._run_args: Dict[str, Any] = {}
self._configuration = PipelineConfiguration(
name=name,
)
self._from_config_file: Dict[str, Any] = {}
with self.__suppress_configure_warnings__():
self.configure(
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_artifact_visualization=enable_artifact_visualization,
enable_step_logs=enable_step_logs,
settings=settings,
extra=extra,
on_failure=on_failure,
on_success=on_success,
model_version=model_version,
)
self.entrypoint = entrypoint
self._parameters: Dict[str, Any] = {}
self.__suppress_warnings_flag__ = False
self.__new_unnamed_model_versions_in_current_run__: Dict[str, int] = {}
__suppress_configure_warnings__(self)
special
Context manager to suppress warnings in Pipeline.configure(...)
.
Used to suppress warnings when called from inner code and not user-facing code.
Yields:
Type | Description |
---|---|
Iterator[Any] |
Nothing. |
Source code in zenml/new/pipelines/pipeline.py
@contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
"""Context manager to suppress warnings in `Pipeline.configure(...)`.
Used to suppress warnings when called from inner code and not user-facing code.
Yields:
Nothing.
"""
self.__suppress_warnings_flag__ = True
yield
self.__suppress_warnings_flag__ = False
add_step_invocation(self, step, input_artifacts, external_artifacts, parameters, upstream_steps, custom_id=None, allow_id_suffix=True)
Adds a step invocation to the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
BaseStep |
The step for which to add an invocation. |
required |
input_artifacts |
Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact] |
The input artifacts for the invocation. |
required |
external_artifacts |
Dict[str, ExternalArtifact] |
The external artifacts for the invocation. |
required |
parameters |
Dict[str, Any] |
The parameters for the invocation. |
required |
upstream_steps |
Set[str] |
The upstream steps for the invocation. |
required |
custom_id |
Optional[str] |
Custom ID to use for the invocation. |
None |
allow_id_suffix |
bool |
Whether a suffix can be appended to the invocation ID. |
True |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the method is called on an inactive pipeline. |
RuntimeError |
If the invocation was called with an artifact from a different pipeline. |
Returns:
Type | Description |
---|---|
str |
The step invocation ID. |
Source code in zenml/new/pipelines/pipeline.py
def add_step_invocation(
self,
step: "BaseStep",
input_artifacts: Dict[str, StepArtifact],
external_artifacts: Dict[str, "ExternalArtifact"],
parameters: Dict[str, Any],
upstream_steps: Set[str],
custom_id: Optional[str] = None,
allow_id_suffix: bool = True,
) -> str:
"""Adds a step invocation to the pipeline.
Args:
step: The step for which to add an invocation.
input_artifacts: The input artifacts for the invocation.
external_artifacts: The external artifacts for the invocation.
parameters: The parameters for the invocation.
upstream_steps: The upstream steps for the invocation.
custom_id: Custom ID to use for the invocation.
allow_id_suffix: Whether a suffix can be appended to the invocation
ID.
Raises:
RuntimeError: If the method is called on an inactive pipeline.
RuntimeError: If the invocation was called with an artifact from
a different pipeline.
Returns:
The step invocation ID.
"""
if Pipeline.ACTIVE_PIPELINE != self:
raise RuntimeError(
"A step invocation can only be added to an active pipeline."
)
for artifact in input_artifacts.values():
if artifact.pipeline is not self:
raise RuntimeError(
"Got invalid input artifact for invocation of step "
f"{step.name}: The input artifact was produced by a step "
f"inside a different pipeline {artifact.pipeline.name}."
)
invocation_id = self._compute_invocation_id(
step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
)
invocation = StepInvocation(
id=invocation_id,
step=step,
input_artifacts=input_artifacts,
external_artifacts=external_artifacts,
parameters=parameters,
upstream_steps=upstream_steps,
pipeline=self,
)
self._invocations[invocation_id] = invocation
return invocation_id
build(self, settings=None, step_configurations=None, config_path=None)
Builds Docker images for the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings |
Optional[Mapping[str, SettingsOrDict]] |
Settings for the pipeline. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a
|
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponse] |
The build output. |
Source code in zenml/new/pipelines/pipeline.py
def build(
self,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
"""Builds Docker images for the pipeline.
Args:
settings: Settings for the pipeline.
step_configurations: Configurations for steps of the pipeline.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
Returns:
The build output.
"""
with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
self._prepare_if_possible()
deployment, pipeline_spec, _, _ = self._compile(
config_path=config_path,
steps=step_configurations,
settings=settings,
)
pipeline_id = self._register(pipeline_spec=pipeline_spec).id
local_repo = code_repository_utils.find_active_code_repository()
code_repository = build_utils.verify_local_repository_context(
deployment=deployment, local_repo_context=local_repo
)
return build_utils.create_pipeline_build(
deployment=deployment,
pipeline_id=pipeline_id,
code_repository=code_repository,
)
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_version=None, parameters=None, merge=True)
Configures the pipeline.
Configuration merging example:
* merge==True
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can
be a function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. |
None |
merge |
bool |
If |
True |
model_version |
Optional[ModelVersion] |
configuration of the model version in the Model Control Plane. |
None |
parameters |
Optional[Dict[str, Any]] |
input parameters for the pipeline. |
None |
Returns:
Type | Description |
---|---|
~T |
The pipeline instance that this method was called on. |
Source code in zenml/new/pipelines/pipeline.py
def configure(
self: T,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
parameters: Optional[Dict[str, Any]] = None,
merge: bool = True,
) -> T:
"""Configures the pipeline.
Configuration merging example:
* `merge==True`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* `merge==False`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Args:
enable_cache: If caching should be enabled for this pipeline.
enable_artifact_metadata: If artifact metadata should be enabled for
this pipeline.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can
be a function with a single argument of type `BaseException`, or
a source path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
merge: If `True`, will merge the given dictionary configurations
like `extra` and `settings` with existing
configurations. If `False` the given configurations will
overwrite all existing ones. See the general description of this
method for an example.
model_version: configuration of the model version in the Model Control Plane.
parameters: input parameters for the pipeline.
Returns:
The pipeline instance that this method was called on.
"""
failure_hook_source = None
if on_failure:
# string of on_failure hook function to be used for this pipeline
failure_hook_source = resolve_and_validate_hook(on_failure)
success_hook_source = None
if on_success:
# string of on_success hook function to be used for this pipeline
success_hook_source = resolve_and_validate_hook(on_success)
values = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"enable_artifact_metadata": enable_artifact_metadata,
"enable_artifact_visualization": enable_artifact_visualization,
"enable_step_logs": enable_step_logs,
"settings": settings,
"extra": extra,
"failure_hook_source": failure_hook_source,
"success_hook_source": success_hook_source,
"model_version": model_version,
"parameters": parameters,
}
)
if not self.__suppress_warnings_flag__:
to_be_reapplied = []
for param_, value_ in values.items():
if (
param_ in PipelineRunConfiguration.__fields__
and param_ in self._from_config_file
and value_ != self._from_config_file[param_]
):
to_be_reapplied.append(
(param_, self._from_config_file[param_], value_)
)
if to_be_reapplied:
msg = ""
reapply_during_run_warning = (
"The value of parameter '{name}' has changed from "
"'{file_value}' to '{new_value}' set in your configuration file.\n"
)
for name, file_value, new_value in to_be_reapplied:
msg += reapply_during_run_warning.format(
name=name, file_value=file_value, new_value=new_value
)
msg += (
"Configuration file value will be used during pipeline run, "
"so you change will not be efficient. Consider updating your "
"configuration file instead."
)
logger.warning(msg)
config = PipelineConfigurationUpdate(**values)
self._apply_configuration(config, merge=merge)
return self
copy(self)
Copies the pipeline.
Returns:
Type | Description |
---|---|
Pipeline |
The pipeline copy. |
Source code in zenml/new/pipelines/pipeline.py
def copy(self) -> "Pipeline":
"""Copies the pipeline.
Returns:
The pipeline copy.
"""
return copy.deepcopy(self)
from_model(model)
classmethod
Creates a pipeline instance from a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponse |
The model to load the pipeline instance from. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
@classmethod
def from_model(cls, model: "PipelineResponse") -> "Pipeline":
"""Creates a pipeline instance from a model.
Args:
model: The model to load the pipeline instance from.
Returns:
The pipeline instance.
"""
from zenml.new.pipelines.deserialization_utils import load_pipeline
return load_pipeline(model=model)
get_runs(self, **kwargs)
(Deprecated) Get runs of this pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Further arguments for filtering or pagination that are
passed to |
{} |
Returns:
Type | Description |
---|---|
List[PipelineRunResponse] |
List of runs of this pipeline. |
Source code in zenml/new/pipelines/pipeline.py
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
"""(Deprecated) Get runs of this pipeline.
Args:
**kwargs: Further arguments for filtering or pagination that are
passed to `client.list_pipeline_runs()`.
Returns:
List of runs of this pipeline.
"""
logger.warning(
"`Pipeline.get_runs()` is deprecated and will be removed in a "
"future version. Please use `Pipeline.model.get_runs()` instead."
)
return self.model.get_runs(**kwargs)
log_pipeline_deployment_metadata(deployment_model)
staticmethod
Displays logs based on the deployment model upon running a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_model |
PipelineDeploymentResponse |
The model for the pipeline deployment |
required |
Source code in zenml/new/pipelines/pipeline.py
@staticmethod
def log_pipeline_deployment_metadata(
deployment_model: PipelineDeploymentResponse,
) -> None:
"""Displays logs based on the deployment model upon running a pipeline.
Args:
deployment_model: The model for the pipeline deployment
"""
try:
# Log about the schedule/run
if deployment_model.schedule:
logger.info(
"Scheduling a run with the schedule: "
f"`{deployment_model.schedule.name}`"
)
else:
logger.info("Executing a new run.")
# Log about the caching status
if deployment_model.pipeline_configuration.enable_cache is False:
logger.info(
f"Caching is disabled by default for "
f"`{deployment_model.pipeline_configuration.name}`."
)
# Log about the used builds
if deployment_model.build:
logger.info("Using a build:")
logger.info(
" Image(s): "
f"{', '.join([i.image for i in deployment_model.build.images.values()])}"
)
# Log about version mismatches between local and build
from zenml import __version__
if deployment_model.build.zenml_version != __version__:
logger.info(
f"ZenML version (different than the local version): "
f"{deployment_model.build.zenml_version}"
)
import platform
if (
deployment_model.build.python_version
!= platform.python_version()
):
logger.info(
f"Python version (different than the local version): "
f"{deployment_model.build.python_version}"
)
# Log about the user, stack and components
if deployment_model.user is not None:
logger.info(f"Using user: `{deployment_model.user.name}`")
if deployment_model.stack is not None:
logger.info(f"Using stack: `{deployment_model.stack.name}`")
for (
component_type,
component_models,
) in deployment_model.stack.components.items():
logger.info(
f" {component_type.value}: `{component_models[0].name}`"
)
except Exception as e:
logger.debug(f"Logging pipeline deployment metadata failed: {e}")
prepare(self, *args, **kwargs)
Prepares the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Pipeline entrypoint input arguments. |
() |
**kwargs |
Any |
Pipeline entrypoint input keyword arguments. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the pipeline has parameters configured differently in configuration file and code. |
Source code in zenml/new/pipelines/pipeline.py
def prepare(self, *args: Any, **kwargs: Any) -> None:
"""Prepares the pipeline.
Args:
*args: Pipeline entrypoint input arguments.
**kwargs: Pipeline entrypoint input keyword arguments.
Raises:
RuntimeError: If the pipeline has parameters configured differently in
configuration file and code.
"""
# Clear existing parameters and invocations
self._parameters = {}
self._invocations = {}
conflicting_parameters = {}
parameters_ = (self.configuration.parameters or {}).copy()
if from_file_ := self._from_config_file.get("parameters", None):
parameters_ = dict_utils.recursive_update(parameters_, from_file_)
if parameters_:
for k, v_runtime in kwargs.items():
if k in parameters_:
v_config = parameters_[k]
if v_config != v_runtime:
conflicting_parameters[k] = (v_config, v_runtime)
if conflicting_parameters:
is_plural = "s" if len(conflicting_parameters) > 1 else ""
msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
for key, values in conflicting_parameters.items():
msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
parameters:
param_name: value1
# pipeline.py
@pipeline
def pipeline_(param_name: str):
step_name()
if __name__=="__main__":
pipeline_.with_options(config_file="config.yaml")(param_name="value2")
```
To avoid this consider setting pipeline parameters only in one place (config or code).
"""
raise RuntimeError(msg)
for k, v_config in parameters_.items():
if k not in kwargs:
kwargs[k] = v_config
with self:
# Enter the context manager, so we become the active pipeline. This
# means that all steps that get called while the entrypoint function
# is executed will be added as invocation to this pipeline instance.
self._call_entrypoint(*args, **kwargs)
prepare_model_versions(self, deployment)
Create model versions which are missing and validate existing ones that are used in the pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment configuration. |
required |
Source code in zenml/new/pipelines/pipeline.py
def prepare_model_versions(
self, deployment: "PipelineDeploymentBase"
) -> None:
"""Create model versions which are missing and validate existing ones that are used in the pipeline run.
Args:
deployment: The pipeline deployment configuration.
"""
new_versions_requested: Dict[
Tuple[str, Optional[str]], NewModelVersionRequest
] = defaultdict(NewModelVersionRequest)
other_model_versions: Set["ModelVersion"] = set()
all_steps_have_own_config = True
for step in deployment.step_configurations.values():
step_model_version = step.config.model_version
all_steps_have_own_config = (
all_steps_have_own_config
and step.config.model_version is not None
)
if step_model_version:
self._update_new_requesters(
model_version=step_model_version,
requester_name=step.config.name,
new_versions_requested=new_versions_requested,
other_model_versions=other_model_versions,
)
if not all_steps_have_own_config:
pipeline_model_version = (
deployment.pipeline_configuration.model_version
)
if pipeline_model_version:
self._update_new_requesters(
model_version=pipeline_model_version,
requester_name=self.name,
new_versions_requested=new_versions_requested,
other_model_versions=other_model_versions,
)
elif deployment.pipeline_configuration.model_version is not None:
logger.warning(
f"ModelConfig of pipeline `{self.name}` is overridden in all "
f"steps. "
)
self._validate_new_version_requests(new_versions_requested)
for other_model_version in other_model_versions:
other_model_version._validate_config_in_runtime()
register(self)
Register the pipeline in the server.
Returns:
Type | Description |
---|---|
PipelineResponse |
The registered pipeline model. |
Source code in zenml/new/pipelines/pipeline.py
def register(self) -> "PipelineResponse":
"""Register the pipeline in the server.
Returns:
The registered pipeline model.
"""
# Activating the built-in integrations to load all materializers
from zenml.integrations.registry import integration_registry
self._prepare_if_possible()
integration_registry.activate_integrations()
if self.configuration.dict(exclude_defaults=True, exclude={"name"}):
logger.warning(
f"The pipeline `{self.name}` that you're registering has "
"custom configurations applied to it. These will not be "
"registered with the pipeline and won't be set when you build "
"images or run the pipeline from the CLI. To provide these "
"configurations, use the `--config` option of the `zenml "
"pipeline build/run` commands."
)
pipeline_spec = Compiler().compile_spec(self)
return self._register(pipeline_spec=pipeline_spec)
resolve(self)
Resolves the pipeline.
Returns:
Type | Description |
---|---|
Source |
The pipeline source. |
Source code in zenml/new/pipelines/pipeline.py
def resolve(self) -> "Source":
"""Resolves the pipeline.
Returns:
The pipeline source.
"""
return source_utils.resolve(self.entrypoint, skip_validation=True)
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)
Copies the pipeline and applies the given configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
schedule |
Optional[zenml.config.schedule.Schedule] |
Optional schedule to use for the run. |
None |
build |
Union[str, UUID, PipelineBuildBase] |
Optional build to use for the run. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a
|
None |
unlisted |
bool |
Whether the pipeline run should be unlisted (not assigned to any pipeline). |
False |
prevent_build_reuse |
bool |
Whether to prevent the reuse of a build. |
False |
**kwargs |
Any |
Pipeline configuration options. These will be passed
to the |
{} |
Returns:
Type | Description |
---|---|
Pipeline |
The copied pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
def with_options(
self,
run_name: Optional[str] = None,
schedule: Optional[Schedule] = None,
build: Union[str, "UUID", "PipelineBuildBase", None] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
**kwargs: Any,
) -> "Pipeline":
"""Copies the pipeline and applies the given configurations.
Args:
run_name: Name of the pipeline run.
schedule: Optional schedule to use for the run.
build: Optional build to use for the run.
step_configurations: Configurations for steps of the pipeline.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
prevent_build_reuse: Whether to prevent the reuse of a build.
**kwargs: Pipeline configuration options. These will be passed
to the `pipeline.configure(...)` method.
Returns:
The copied pipeline instance.
"""
pipeline_copy = self.copy()
pipeline_copy._parse_config_file(
config_path=config_path,
matcher=inspect.getfullargspec(self.configure)[0],
)
pipeline_copy._from_config_file = dict_utils.recursive_update(
pipeline_copy._from_config_file, kwargs
)
with pipeline_copy.__suppress_configure_warnings__():
pipeline_copy.configure(**pipeline_copy._from_config_file)
run_args = dict_utils.remove_none_values(
{
"run_name": run_name,
"schedule": schedule,
"build": build,
"step_configurations": step_configurations,
"config_path": config_path,
"unlisted": unlisted,
"prevent_build_reuse": prevent_build_reuse,
}
)
pipeline_copy._run_args.update(run_args)
return pipeline_copy
write_run_configuration_template(self, path, stack=None)
Writes a run configuration yaml template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path where the template will be written. |
required |
stack |
Optional[Stack] |
The stack for which the template should be generated. If not given, the active stack will be used. |
None |
Source code in zenml/new/pipelines/pipeline.py
def write_run_configuration_template(
self, path: str, stack: Optional["Stack"] = None
) -> None:
"""Writes a run configuration yaml template.
Args:
path: The path where the template will be written.
stack: The stack for which the template should be generated. If
not given, the active stack will be used.
"""
from zenml.config.base_settings import ConfigurationLevel
from zenml.config.step_configurations import (
PartialArtifactConfiguration,
)
self._prepare_if_possible()
stack = stack or Client().active_stack
setting_classes = stack.setting_classes
setting_classes.update(settings_utils.get_general_settings())
pipeline_settings = {}
step_settings = {}
for key, setting_class in setting_classes.items():
fields = pydantic_utils.TemplateGenerator(setting_class).run()
if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
pipeline_settings[key] = fields
if ConfigurationLevel.STEP in setting_class.LEVEL:
step_settings[key] = fields
steps = {}
for step_name, invocation in self.invocations.items():
step = invocation.step
parameters = (
pydantic_utils.TemplateGenerator(
step.entrypoint_definition.legacy_params.annotation
).run()
if step.entrypoint_definition.legacy_params
else {}
)
outputs = {
name: PartialArtifactConfiguration()
for name in step.entrypoint_definition.outputs
}
step_template = StepConfigurationUpdate(
parameters=parameters,
settings=step_settings,
outputs=outputs,
)
steps[step_name] = step_template
run_config = PipelineRunConfiguration(
settings=pipeline_settings, steps=steps
)
template = pydantic_utils.TemplateGenerator(run_config).run()
yaml_string = yaml.dump(template)
yaml_string = yaml_utils.comment_out_yaml(yaml_string)
with open(path, "w") as f:
f.write(yaml_string)
pipeline_context
Pipeline context class.
PipelineContext
Provides pipeline configuration during it's composition.
Usage example:
from zenml import get_pipeline_context
...
@pipeline(
extra={
"complex_parameter": [
("sklearn.tree", "DecisionTreeClassifier"),
("sklearn.ensemble", "RandomForestClassifier"),
]
}
)
def my_pipeline():
context = get_pipeline_context()
after = []
search_steps_prefix = "hp_tuning_search_"
for i, model_search_configuration in enumerate(
context.extra["complex_parameter"]
):
step_name = f"{search_steps_prefix}{i}"
cross_validation(
model_package=model_search_configuration[0],
model_class=model_search_configuration[1],
id=step_name
)
after.append(step_name)
select_best_model(
search_steps_prefix=search_steps_prefix,
after=after,
)
Source code in zenml/new/pipelines/pipeline_context.py
class PipelineContext:
"""Provides pipeline configuration during it's composition.
Usage example:
```python
from zenml import get_pipeline_context
...
@pipeline(
extra={
"complex_parameter": [
("sklearn.tree", "DecisionTreeClassifier"),
("sklearn.ensemble", "RandomForestClassifier"),
]
}
)
def my_pipeline():
context = get_pipeline_context()
after = []
search_steps_prefix = "hp_tuning_search_"
for i, model_search_configuration in enumerate(
context.extra["complex_parameter"]
):
step_name = f"{search_steps_prefix}{i}"
cross_validation(
model_package=model_search_configuration[0],
model_class=model_search_configuration[1],
id=step_name
)
after.append(step_name)
select_best_model(
search_steps_prefix=search_steps_prefix,
after=after,
)
```
"""
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
"""Initialize the context of the currently composing pipeline.
Args:
pipeline_configuration: The configuration of the pipeline derived
from Pipeline class.
"""
self.name = pipeline_configuration.name
self.enable_cache = pipeline_configuration.enable_cache
self.enable_artifact_metadata = (
pipeline_configuration.enable_artifact_metadata
)
self.enable_artifact_visualization = (
pipeline_configuration.enable_artifact_visualization
)
self.enable_step_logs = pipeline_configuration.enable_step_logs
self.settings = pipeline_configuration.settings
self.extra = pipeline_configuration.extra
self.model_version = pipeline_configuration.model_version
__init__(self, pipeline_configuration)
special
Initialize the context of the currently composing pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_configuration |
PipelineConfiguration |
The configuration of the pipeline derived from Pipeline class. |
required |
Source code in zenml/new/pipelines/pipeline_context.py
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
"""Initialize the context of the currently composing pipeline.
Args:
pipeline_configuration: The configuration of the pipeline derived
from Pipeline class.
"""
self.name = pipeline_configuration.name
self.enable_cache = pipeline_configuration.enable_cache
self.enable_artifact_metadata = (
pipeline_configuration.enable_artifact_metadata
)
self.enable_artifact_visualization = (
pipeline_configuration.enable_artifact_visualization
)
self.enable_step_logs = pipeline_configuration.enable_step_logs
self.settings = pipeline_configuration.settings
self.extra = pipeline_configuration.extra
self.model_version = pipeline_configuration.model_version
get_pipeline_context()
Get the context of the currently composing pipeline.
Returns:
Type | Description |
---|---|
PipelineContext |
The context of the currently composing pipeline. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no active pipeline is found. |
RuntimeError |
If inside a running step. |
Source code in zenml/new/pipelines/pipeline_context.py
def get_pipeline_context() -> "PipelineContext":
"""Get the context of the currently composing pipeline.
Returns:
The context of the currently composing pipeline.
Raises:
RuntimeError: If no active pipeline is found.
RuntimeError: If inside a running step.
"""
from zenml.new.pipelines.pipeline import Pipeline
if Pipeline.ACTIVE_PIPELINE is None:
try:
from zenml.new.steps.step_context import get_step_context
get_step_context()
except RuntimeError:
raise RuntimeError("No active pipeline found.")
else:
raise RuntimeError(
"Inside a step use `from zenml import get_step_context` "
"instead."
)
return PipelineContext(
pipeline_configuration=Pipeline.ACTIVE_PIPELINE.configuration
)
pipeline_decorator
ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_version=None)
Decorator to create a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Whether to use caching or not. |
None |
enable_artifact_metadata |
Optional[bool] |
Whether to enable artifact metadata or not. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a
function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a
function with no arguments, or a source path to such a function
(e.g. |
None |
model_version |
Optional[ModelVersion] |
configuration of the model version in the Model Control Plane. |
None |
Returns:
Type | Description |
---|---|
Union[Pipeline, Callable[[F], Pipeline]] |
A pipeline instance. |
Source code in zenml/new/pipelines/pipeline_decorator.py
def pipeline(
_func: Optional["F"] = None,
*,
name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Dict[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
"""Decorator to create a pipeline.
Args:
_func: The decorated function.
name: The name of the pipeline. If left empty, the name of the
decorated function will be used as a fallback.
enable_cache: Whether to use caching or not.
enable_artifact_metadata: Whether to enable artifact metadata or not.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: Settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can be a
function with a single argument of type `BaseException`, or a source
path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can be a
function with no arguments, or a source path to such a function
(e.g. `module.my_function`).
model_version: configuration of the model version in the Model Control Plane.
Returns:
A pipeline instance.
"""
def inner_decorator(func: "F") -> "Pipeline":
from zenml.new.pipelines.pipeline import Pipeline
p = Pipeline(
name=name or func.__name__,
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_step_logs=enable_step_logs,
settings=settings,
extra=extra,
on_failure=on_failure,
on_success=on_success,
model_version=model_version,
entrypoint=func,
)
p.__doc__ = func.__doc__
return p
return inner_decorator if _func is None else inner_decorator(_func)
steps
special
decorated_step
Internal BaseStep subclass used by the step decorator.
step_context
Step context class.
StepContext
Provides additional context inside a step function.
This singleton class is used to access information about the current run, step run, or its outputs inside a step function.
Usage example:
from zenml.steps import get_step_context
@step
def my_trainer_step() -> Any:
context = get_step_context()
# get info about the current pipeline run
current_pipeline_run = context.pipeline_run
# get info about the current step run
current_step_run = context.step_run
# get info about the future output artifacts of this step
output_artifact_uri = context.get_output_artifact_uri()
...
Source code in zenml/new/steps/step_context.py
class StepContext(metaclass=SingletonMetaClass):
"""Provides additional context inside a step function.
This singleton class is used to access information about the current run,
step run, or its outputs inside a step function.
Usage example:
```python
from zenml.steps import get_step_context
@step
def my_trainer_step() -> Any:
context = get_step_context()
# get info about the current pipeline run
current_pipeline_run = context.pipeline_run
# get info about the current step run
current_step_run = context.step_run
# get info about the future output artifacts of this step
output_artifact_uri = context.get_output_artifact_uri()
...
```
"""
def __init__(
self,
pipeline_run: "PipelineRunResponse",
step_run: "StepRunResponse",
output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
output_artifact_uris: Mapping[str, str],
output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
step_run_info: "StepRunInfo",
cache_enabled: bool,
) -> None:
"""Initialize the context of the currently running step.
Args:
pipeline_run: The model of the current pipeline run.
step_run: The model of the current step run.
output_materializers: The output materializers of the step that
this context is used in.
output_artifact_uris: The output artifacts of the step that this
context is used in.
output_artifact_configs: The outputs' ArtifactConfigs of the step that this
context is used in.
step_run_info: (Deprecated) info about the currently running step.
cache_enabled: (Deprecated) Whether caching is enabled for the step.
Raises:
StepContextError: If the keys of the output materializers and
output artifacts do not match.
"""
from zenml.client import Client
self.pipeline_run = pipeline_run
self.step_run = step_run
self._step_run_info = step_run_info
self._cache_enabled = cache_enabled
# Get the stack that we are running in
self._stack = Client().active_stack
self.step_name = self.step_run.name
# set outputs
if output_materializers.keys() != output_artifact_uris.keys():
raise StepContextError(
f"Mismatched keys in output materializers and output artifact "
f"URIs for step '{self.step_name}'. Output materializer "
f"keys: {set(output_materializers)}, output artifact URI "
f"keys: {set(output_artifact_uris)}"
)
self._outputs = {
key: StepContextOutput(
materializer_classes=output_materializers[key],
artifact_uri=output_artifact_uris[key],
artifact_config=output_artifact_configs[key],
)
for key in output_materializers.keys()
}
@property
def pipeline(self) -> "PipelineResponse":
"""Returns the current pipeline.
Returns:
The current pipeline or None.
Raises:
StepContextError: If the pipeline run does not have a pipeline.
"""
if self.pipeline_run.pipeline:
return self.pipeline_run.pipeline
raise StepContextError(
f"Unable to get pipeline in step '{self.step_name}' of pipeline "
f"run '{self.pipeline_run.id}': This pipeline run does not have "
f"a pipeline associated with it."
)
@property
def model_version(self) -> "ModelVersion":
"""Returns configured ModelVersion.
Order of resolution to search for ModelVersion is:
1. ModelVersion from @step
2. ModelVersion from @pipeline
Returns:
The `ModelVersion` object associated with the current step.
Raises:
StepContextError: If the `ModelVersion` object is not set in `@step` or `@pipeline`.
"""
if self.step_run.config.model_version is not None:
model_version = self.step_run.config.model_version
elif self.pipeline_run.config.model_version is not None:
model_version = self.pipeline_run.config.model_version
else:
raise StepContextError(
f"Unable to get ModelVersion in step '{self.step_name}' of pipeline "
f"run '{self.pipeline_run.id}': it was not set in `@step` or `@pipeline`."
)
# warm-up the model version
model_version._get_or_create_model_version()
return model_version
@property
def inputs(self) -> Dict[str, "ArtifactVersionResponse"]:
"""Returns the input artifacts of the current step.
Returns:
The input artifacts of the current step.
"""
return self.step_run.inputs
def _get_output(
self, output_name: Optional[str] = None
) -> "StepContextOutput":
"""Returns the materializer and artifact URI for a given step output.
Args:
output_name: Optional name of the output for which to get the
materializer and URI.
Returns:
Tuple containing the materializer and artifact URI for the
given output.
Raises:
StepContextError: If the step has no outputs, no output for
the given `output_name` or if no `output_name` was given but
the step has multiple outputs.
"""
output_count = len(self._outputs)
if output_count == 0:
raise StepContextError(
f"Unable to get step output for step '{self.step_name}': "
f"This step does not have any outputs."
)
if not output_name and output_count > 1:
raise StepContextError(
f"Unable to get step output for step '{self.step_name}': "
f"This step has multiple outputs ({set(self._outputs)}), "
f"please specify which output to return."
)
if output_name:
if output_name not in self._outputs:
raise StepContextError(
f"Unable to get step output '{output_name}' for "
f"step '{self.step_name}'. This step does not have an "
f"output with the given name, please specify one of the "
f"available outputs: {set(self._outputs)}."
)
return self._outputs[output_name]
else:
return next(iter(self._outputs.values()))
def get_output_materializer(
self,
output_name: Optional[str] = None,
custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
"""Returns a materializer for a given step output.
Args:
output_name: Optional name of the output for which to get the
materializer. If no name is given and the step only has a
single output, the materializer of this output will be
returned. If the step has multiple outputs, an exception
will be raised.
custom_materializer_class: If given, this `BaseMaterializer`
subclass will be initialized with the output artifact instead
of the materializer that was registered for this step output.
data_type: If the output annotation is of type `Union` and the step
therefore has multiple materializers configured, you can provide
a data type for the output which will be used to select the
correct materializer. If not provided, the first materializer
will be used.
Returns:
A materializer initialized with the output artifact for
the given output.
"""
from zenml.utils import materializer_utils
output = self._get_output(output_name)
materializer_classes = output.materializer_classes
artifact_uri = output.artifact_uri
if custom_materializer_class:
materializer_class = custom_materializer_class
elif len(materializer_classes) == 1 or not data_type:
materializer_class = materializer_classes[0]
else:
materializer_class = materializer_utils.select_materializer(
data_type=data_type, materializer_classes=materializer_classes
)
return materializer_class(artifact_uri)
def get_output_artifact_uri(
self, output_name: Optional[str] = None
) -> str:
"""Returns the artifact URI for a given step output.
Args:
output_name: Optional name of the output for which to get the URI.
If no name is given and the step only has a single output,
the URI of this output will be returned. If the step has
multiple outputs, an exception will be raised.
Returns:
Artifact URI for the given output.
"""
return self._get_output(output_name).artifact_uri
def get_output_metadata(
self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
"""Returns the metadata for a given step output.
Args:
output_name: Optional name of the output for which to get the
metadata. If no name is given and the step only has a single
output, the metadata of this output will be returned. If the
step has multiple outputs, an exception will be raised.
Returns:
Metadata for the given output.
"""
return self._get_output(output_name).run_metadata or {}
def add_output_metadata(
self,
metadata: Dict[str, "MetadataType"],
output_name: Optional[str] = None,
) -> None:
"""Adds metadata for a given step output.
Args:
metadata: The metadata to add.
output_name: Optional name of the output for which to add the
metadata. If no name is given and the step only has a single
output, the metadata of this output will be added. If the
step has multiple outputs, an exception will be raised.
"""
output = self._get_output(output_name)
if not output.run_metadata:
output.run_metadata = {}
output.run_metadata.update(**metadata)
def _set_artifact_config(
self,
artifact_config: "ArtifactConfig",
output_name: Optional[str] = None,
) -> None:
"""Adds artifact config for a given step output.
Args:
artifact_config: The artifact config of the output to set.
output_name: Optional name of the output for which to set the
output signature. If no name is given and the step only has a single
output, the metadata of this output will be added. If the
step has multiple outputs, an exception will be raised.
Raises:
EntityExistsError: If the output already has an output signature.
"""
output = self._get_output(output_name)
if output.artifact_config is None:
output.artifact_config = artifact_config
else:
raise EntityExistsError(
f"Output with name '{output_name}' already has artifact config."
)
inputs: Dict[str, ArtifactVersionResponse]
property
readonly
Returns the input artifacts of the current step.
Returns:
Type | Description |
---|---|
Dict[str, ArtifactVersionResponse] |
The input artifacts of the current step. |
model_version: ModelVersion
property
readonly
Returns configured ModelVersion.
Order of resolution to search for ModelVersion is: 1. ModelVersion from @step 2. ModelVersion from @pipeline
Returns:
Type | Description |
---|---|
ModelVersion |
The |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the |
pipeline: PipelineResponse
property
readonly
Returns the current pipeline.
Returns:
Type | Description |
---|---|
PipelineResponse |
The current pipeline or None. |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the pipeline run does not have a pipeline. |
__init__(self, pipeline_run, step_run, output_materializers, output_artifact_uris, output_artifact_configs, step_run_info, cache_enabled)
special
Initialize the context of the currently running step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunResponse |
The model of the current pipeline run. |
required |
step_run |
StepRunResponse |
The model of the current step run. |
required |
output_materializers |
Mapping[str, Sequence[Type[BaseMaterializer]]] |
The output materializers of the step that this context is used in. |
required |
output_artifact_uris |
Mapping[str, str] |
The output artifacts of the step that this context is used in. |
required |
output_artifact_configs |
Mapping[str, Optional[ArtifactConfig]] |
The outputs' ArtifactConfigs of the step that this context is used in. |
required |
step_run_info |
StepRunInfo |
(Deprecated) info about the currently running step. |
required |
cache_enabled |
bool |
(Deprecated) Whether caching is enabled for the step. |
required |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the keys of the output materializers and output artifacts do not match. |
Source code in zenml/new/steps/step_context.py
def __init__(
self,
pipeline_run: "PipelineRunResponse",
step_run: "StepRunResponse",
output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
output_artifact_uris: Mapping[str, str],
output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
step_run_info: "StepRunInfo",
cache_enabled: bool,
) -> None:
"""Initialize the context of the currently running step.
Args:
pipeline_run: The model of the current pipeline run.
step_run: The model of the current step run.
output_materializers: The output materializers of the step that
this context is used in.
output_artifact_uris: The output artifacts of the step that this
context is used in.
output_artifact_configs: The outputs' ArtifactConfigs of the step that this
context is used in.
step_run_info: (Deprecated) info about the currently running step.
cache_enabled: (Deprecated) Whether caching is enabled for the step.
Raises:
StepContextError: If the keys of the output materializers and
output artifacts do not match.
"""
from zenml.client import Client
self.pipeline_run = pipeline_run
self.step_run = step_run
self._step_run_info = step_run_info
self._cache_enabled = cache_enabled
# Get the stack that we are running in
self._stack = Client().active_stack
self.step_name = self.step_run.name
# set outputs
if output_materializers.keys() != output_artifact_uris.keys():
raise StepContextError(
f"Mismatched keys in output materializers and output artifact "
f"URIs for step '{self.step_name}'. Output materializer "
f"keys: {set(output_materializers)}, output artifact URI "
f"keys: {set(output_artifact_uris)}"
)
self._outputs = {
key: StepContextOutput(
materializer_classes=output_materializers[key],
artifact_uri=output_artifact_uris[key],
artifact_config=output_artifact_configs[key],
)
for key in output_materializers.keys()
}
add_output_metadata(self, metadata, output_name=None)
Adds metadata for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata |
Dict[str, MetadataType] |
The metadata to add. |
required |
output_name |
Optional[str] |
Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised. |
None |
Source code in zenml/new/steps/step_context.py
def add_output_metadata(
self,
metadata: Dict[str, "MetadataType"],
output_name: Optional[str] = None,
) -> None:
"""Adds metadata for a given step output.
Args:
metadata: The metadata to add.
output_name: Optional name of the output for which to add the
metadata. If no name is given and the step only has a single
output, the metadata of this output will be added. If the
step has multiple outputs, an exception will be raised.
"""
output = self._get_output(output_name)
if not output.run_metadata:
output.run_metadata = {}
output.run_metadata.update(**metadata)
get_output_artifact_uri(self, output_name=None)
Returns the artifact URI for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
Returns:
Type | Description |
---|---|
str |
Artifact URI for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_artifact_uri(
self, output_name: Optional[str] = None
) -> str:
"""Returns the artifact URI for a given step output.
Args:
output_name: Optional name of the output for which to get the URI.
If no name is given and the step only has a single output,
the URI of this output will be returned. If the step has
multiple outputs, an exception will be raised.
Returns:
Artifact URI for the given output.
"""
return self._get_output(output_name).artifact_uri
get_output_materializer(self, output_name=None, custom_materializer_class=None, data_type=None)
Returns a materializer for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
custom_materializer_class |
Optional[Type[BaseMaterializer]] |
If given, this |
None |
data_type |
Optional[Type[Any]] |
If the output annotation is of type |
None |
Returns:
Type | Description |
---|---|
BaseMaterializer |
A materializer initialized with the output artifact for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_materializer(
self,
output_name: Optional[str] = None,
custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
"""Returns a materializer for a given step output.
Args:
output_name: Optional name of the output for which to get the
materializer. If no name is given and the step only has a
single output, the materializer of this output will be
returned. If the step has multiple outputs, an exception
will be raised.
custom_materializer_class: If given, this `BaseMaterializer`
subclass will be initialized with the output artifact instead
of the materializer that was registered for this step output.
data_type: If the output annotation is of type `Union` and the step
therefore has multiple materializers configured, you can provide
a data type for the output which will be used to select the
correct materializer. If not provided, the first materializer
will be used.
Returns:
A materializer initialized with the output artifact for
the given output.
"""
from zenml.utils import materializer_utils
output = self._get_output(output_name)
materializer_classes = output.materializer_classes
artifact_uri = output.artifact_uri
if custom_materializer_class:
materializer_class = custom_materializer_class
elif len(materializer_classes) == 1 or not data_type:
materializer_class = materializer_classes[0]
else:
materializer_class = materializer_utils.select_materializer(
data_type=data_type, materializer_classes=materializer_classes
)
return materializer_class(artifact_uri)
get_output_metadata(self, output_name=None)
Returns the metadata for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
Metadata for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_metadata(
self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
"""Returns the metadata for a given step output.
Args:
output_name: Optional name of the output for which to get the
metadata. If no name is given and the step only has a single
output, the metadata of this output will be returned. If the
step has multiple outputs, an exception will be raised.
Returns:
Metadata for the given output.
"""
return self._get_output(output_name).run_metadata or {}
StepContextOutput
Represents a step output in the step context.
Source code in zenml/new/steps/step_context.py
class StepContextOutput:
"""Represents a step output in the step context."""
materializer_classes: Sequence[Type["BaseMaterializer"]]
artifact_uri: str
run_metadata: Optional[Dict[str, "MetadataType"]] = None
artifact_config: Optional["ArtifactConfig"]
def __init__(
self,
materializer_classes: Sequence[Type["BaseMaterializer"]],
artifact_uri: str,
artifact_config: Optional["ArtifactConfig"],
):
"""Initialize the step output.
Args:
materializer_classes: The materializer classes for the output.
artifact_uri: The artifact URI for the output.
artifact_config: The ArtifactConfig object of the output.
"""
self.materializer_classes = materializer_classes
self.artifact_uri = artifact_uri
self.artifact_config = artifact_config
__init__(self, materializer_classes, artifact_uri, artifact_config)
special
Initialize the step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
materializer_classes |
Sequence[Type[BaseMaterializer]] |
The materializer classes for the output. |
required |
artifact_uri |
str |
The artifact URI for the output. |
required |
artifact_config |
Optional[ArtifactConfig] |
The ArtifactConfig object of the output. |
required |
Source code in zenml/new/steps/step_context.py
def __init__(
self,
materializer_classes: Sequence[Type["BaseMaterializer"]],
artifact_uri: str,
artifact_config: Optional["ArtifactConfig"],
):
"""Initialize the step output.
Args:
materializer_classes: The materializer classes for the output.
artifact_uri: The artifact URI for the output.
artifact_config: The ArtifactConfig object of the output.
"""
self.materializer_classes = materializer_classes
self.artifact_uri = artifact_uri
self.artifact_config = artifact_config
get_step_context()
Get the context of the currently running step.
Returns:
Type | Description |
---|---|
StepContext |
The context of the currently running step. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no step is currently running. |
Source code in zenml/new/steps/step_context.py
def get_step_context() -> "StepContext":
"""Get the context of the currently running step.
Returns:
The context of the currently running step.
Raises:
RuntimeError: If no step is currently running.
"""
if StepContext._exists():
return StepContext() # type: ignore
raise RuntimeError(
"The step context is only available inside a step function."
)
step_decorator
Step decorator function.
step(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, experiment_tracker=None, step_operator=None, output_materializers=None, settings=None, extra=None, on_failure=None, on_success=None, model_version=None)
Decorator to create a ZenML step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the step. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default. |
None |
enable_artifact_metadata |
Optional[bool] |
Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default. |
None |
enable_artifact_visualization |
Optional[bool] |
Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default. |
None |
enable_step_logs |
Optional[bool] |
Specify whether step logs are enabled for this step. |
None |
experiment_tracker |
Optional[str] |
The experiment tracker to use for this step. |
None |
step_operator |
Optional[str] |
The step operator to use for this step. |
None |
output_materializers |
Optional[OutputMaterializersSpecification] |
Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this step. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this step. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a
function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a
function with no arguments, or a source path to such a function
(e.g. |
None |
model_version |
Optional[ModelVersion] |
configuration of the model version in the Model Control Plane. |
None |
Returns:
Type | Description |
---|---|
Union[BaseStep, Callable[[F], BaseStep]] |
The step instance. |
Source code in zenml/new/steps/step_decorator.py
def step(
_func: Optional["F"] = None,
*,
name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
experiment_tracker: Optional[str] = None,
step_operator: Optional[str] = None,
output_materializers: Optional["OutputMaterializersSpecification"] = None,
settings: Optional[Dict[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_version: Optional["ModelVersion"] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
"""Decorator to create a ZenML step.
Args:
_func: The decorated function.
name: The name of the step. If left empty, the name of the decorated
function will be used as a fallback.
enable_cache: Specify whether caching is enabled for this step. If no
value is passed, caching is enabled by default.
enable_artifact_metadata: Specify whether metadata is enabled for this
step. If no value is passed, metadata is enabled by default.
enable_artifact_visualization: Specify whether visualization is enabled
for this step. If no value is passed, visualization is enabled by
default.
enable_step_logs: Specify whether step logs are enabled for this step.
experiment_tracker: The experiment tracker to use for this step.
step_operator: The step operator to use for this step.
output_materializers: Output materializers for this step. If
given as a dict, the keys must be a subset of the output names
of this step. If a single value (type or string) is given, the
materializer will be used for all outputs.
settings: Settings for this step.
extra: Extra configurations for this step.
on_failure: Callback function in event of failure of the step. Can be a
function with a single argument of type `BaseException`, or a source
path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can be a
function with no arguments, or a source path to such a function
(e.g. `module.my_function`).
model_version: configuration of the model version in the Model Control Plane.
Returns:
The step instance.
"""
def inner_decorator(func: "F") -> "BaseStep":
from zenml.new.steps.decorated_step import _DecoratedStep
class_: Type["BaseStep"] = type(
func.__name__,
(_DecoratedStep,),
{
"entrypoint": staticmethod(func),
"__module__": func.__module__,
"__doc__": func.__doc__,
},
)
step_instance = class_(
name=name or func.__name__,
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_artifact_visualization=enable_artifact_visualization,
enable_step_logs=enable_step_logs,
experiment_tracker=experiment_tracker,
step_operator=step_operator,
output_materializers=output_materializers,
settings=settings,
extra=extra,
on_failure=on_failure,
on_success=on_success,
model_version=model_version,
)
return step_instance
if _func is None:
return inner_decorator
else:
return inner_decorator(_func)