New
zenml.new
special
pipelines
special
build_utils
Pipeline build utilities.
compute_build_checksum(items, stack, code_repository=None)
Compute an overall checksum for a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
items |
List[BuildConfiguration] |
Items of the build. |
required |
stack |
Stack |
The stack associated with the build. Will be used to gather its requirements. |
required |
code_repository |
Optional[BaseCodeRepository] |
The code repository that will be used to download files inside the build. Will be used for its dependency specification. |
None |
Returns:
Type | Description |
---|---|
str |
The build checksum. |
Source code in zenml/new/pipelines/build_utils.py
def compute_build_checksum(
    items: List["BuildConfiguration"],
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Compute an overall checksum for a pipeline build.

    The checksum is an MD5 digest over every build item's image key and its
    settings checksum, folded in the order the items are given.

    Args:
        items: Items of the build.
        stack: The stack associated with the build. Will be used to gather
            its requirements.
        code_repository: The code repository that will be used to download
            files inside the build. Will be used for its dependency
            specification.

    Returns:
        The build checksum.
    """
    digest = hashlib.md5()  # nosec
    for build_item in items:
        image_key = PipelineBuildBaseModel.get_image_key(
            component_key=build_item.key, step=build_item.step_name
        )
        item_checksum = build_item.compute_settings_checksum(
            stack=stack,
            code_repository=code_repository,
        )
        digest.update(image_key.encode())
        digest.update(item_checksum.encode())

    return digest.hexdigest()
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)
Builds images and registers the output in the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The pipeline deployment. |
required |
pipeline_id |
Optional[uuid.UUID] |
The ID of the pipeline. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository will be used to download files inside the build images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponseModel] |
The build output. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If multiple builds with the same key but different settings were specified. |
Source code in zenml/new/pipelines/build_utils.py
def create_pipeline_build(
    deployment: "PipelineDeploymentBaseModel",
    pipeline_id: Optional[UUID] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds images and registers the output in the server.

    Args:
        deployment: The pipeline deployment.
        pipeline_id: The ID of the pipeline.
        code_repository: If provided, this code repository will be used to
            download files inside the build images.

    Returns:
        The build output, or None if the deployment requires no Docker
        builds.

    Raises:
        RuntimeError: If multiple builds with the same key but different
            settings were specified.
    """
    client = Client()
    stack = client.active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)
    if not required_builds:
        logger.debug("No docker builds required.")
        return None
    logger.info(
        "Building Docker image(s) for pipeline `%s`.",
        deployment.pipeline_configuration.name,
    )
    docker_image_builder = PipelineDockerImageBuilder()
    # Built images keyed by the combined (component, step) image key.
    images: Dict[str, BuildItem] = {}
    # Maps a settings checksum to the image key it was first built under, so
    # identical build configurations reuse the same image.
    checksums: Dict[str, str] = {}
    for build_config in required_builds:
        combined_key = PipelineBuildBaseModel.get_image_key(
            component_key=build_config.key, step=build_config.step_name
        )
        checksum = build_config.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )
        if combined_key in images:
            # The same key was already built: either the settings match (and
            # we can skip this item entirely) or two components disagree on
            # what this key should contain, which is an error.
            previous_checksum = images[combined_key].settings_checksum
            if previous_checksum != checksum:
                raise RuntimeError(
                    f"Trying to build image for key `{combined_key}` but "
                    "an image for this key was already built with a "
                    "different configuration. This happens if multiple "
                    "stack components specified Docker builds for the same "
                    "key in the `StackComponent.get_docker_builds(...)` "
                    "method. If you're using custom components, make sure "
                    "to provide unique keys when returning your build "
                    "configurations to avoid this error."
                )
            else:
                continue
        if checksum in checksums:
            # An image with identical settings was already built under a
            # different key -> reuse it instead of building again.
            item_key = checksums[checksum]
            image_name_or_digest = images[item_key].image
            contains_code = images[item_key].contains_code
            dockerfile = images[item_key].dockerfile
            requirements = images[item_key].requirements
            # Bug fix: previously `download_files` kept its value from the
            # last loop iteration here, which could attach a wrong
            # `requires_code_download` flag to the reused item. Take the
            # value from the item being reused instead.
            download_files = images[item_key].requires_code_download
        else:
            # The image tag encodes pipeline name, optional step name and
            # the build key.
            tag = deployment.pipeline_configuration.name
            if build_config.step_name:
                tag += f"-{build_config.step_name}"
            tag += f"-{build_config.key}"
            include_files = build_config.should_include_files(
                code_repository=code_repository,
            )
            download_files = build_config.should_download_files(
                code_repository=code_repository,
            )
            (
                image_name_or_digest,
                dockerfile,
                requirements,
            ) = docker_image_builder.build_docker_image(
                docker_settings=build_config.settings,
                tag=tag,
                stack=stack,
                include_files=include_files,
                download_files=download_files,
                entrypoint=build_config.entrypoint,
                extra_files=build_config.extra_files,
                code_repository=code_repository,
            )
            contains_code = include_files
        images[combined_key] = BuildItem(
            image=image_name_or_digest,
            dockerfile=dockerfile,
            requirements=requirements,
            settings_checksum=checksum,
            contains_code=contains_code,
            requires_code_download=download_files,
        )
        checksums[checksum] = combined_key
    logger.info("Finished building Docker image(s).")
    # A build is local if there is no container registry to push the images
    # to.
    is_local = stack.container_registry is None
    contains_code = any(item.contains_code for item in images.values())
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )
    build_request = PipelineBuildRequestModel(
        user=client.active_user.id,
        workspace=client.active_workspace.id,
        stack=client.active_stack_model.id,
        pipeline=pipeline_id,
        is_local=is_local,
        contains_code=contains_code,
        images=images,
        zenml_version=zenml.__version__,
        python_version=platform.python_version(),
        checksum=build_checksum,
    )
    return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository)
Find an existing build for a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The deployment for which to find an existing build. |
required |
code_repository |
BaseCodeRepository |
The code repository that will be used to download files in the images. |
required |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponseModel] |
The existing build to reuse if found. |
Source code in zenml/new/pipelines/build_utils.py
def find_existing_build(
    deployment: "PipelineDeploymentBaseModel",
    code_repository: "BaseCodeRepository",
) -> Optional["PipelineBuildResponseModel"]:
    """Find an existing build for a deployment.

    Args:
        deployment: The deployment for which to find an existing build.
        code_repository: The code repository that will be used to download
            files in the images.

    Returns:
        The existing build to reuse if found.
    """
    client = Client()
    stack = client.active_stack

    required_builds = stack.get_docker_builds(deployment=deployment)
    if not required_builds:
        return None

    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )
    # Only the major.minor part of the Python version has to match.
    python_version_prefix = ".".join(platform.python_version_tuple()[:2])

    matches = client.list_builds(
        sort_by="desc:created",
        size=1,
        stack_id=stack.id,
        # The build is local and it's not clear whether the images
        # exist on the current machine or if they've been overwritten.
        # TODO: Should we support this by storing the unique Docker ID for
        # the image and checking if an image with that ID exists locally?
        is_local=False,
        # The build contains some code which might be different than the
        # local code the user is expecting to run
        contains_code=False,
        zenml_version=zenml.__version__,
        # Match all patch versions of the same Python major + minor
        python_version=f"startswith:{python_version_prefix}",
        checksum=build_checksum,
    )

    return matches[0] if matches.items else None
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)
Loads or creates a pipeline build.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The pipeline deployment for which to load or create the build. |
required |
allow_build_reuse |
bool |
If True, the build is allowed to reuse an existing build. |
required |
pipeline_id |
Optional[uuid.UUID] |
Optional ID of the pipeline to reference in the build. |
None |
build |
Union[UUID, PipelineBuildBaseModel] |
Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. |
None |
code_repository |
Optional[BaseCodeRepository] |
If provided, this code repository will be used to download files inside the build images. |
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponseModel] |
The build response. |
Source code in zenml/new/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
    deployment: "PipelineDeploymentBaseModel",
    allow_build_reuse: bool,
    pipeline_id: Optional[UUID] = None,
    build: Union["UUID", "PipelineBuildBaseModel", None] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Loads or creates a pipeline build.

    Args:
        deployment: The pipeline deployment for which to load or create the
            build.
        allow_build_reuse: If True, the build is allowed to reuse an
            existing build.
        pipeline_id: Optional ID of the pipeline to reference in the build.
        build: Optional existing build. If given, the build will be fetched
            (or registered) in the database. If not given, a new build will
            be created.
        code_repository: If provided, this code repository will be used to
            download inside the build images.

    Returns:
        The build response.
    """
    if not build:
        # No custom build given: try to reuse a matching existing build,
        # otherwise build from scratch.
        can_reuse = (
            allow_build_reuse
            and code_repository
            and not deployment.requires_included_files
        )
        if can_reuse:
            existing_build = find_existing_build(
                deployment=deployment, code_repository=code_repository
            )
            if existing_build:
                logger.info(
                    "Reusing existing build `%s` for stack `%s`.",
                    existing_build.id,
                    Client().active_stack.name,
                )
                return existing_build

        return create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )

    client = Client()
    if isinstance(build, UUID):
        # An existing build was referenced by ID -> fetch it.
        build_model = client.zen_store.get_build(build_id=build)
    else:
        # An in-memory build was given -> register it in the database.
        build_request = PipelineBuildRequestModel(
            user=client.active_user.id,
            workspace=client.active_workspace.id,
            stack=client.active_stack_model.id,
            pipeline=pipeline_id,
            **build.dict(),
        )
        build_model = client.zen_store.create_build(build=build_request)

    verify_custom_build(
        build=build_model,
        deployment=deployment,
        code_repository=code_repository,
    )
    return build_model
verify_custom_build(build, deployment, code_repository=None)
Verify a custom build for a pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
build |
PipelineBuildResponseModel |
The build to verify. |
required |
deployment |
PipelineDeploymentBaseModel |
The deployment for which to verify the build. |
required |
code_repository |
Optional[BaseCodeRepository] |
Code repository that will be used to download files for the deployment. |
None |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the build can't be used for the deployment. |
Source code in zenml/new/pipelines/build_utils.py
def verify_custom_build(
    build: "PipelineBuildResponseModel",
    deployment: "PipelineDeploymentBaseModel",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
    """Verify a custom build for a pipeline deployment.

    Args:
        build: The build to verify.
        deployment: The deployment for which to verify the build.
        code_repository: Code repository that will be used to download files
            for the deployment.

    Raises:
        RuntimeError: If the build can't be used for the deployment.
    """
    stack = Client().active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)

    if build.stack and build.stack.id != stack.id:
        logger.warning(
            "The stack `%s` used for the build `%s` is not the same as the "
            "stack `%s` that the pipeline will run on. This could lead "
            "to issues if the stacks have different build requirements.",
            build.stack.name,
            build.id,
            stack.name,
        )
    if build.contains_code:
        logger.warning(
            "The build you specified for this run contains code and will run "
            "with the step code that was included in the Docker images which "
            "might differ from the local code in your client environment."
        )
    if build.requires_code_download and not code_repository:
        # Bug fix: the original message was missing the verb "is".
        raise RuntimeError(
            "The build you specified does not include code but code download "
            "is not possible. This might be because you don't have a code "
            "repository registered or the code repository contains local "
            "changes."
        )
    if build.checksum:
        build_checksum = compute_build_checksum(
            required_builds, stack=stack, code_repository=code_repository
        )
        if build_checksum != build.checksum:
            logger.warning(
                "The Docker settings used for the build `%s` are "
                "not the same as currently specified for your pipeline. "
                "This means that the build you specified to run this "
                "pipeline might be outdated and most likely contains "
                "outdated requirements.",
                build.id,
            )
    else:
        # No checksum given for the entire build, we manually check that
        # all the images exist and the settings match
        for build_config in required_builds:
            try:
                image = build.get_image(
                    component_key=build_config.key,
                    step=build_config.step_name,
                )
            except KeyError as e:
                # Chain the KeyError so debugging keeps the original cause.
                raise RuntimeError(
                    "The build you specified is missing an image for key: "
                    f"{build_config.key}."
                ) from e
            if build_config.compute_settings_checksum(
                stack=stack, code_repository=code_repository
            ) != build.get_settings_checksum(
                component_key=build_config.key, step=build_config.step_name
            ):
                logger.warning(
                    "The Docker settings used to build the image `%s` are "
                    "not the same as currently specified for your pipeline. "
                    "This means that the build you specified to run this "
                    "pipeline might be outdated and most likely contains "
                    "outdated code or requirements.",
                    image,
                )
    if build.is_local:
        logger.warning(
            "You manually specified a local build to run your pipeline. "
            "This might lead to errors if the images don't exist on "
            "your local machine or the image tags have been "
            "overwritten since the original build happened."
        )
verify_local_repository_context(deployment, local_repo_context)
Verifies the local repository.
If the local repository exists and has no local changes, code download inside the images is possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The pipeline deployment. |
required |
local_repo_context |
Optional[LocalRepositoryContext] |
The local repository active at the source root. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the deployment requires code download but code download is not possible. |
Returns:
Type | Description |
---|---|
Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository] |
The code repository from which to download files for the runs of the deployment, or None if code download is not possible. |
Source code in zenml/new/pipelines/build_utils.py
def verify_local_repository_context(
    deployment: "PipelineDeploymentBaseModel",
    local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
    """Verifies the local repository.

    If the local repository exists and has no local changes, code download
    inside the images is possible.

    Args:
        deployment: The pipeline deployment.
        local_repo_context: The local repository active at the source root.

    Raises:
        RuntimeError: If the deployment requires code download but code
            download is not possible.

    Returns:
        The code repository from which to download files for the runs of the
        deployment, or None if code download is not possible.
    """
    if deployment.requires_code_download:
        # Code download is mandatory: the repository must exist and be fully
        # clean (everything committed and pushed).
        if not local_repo_context:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but there is no "
                "code repository active at your current source root "
                f"`{source_utils.get_source_root()}`."
            )
        if local_repo_context.is_dirty:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but the code "
                "repository active at your current source root "
                f"`{source_utils.get_source_root()}` has uncommitted "
                "changes."
            )
        if local_repo_context.has_local_changes:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but the code "
                "repository active at your current source root "
                f"`{source_utils.get_source_root()}` has unpushed "
                "changes."
            )

    if local_repo_context and not local_repo_context.has_local_changes:
        # Clean repository: resolve the registered code repository model
        # into a usable instance.
        repo_model = Client().get_code_repository(
            local_repo_context.code_repository_id
        )
        return BaseCodeRepository.from_model(repo_model)

    return None
deserialization_utils
Pipeline deserialization utilities.
load_pipeline(model)
Load a pipeline from a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The pipeline model to load. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the pipeline can't be loaded due to an old model spec (version <0.2). |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model.

    Args:
        model: The pipeline model to load.

    Raises:
        ValueError: If the pipeline can't be loaded due to an old model spec
            (version <0.2).

    Returns:
        The loaded pipeline.
    """
    spec_version = version.parse(model.spec.version)
    if spec_version < version.parse("0.2"):
        raise ValueError(
            "Loading a pipeline is only possible for pipeline specs with "
            "version 0.2 or higher."
        )

    # Dispatch to the loader matching the spec version; anything newer than
    # 0.3 is loaded with the current (0.4) loader.
    if spec_version == version.parse("0.2"):
        loader = load_pipeline_v_0_2
    elif spec_version == version.parse("0.3"):
        loader = load_pipeline_v_0_3
    else:
        loader = load_pipeline_v_0_4
    pipeline_instance = loader(model=model)

    # Warn if the local step code no longer matches the registered version.
    expected_hash = pipeline_instance._compute_unique_identifier(
        pipeline_spec=model.spec
    )
    if expected_hash != model.version_hash:
        logger.warning(
            "Trying to load pipeline version `%s`, but the local step code "
            "changed since this pipeline version was registered. Using "
            "this pipeline instance will result in a different pipeline "
            "version being registered or reused.",
            model.version,
        )
    return pipeline_instance
load_pipeline_v_0_2(model)
Load a pipeline from a model with spec version 0.2.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The pipeline model to load. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_2(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.2.

    Args:
        model: The pipeline model to load.

    Returns:
        The loaded pipeline.
    """
    # 0.2 specs predate pipeline parameter names in step specs.
    return _load_legacy_pipeline(
        model=model, use_pipeline_parameter_name=False
    )
load_pipeline_v_0_3(model)
Load a pipeline from a model with spec version 0.3.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The pipeline model to load. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_3(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.3.

    Args:
        model: The pipeline model to load.

    Returns:
        The loaded pipeline.
    """
    # 0.3 specs already carry pipeline parameter names in the step specs.
    return _load_legacy_pipeline(
        model=model, use_pipeline_parameter_name=True
    )
load_pipeline_v_0_4(model)
Load a pipeline from a model with spec version 0.4.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The pipeline model to load. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the pipeline source does not refer to a pipeline instance. |
Returns:
Type | Description |
---|---|
Pipeline |
The loaded pipeline. |
Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_4(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.4.

    Args:
        model: The pipeline model to load.

    Raises:
        TypeError: If the pipeline source does not refer to a pipeline
            instance.

    Returns:
        The loaded pipeline.
    """
    source = model.spec.source
    # 0.4 specs always store the pipeline source.
    assert source
    loaded = source_utils.load(source)
    if not isinstance(loaded, Pipeline):
        raise TypeError("Not a pipeline")
    loaded.prepare(**model.spec.parameters)
    return loaded
model_utils
Pipeline utilities to support Model Control Plane.
NewModelVersionRequest (BaseModel)
pydantic-model
Request to create a new model version.
Source code in zenml/new/pipelines/model_utils.py
class NewModelVersionRequest(BaseModel):
    """Request to create a new model version."""

    class Requester(BaseModel):
        """Requester of a new model version."""

        source: str
        name: str

        def __repr__(self) -> str:
            """Return a string representation of the requester.

            Returns:
                A string representation of the requester.
            """
            return f"{self.source}::{self.name}"

    # All requesters that asked for this new model version.
    requesters: List[Requester] = []
    # Merged model config; set lazily via `update_request`.
    _model_config: Optional[ModelConfig] = PrivateAttr(default=None)

    @property
    def model_config(self) -> ModelConfig:
        """Model config getter.

        Returns:
            The model config.

        Raises:
            RuntimeError: If the model config is not set.
        """
        if self._model_config is None:
            raise RuntimeError("Model config is not set.")
        return self._model_config

    def update_request(
        self,
        model_config: ModelConfigModel,
        requester: "NewModelVersionRequest.Requester",
    ) -> None:
        """Update from Model Config Model object in place.

        Args:
            model_config: Model Config Model object.
            requester: Requester of a new model version.

        Raises:
            ValueError: If the model version name is configured differently
                by different requesters.
        """
        self.requesters.append(requester)
        if self._model_config is None:
            self._model_config = ModelConfig.parse_obj(model_config)
        if self._model_config.version != model_config.version:
            # Bug fix: the original message was missing the closing backtick
            # after the model name, the space between the two sentences, and
            # the verb "be".
            raise ValueError(
                "A mismatch of `version` name in model configurations "
                f"provided for `{model_config.name}` detected. Since a new "
                "model version is requested for this model, all `version` "
                "names must match or be left default."
            )
        self._model_config._merge_with_config(model_config)
model_config: ModelConfig
property
readonly
Model config getter.
Returns:
Type | Description |
---|---|
ModelConfig |
The model config. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the model config is not set. |
Requester (BaseModel)
pydantic-model
Requester of a new model version.
Source code in zenml/new/pipelines/model_utils.py
class Requester(BaseModel):
    """Requester of a new model version."""

    source: str
    name: str

    def __repr__(self) -> str:
        """Return a string representation of the requester.

        Returns:
            A string representation of the requester.
        """
        # Same "source::name" shape as the f-string formulation.
        return "::".join((self.source, self.name))
__repr__(self)
special
Return a string representation of the requester.
Returns:
Type | Description |
---|---|
str |
A string representation of the requester. |
Source code in zenml/new/pipelines/model_utils.py
def __repr__(self) -> str:
    """Return a string representation of the requester.

    Returns:
        A string representation of the requester.
    """
    # Same "source::name" shape as the f-string formulation.
    return "::".join((self.source, self.name))
update_request(self, model_config, requester)
Update from Model Config Model object in place.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_config |
ModelConfigModel |
Model Config Model object. |
required |
requester |
NewModelVersionRequest.Requester |
Requester of a new model version. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the model version name is configured differently by different requesters. |
Source code in zenml/new/pipelines/model_utils.py
def update_request(
    self,
    model_config: ModelConfigModel,
    requester: "NewModelVersionRequest.Requester",
) -> None:
    """Update from Model Config Model object in place.

    Args:
        model_config: Model Config Model object.
        requester: Requester of a new model version.

    Raises:
        ValueError: If the model version name is configured differently
            by different requesters.
    """
    self.requesters.append(requester)
    if self._model_config is None:
        self._model_config = ModelConfig.parse_obj(model_config)
    if self._model_config.version != model_config.version:
        # Bug fix: the original message was missing the closing backtick
        # after the model name, the space between the two sentences, and
        # the verb "be".
        raise ValueError(
            "A mismatch of `version` name in model configurations "
            f"provided for `{model_config.name}` detected. Since a new "
            "model version is requested for this model, all `version` "
            "names must match or be left default."
        )
    self._model_config._merge_with_config(model_config)
pipeline
Definition of a ZenML pipeline.
Pipeline
ZenML pipeline class.
Source code in zenml/new/pipelines/pipeline.py
class Pipeline:
"""ZenML pipeline class."""
# The active pipeline is the pipeline to which step invocations will be
# added when a step is called. It is set using a context manager when a
# pipeline is called (see Pipeline.__call__ for more context)
ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None
def __init__(
self,
name: str,
entrypoint: F,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
enable_step_logs: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
model_config: Optional["ModelConfig"] = None,
) -> None:
"""Initializes a pipeline.
Args:
name: The name of the pipeline.
entrypoint: The entrypoint function of the pipeline.
enable_cache: If caching should be enabled for this pipeline.
enable_artifact_metadata: If artifact metadata should be enabled for
this pipeline.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline.
enable_step_logs: If step logs should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can
be a function with a single argument of type `BaseException`, or
a source path to such a function (e.g. `module.my_function`).
on_success: Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. `module.my_function`).
model_config: Model(Version) configuration for this step as `ModelConfig` instance.
"""
self._invocations: Dict[str, StepInvocation] = {}
self._run_args: Dict[str, Any] = {}
self._configuration = PipelineConfiguration(
name=name,
)
self.configure(
enable_cache=enable_cache,
enable_artifact_metadata=enable_artifact_metadata,
enable_artifact_visualization=enable_artifact_visualization,
enable_step_logs=enable_step_logs,
settings=settings,
extra=extra,
on_failure=on_failure,
on_success=on_success,
model_config=model_config,
)
self.entrypoint = entrypoint
self._parameters: Dict[str, Any] = {}
@property
def name(self) -> str:
"""The name of the pipeline.
Returns:
The name of the pipeline.
"""
return self.configuration.name
@property
def enable_cache(self) -> Optional[bool]:
"""If caching is enabled for the pipeline.
Returns:
If caching is enabled for the pipeline.
"""
return self.configuration.enable_cache
@property
def configuration(self) -> PipelineConfiguration:
"""The configuration of the pipeline.
Returns:
The configuration of the pipeline.
"""
return self._configuration
@property
def invocations(self) -> Dict[str, StepInvocation]:
"""Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been
called.
Returns:
The step invocations.
"""
return self._invocations
def resolve(self) -> "Source":
"""Resolves the pipeline.
Returns:
The pipeline source.
"""
return source_utils.resolve(self.entrypoint, skip_validation=True)
@property
def source_object(self) -> Any:
"""The source object of this pipeline.
Returns:
The source object of this pipeline.
"""
return self.entrypoint
@property
def source_code(self) -> str:
"""The source code of this pipeline.
Returns:
The source code of this pipeline.
"""
return inspect.getsource(self.source_object)
@classmethod
def from_model(cls, model: "PipelineResponseModel") -> "Pipeline":
"""Creates a pipeline instance from a model.
Args:
model: The model to load the pipeline instance from.
Returns:
The pipeline instance.
"""
from zenml.new.pipelines.deserialization_utils import load_pipeline
return load_pipeline(model=model)
@property
def model(self) -> "PipelineResponseModel":
"""Gets the registered pipeline model for this instance.
Returns:
The registered pipeline model.
Raises:
RuntimeError: If the pipeline has not been registered yet.
"""
self._prepare_if_possible()
pipeline_spec = Compiler().compile_spec(self)
version_hash = self._compute_unique_identifier(
pipeline_spec=pipeline_spec
)
pipelines = Client().list_pipelines(
name=self.name, version_hash=version_hash
)
if len(pipelines) == 1:
return pipelines.items[0]
raise RuntimeError(
f"Cannot get the model of pipeline '{self.name}' because it has "
f"not been registered yet. Please ensure that the pipeline has "
f"been run and that at least one step has been executed."
)
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model_config: Optional["ModelConfig"] = None,
    merge: bool = True,
) -> T:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model_config: Model(Version) configuration for this step as
            `ModelConfig` instance.
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.

    Returns:
        The pipeline instance that this method was called on.
    """
    # Resolve the hooks to serializable source strings so they can be
    # stored as part of the pipeline configuration.
    resolved_failure_hook = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        resolved_failure_hook = resolve_and_validate_hook(on_failure)

    resolved_success_hook = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        resolved_success_hook = resolve_and_validate_hook(on_success)

    if model_config:
        # Warnings about the model config are not useful when it is applied
        # at the pipeline level.
        model_config.suppress_warnings = True

    model_config_model = (
        ModelConfigModel.parse_obj(model_config.dict())
        if model_config is not None
        else None
    )

    # Drop all values the caller did not provide so that an omitted
    # argument never clobbers an existing configuration entry.
    update_values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": resolved_failure_hook,
            "success_hook_source": resolved_success_hook,
            "model_config_model": model_config_model,
        }
    )
    update = PipelineConfigurationUpdate(**update_values)
    self._apply_configuration(update, merge=merge)
    return self
@property
def requires_parameters(self) -> bool:
"""If the pipeline entrypoint requires parameters.
Returns:
If the pipeline entrypoint requires parameters.
"""
signature = inspect.signature(self.entrypoint, follow_wrapped=True)
return any(
parameter.default is inspect.Parameter.empty
for parameter in signature.parameters.values()
)
@property
def is_prepared(self) -> bool:
"""If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the
pipeline is fully defined.
Returns:
If the pipeline is prepared.
"""
return len(self.invocations) > 0
def prepare(self, *args: Any, **kwargs: Any) -> None:
    """Prepares the pipeline.

    Args:
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.
    """
    # Reset leftovers from a previous preparation so the entrypoint starts
    # from a clean slate.
    self._parameters = {}
    self._invocations = {}

    # While inside the context manager, this instance is the active
    # pipeline: every step called by the entrypoint registers its
    # invocation on this pipeline.
    with self:
        self._call_entrypoint(*args, **kwargs)
def register(self) -> "PipelineResponseModel":
    """Register the pipeline in the server.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    custom_configuration = self.configuration.dict(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configuration:
        # Registration only stores the spec, never the configuration, so
        # warn the user that these options won't survive.
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    spec = Compiler().compile_spec(self)
    return self._register(pipeline_spec=spec)
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds Docker images for the pipeline.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()

        compiled_deployment, compiled_spec, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        registered_pipeline_id = self._register(
            pipeline_spec=compiled_spec
        ).id

        # A verified local code repository lets the build download code
        # instead of baking it into the images.
        active_repo = code_repository_utils.find_active_code_repository()
        verified_repo = build_utils.verify_local_repository_context(
            deployment=compiled_deployment, local_repo_context=active_repo
        )

        return build_utils.create_pipeline_build(
            deployment=compiled_deployment,
            pipeline_id=registered_pipeline_id,
            code_repository=verified_repo,
        )
def _run(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
) -> None:
    """Runs the pipeline on the active stack.

    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline run.
        enable_step_logs: If step logs should be enabled for this pipeline.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.

    Raises:
        Exception: bypass any exception from pipeline up.
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls when importing modules needed to run a step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return

    logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")

    with track_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
        # Merge the config file (if any) with the in-code arguments;
        # in-code values take precedence (see `_compile`).
        deployment, pipeline_spec, schedule, build = self._compile(
            config_path=config_path,
            run_name=run_name,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            steps=step_configurations,
            settings=settings,
            schedule=schedule,
            build=build,
            extra=extra,
        )

        skip_pipeline_registration = constants.handle_bool_env_var(
            constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
            default=False,
        )

        # Unlisted (or explicitly skipped) runs are not attached to a
        # registered pipeline.
        register_pipeline = not (skip_pipeline_registration or unlisted)

        pipeline_id = None
        if register_pipeline:
            pipeline_id = self._register(pipeline_spec=pipeline_spec).id
        else:
            logger.debug(f"Pipeline {self.name} is unlisted.")

        # TODO: check whether orchestrator even support scheduling before
        # registering the schedule
        schedule_id = None
        if schedule:
            if schedule.name:
                schedule_name = schedule.name
            else:
                # No explicit schedule name given: derive one from the
                # run name template using the current UTC date/time.
                date = datetime.utcnow().strftime("%Y_%m_%d")
                time = datetime.utcnow().strftime("%H_%M_%S_%f")
                schedule_name = deployment.run_name_template.format(
                    date=date, time=time
                )
            components = Client().active_stack_model.components
            orchestrator = components[StackComponentType.ORCHESTRATOR][0]
            schedule_model = ScheduleRequestModel(
                workspace=Client().active_workspace.id,
                user=Client().active_user.id,
                pipeline_id=pipeline_id,
                orchestrator_id=orchestrator.id,
                name=schedule_name,
                active=True,
                cron_expression=schedule.cron_expression,
                start_time=schedule.start_time,
                end_time=schedule.end_time,
                interval_second=schedule.interval_second,
                catchup=schedule.catchup,
            )
            schedule_id = (
                Client().zen_store.create_schedule(schedule_model).id
            )
            logger.info(
                f"Created schedule `{schedule_name}` for pipeline "
                f"`{deployment.pipeline_configuration.name}`."
            )

        stack = Client().active_stack

        # Collect requests for new model versions and propagate the
        # resulting versions into the deployment before it is registered.
        new_version_requests = self.get_new_version_requests(deployment)
        deployment = self.update_new_versions_requests(
            deployment, new_version_requests
        )

        local_repo_context = (
            code_repository_utils.find_active_code_repository()
        )
        code_repository = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=local_repo_context
        )

        build_model = build_utils.reuse_or_create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            allow_build_reuse=not prevent_build_reuse,
            build=build,
            code_repository=code_repository,
        )
        build_id = build_model.id if build_model else None

        # Only reference the code when the local repository is clean — a
        # dirty repository cannot be reproduced from a commit.
        code_reference = None
        if local_repo_context and not local_repo_context.is_dirty:
            source_root = source_utils.get_source_root()
            subdirectory = (
                Path(source_root)
                .resolve()
                .relative_to(local_repo_context.root)
            )
            code_reference = CodeReferenceRequestModel(
                commit=local_repo_context.current_commit,
                subdirectory=subdirectory.as_posix(),
                code_repository=local_repo_context.code_repository_id,
            )

        deployment_request = PipelineDeploymentRequestModel(
            user=Client().active_user.id,
            workspace=Client().active_workspace.id,
            stack=stack.id,
            pipeline=pipeline_id,
            build=build_id,
            schedule=schedule_id,
            code_reference=code_reference,
            **deployment.dict(),
        )
        deployment_model = Client().zen_store.create_deployment(
            deployment=deployment_request
        )

        analytics_handler.metadata = self._get_pipeline_analytics_metadata(
            deployment=deployment_model, stack=stack
        )
        stack.prepare_pipeline_deployment(deployment=deployment_model)

        self.log_pipeline_deployment_metadata(deployment_model)

        # Prevent execution of nested pipelines which might lead to
        # unexpected behavior
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
        try:
            stack.deploy_pipeline(deployment=deployment_model)
        except Exception as e:
            # Roll back model versions created for this failed run (only
            # those marked for deletion on failure), then re-raise.
            self.delete_running_versions_without_recovery(
                new_version_requests
            )
            raise e
        finally:
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

        # A run may not exist yet if the orchestrator reports
        # asynchronously (e.g. a remote orchestrator) — see the warning
        # branch below.
        runs = Client().list_pipeline_runs(
            deployment_id=deployment_model.id,
            sort_by="desc:start_time",
            size=1,
        )

        if runs.items:
            self.register_running_versions(new_version_requests)
            run_url = dashboard_utils.get_run_url(runs[0])
            if run_url:
                logger.info(f"Dashboard URL: {run_url}")
            else:
                logger.info(
                    "You can visualize your pipeline runs in the `ZenML "
                    "Dashboard`. In order to try it locally, please run "
                    "`zenml up`."
                )
        else:
            logger.warning(
                f"Your orchestrator '{stack.orchestrator.name}' is "
                f"running remotely. Note that the pipeline run will "
                f"only show up on the ZenML dashboard once the first "
                f"step has started executing on the remote "
                f"infrastructure.",
            )
@staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponseModel,
) -> None:
    """Displays logs based on the deployment model upon running a pipeline.

    This is best-effort logging: any exception raised while gathering or
    printing the metadata is caught and logged at debug level, so it can
    never fail the actual pipeline run.

    Args:
        deployment_model: The model for the pipeline deployment
    """
    try:
        # Log about the schedule/run
        if deployment_model.schedule:
            logger.info(
                "Scheduling a run with the schedule: "
                f"`{deployment_model.schedule.name}`"
            )
        else:
            logger.info("Executing a new run.")

        # Log about the caching status
        if deployment_model.pipeline_configuration.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{deployment_model.pipeline_configuration.name}`."
            )

        # Log about the used builds
        if deployment_model.build:
            logger.info("Using a build:")
            logger.info(
                " Image(s): "
                f"{', '.join([i.image for i in deployment_model.build.images.values()])}"
            )

            # Log about version mismatches between local and build
            from zenml import __version__

            if deployment_model.build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{deployment_model.build.zenml_version}"
                )

            import platform

            if (
                deployment_model.build.python_version
                != platform.python_version()
            ):
                logger.info(
                    f"Python version (different than the local version): "
                    f"{deployment_model.build.python_version}"
                )

        # Log about the user, stack and components
        logger.info(f"Using user: `{deployment_model.user.name}`")

        if deployment_model.stack is not None:
            logger.info(f"Using stack: `{deployment_model.stack.name}`")

            for (
                component_type,
                component_models,
            ) in deployment_model.stack.components.items():
                # Only the first (primary) component of each type is shown.
                logger.info(
                    f"  {component_type.value}: `{component_models[0].name}`"
                )
    except Exception as e:
        # Deliberately broad: metadata logging must never break the run.
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
def get_new_version_requests(
    self, deployment: "PipelineDeploymentBaseModel"
) -> Dict[str, NewModelVersionRequest]:
    """Get the running versions of the models that are used in the pipeline run.

    Args:
        deployment: The pipeline deployment configuration.

    Returns:
        A dict of new model version request objects.
    """
    requests: Dict[str, NewModelVersionRequest] = defaultdict(
        NewModelVersionRequest
    )

    # Collect step-level requests and track whether every step carries its
    # own model config (if so, the pipeline-level config is fully shadowed).
    every_step_has_own_config = True
    for step in deployment.step_configurations.values():
        step_model_config = step.config.model_config_model
        if step_model_config is None:
            every_step_has_own_config = False
        if step_model_config and step_model_config.create_new_model_version:
            requests[step_model_config.name].update_request(
                step_model_config,
                NewModelVersionRequest.Requester(
                    source="step", name=step.config.name
                ),
            )

    pipeline_model_config = (
        deployment.pipeline_configuration.model_config_model
    )
    if every_step_has_own_config:
        if pipeline_model_config is not None:
            logger.warning(
                f"ModelConfig of pipeline `{self.name}` is overridden in all steps. "
            )
    elif (
        pipeline_model_config
        and pipeline_model_config.create_new_model_version
    ):
        requests[pipeline_model_config.name].update_request(
            pipeline_model_config,
            NewModelVersionRequest.Requester(
                source="pipeline", name=self.name
            ),
        )

    self._validate_new_version_requests(requests)
    return requests
def _validate_new_version_requests(
    self,
    new_versions_requested: Dict[str, NewModelVersionRequest],
) -> None:
    """Validate the model configurations that are used in the pipeline run.

    Args:
        new_versions_requested: A dict of new model version request objects.

    Raises:
        RuntimeError: If there is unfinished pipeline run for requested model version.
        RuntimeError: If recovery not requested, but model version already exists.
    """
    for model_name, request in new_versions_requested.items():
        if len(request.requesters) > 1:
            logger.warning(
                f"New version of model `{model_name}` requested in multiple decorators:\n"
                f"{request.requesters}\n We recommend that `create_new_model_version` is configured "
                "only in one place of the pipeline."
            )

        try:
            existing_version = request.model_config._get_model_version()
        except KeyError:
            # The requested version does not exist yet — nothing to check.
            continue

        # The version exists: refuse to reuse it while a run is still
        # attached to it and in progress.
        for run_name, run in existing_version.pipeline_runs.items():
            if run.status == ExecutionStatus.RUNNING:
                raise RuntimeError(
                    f"New model version was requested, but pipeline run `{run_name}` "
                    f"is still running with version `{existing_version.name}`."
                )

        # Without recovery semantics an explicitly named version must not
        # already exist.
        if (
            request.model_config.version
            and request.model_config.delete_new_version_on_failure
        ):
            raise RuntimeError(
                f"Cannot create version `{request.model_config.version}` "
                f"for model `{request.model_config.name}` since it already exists"
            )
def update_new_versions_requests(
    self,
    deployment: "PipelineDeploymentBaseModel",
    new_version_requests: Dict[str, NewModelVersionRequest],
) -> "PipelineDeploymentBaseModel":
    """Update model configurations that are used in the pipeline run.

    This method is updating create_new_model_version for all model
    configurations in the pipeline that deal with a model name having an
    existing request to create a new model version.

    Args:
        deployment: The pipeline deployment configuration.
        new_version_requests: Dict of models requesting new versions and their definition points.

    Returns:
        Updated pipeline deployment configuration.
    """

    def _align(model_config: Optional["ModelConfigModel"]) -> None:
        # Sync a model config with the pending new-version request for its
        # model name, if such a request exists.
        if (
            model_config is not None
            and model_config.name in new_version_requests
        ):
            model_config.version = new_version_requests[
                model_config.name
            ].model_config.version
            model_config.create_new_model_version = True

    for step_configuration in deployment.step_configurations.values():
        _align(step_configuration.config.model_config_model)
    _align(deployment.pipeline_configuration.model_config_model)

    return deployment
def register_running_versions(
    self, new_version_requests: Dict[str, NewModelVersionRequest]
) -> None:
    """Registers the running versions of the models used in the given pipeline run.

    Args:
        new_version_requests: Dict of models requesting new versions and their definition points.
    """
    for model_name, request in new_version_requests.items():
        if not request.model_config.delete_new_version_on_failure:
            continue
        model_version = Client().get_model_version(
            model_name_or_id=model_name,
            model_version_name_or_number_or_id=request.model_config.version,
        )
        # Promote the provisional "running" version name to its final name.
        model_version._update_default_running_version_name()
def delete_running_versions_without_recovery(
    self, new_version_requests: Dict[str, NewModelVersionRequest]
) -> None:
    """Delete the running versions of the models without `restore` after fail.

    Args:
        new_version_requests: Dict of models requesting new versions and their definition points.
    """
    for model_name, request in new_version_requests.items():
        config = request.model_config
        # Only versions explicitly marked for deletion on failure (and
        # actually pinned to a version) are removed.
        if not (
            config.delete_new_version_on_failure
            and config.version is not None
        ):
            continue
        stale_version = Client().get_model_version(
            model_name_or_id=model_name,
            model_version_name_or_number_or_id=config.version,
        )
        Client().delete_model_version(
            model_name_or_id=model_name,
            model_version_name_or_id=stale_version.id,
        )
def get_runs(self, **kwargs: Any) -> List[PipelineRunResponseModel]:
    """(Deprecated) Get runs of this pipeline.

    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.

    Returns:
        List of runs of this pipeline.
    """
    # Warn first so the deprecation notice is emitted even if resolving
    # the registered model below raises.
    logger.warning(
        "`Pipeline.get_runs()` is deprecated and will be removed in a "
        "future version. Please use `Pipeline.model.get_runs()` instead."
    )
    registered_model = self.model
    return registered_model.get_runs(**kwargs)
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    target_stack = stack or Client().active_stack
    setting_classes = target_stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    # Sort each settings class into the configuration levels it supports;
    # a class may apply at both the pipeline and the step level.
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        template_fields = pydantic_utils.TemplateGenerator(
            setting_class
        ).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = template_fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = template_fields

    steps = {}
    for invocation_id, invocation in self.invocations.items():
        step = invocation.step
        if step.entrypoint_definition.legacy_params:
            parameters = pydantic_utils.TemplateGenerator(
                step.entrypoint_definition.legacy_params.annotation
            ).run()
        else:
            parameters = {}
        outputs = {
            output_name: PartialArtifactConfiguration()
            for output_name in step.entrypoint_definition.outputs
        }
        steps[invocation_id] = StepConfigurationUpdate(
            parameters=parameters,
            settings=step_settings,
            outputs=outputs,
        )

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()

    # Comment everything out so the template is inert until the user
    # uncomments the options they want.
    yaml_string = yaml.dump(template)
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)

    with open(path, "w") as f:
        f.write(yaml_string)
def _apply_configuration(
    self,
    config: PipelineConfigurationUpdate,
    merge: bool = True,
) -> None:
    """Applies an update to the pipeline configuration.

    Args:
        config: The configuration update.
        merge: Whether to merge the updates with the existing configuration
            or not. See the `BasePipeline.configure(...)` method for a
            detailed explanation.
    """
    # Validate the settings keys before mutating any state.
    self._validate_configuration(config)
    # `recursive=merge`: a merging update combines dict-like fields with
    # the existing values instead of replacing them wholesale.
    self._configuration = pydantic_utils.update_model(
        self._configuration, update=config, recursive=merge
    )
    logger.debug("Updated pipeline configuration:")
    logger.debug(self._configuration)
@staticmethod
def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
    """Validates a configuration update.

    Args:
        config: The configuration update to validate.
    """
    # Only the settings keys need validation here; everything else is
    # validated by the pydantic model itself.
    setting_keys = list(config.settings)
    settings_utils.validate_setting_keys(setting_keys)
def _get_pipeline_analytics_metadata(
    self,
    deployment: "PipelineDeploymentResponseModel",
    stack: "Stack",
) -> Dict[str, Any]:
    """Returns the pipeline deployment metadata.

    Args:
        deployment: The pipeline deployment to track.
        stack: The stack on which the pipeline will be deployed.

    Returns:
        the metadata about the pipeline deployment
    """
    # A materializer counts as custom when any step output references a
    # materializer source outside of ZenML itself.
    custom_materializer = any(
        not source.is_internal
        for step in deployment.step_configurations.values()
        for output in step.config.outputs.values()
        for source in output.materializer_source
    )

    client = Client()
    stack_creator = client.get_stack(stack.id).user
    own_stack = stack_creator and stack_creator.id == client.active_user.id

    stack_metadata = {
        component_type.value: component.flavor
        for component_type, component in stack.components.items()
    }
    return {
        "store_type": client.zen_store.type.value,
        **stack_metadata,
        "total_steps": len(self.invocations),
        "schedule": bool(deployment.schedule),
        "custom_materializer": custom_materializer,
        "own_stack": own_stack,
    }
def _compile(
    self, config_path: Optional[str] = None, **run_configuration_args: Any
) -> Tuple[
    "PipelineDeploymentBaseModel",
    "PipelineSpec",
    Optional["Schedule"],
    Union["PipelineBuildBaseModel", UUID, None],
]:
    """Compiles the pipeline.

    Args:
        config_path: Path to a config file.
        **run_configuration_args: Configurations for the pipeline run.

    Returns:
        A tuple containing the deployment, spec, schedule and build of
        the compiled pipeline.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    run_config = (
        PipelineRunConfiguration.from_yaml(config_path)
        if config_path
        else PipelineRunConfiguration()
    )

    # Values provided in code take precedence over the config file.
    code_overrides = PipelineRunConfiguration.parse_obj(
        dict_utils.remove_none_values(run_configuration_args)
    )
    run_config = pydantic_utils.update_model(
        run_config, update=code_overrides
    )

    deployment, pipeline_spec = Compiler().compile(
        pipeline=self,
        stack=Client().active_stack,
        run_configuration=run_config,
    )

    return deployment, pipeline_spec, run_config.schedule, run_config.build
def _register(
    self, pipeline_spec: "PipelineSpec"
) -> "PipelineResponseModel":
    """Register the pipeline in the server.

    Args:
        pipeline_spec: The pipeline spec to register.

    Returns:
        The registered pipeline model.
    """
    version_hash = self._compute_unique_identifier(
        pipeline_spec=pipeline_spec
    )

    client = Client()
    existing = client.list_pipelines(
        name=self.name,
        version_hash=version_hash,
        size=1,
        sort_by="desc:created",
    )
    if existing.total:
        # Identical name + hash already registered: reuse that version.
        reused_pipeline = existing.items[0]
        logger.info(
            "Reusing registered version: `(version: %s)`.",
            reused_pipeline.version,
        )
        return reused_pipeline

    # Nothing matches, so register the next version of this pipeline.
    next_version = str((self._get_latest_version() or 0) + 1)
    request = PipelineRequestModel(
        workspace=client.active_workspace.id,
        user=client.active_user.id,
        name=self.name,
        version=next_version,
        version_hash=version_hash,
        spec=pipeline_spec,
        docstring=self.__doc__,
    )
    new_pipeline = client.zen_store.create_pipeline(pipeline=request)
    logger.info(
        "Registered new version: `(version %s)`.",
        new_pipeline.version,
    )
    return new_pipeline
def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
    """Computes a unique identifier from the pipeline spec and steps.

    Args:
        pipeline_spec: Compiled spec of the pipeline.

    Returns:
        The unique identifier of the pipeline.
    """
    from packaging import version

    digest = hashlib.md5()  # nosec
    digest.update(pipeline_spec.json_with_string_sources.encode())

    if version.parse(pipeline_spec.version) >= version.parse("0.4"):
        # Only add this for newer versions to keep backwards compatibility
        digest.update(self.source_code.encode())

    # Include every step's source so code changes alter the identifier.
    for step_spec in pipeline_spec.steps:
        invocation = self.invocations[step_spec.pipeline_parameter_name]
        digest.update(invocation.step.source_code.encode())

    return digest.hexdigest()
def _get_latest_version(self) -> Optional[int]:
    """Gets the latest version of this pipeline.

    Returns:
        The latest version or `None` if no version exists.
    """
    page = Client().list_pipelines(
        name=self.name, sort_by="desc:created", size=1
    )
    if not page.total:
        # The pipeline has never been registered.
        return None
    latest_pipeline = page.items[0]
    if latest_pipeline.version == "UNVERSIONED":
        return None
    return int(latest_pipeline.version)
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[str, "ExternalArtifact"],
    parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Adds a step invocation to the pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        parameters: The parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    # All input artifacts must originate from steps of this very pipeline.
    for artifact in input_artifacts.values():
        if artifact.pipeline is not self:
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {artifact.pipeline.name}."
            )

    invocation_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    self._invocations[invocation_id] = StepInvocation(
        id=invocation_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        parameters=parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    return invocation_id
def _compute_invocation_id(
self,
step: "BaseStep",
custom_id: Optional[str] = None,
allow_suffix: bool = True,
) -> str:
"""Compute the invocation ID.
Args:
step: The step for which to compute the ID.
custom_id: Custom ID to use for the invocation.
allow_suffix: Whether a suffix can be appended to the invocation
ID.
Raises:
RuntimeError: If no ID suffix is allowed and an invocation for the
same ID already exists.
RuntimeError: If no unique invocation ID can be found.
Returns:
The invocation ID.
"""
base_id = id_ = custom_id or step.name
if id_ not in self.invocations:
return id_
if not allow_suffix:
raise RuntimeError("Duplicate step ID")
for index in range(2, 10000):
id_ = f"{base_id}_{index}"
if id_ not in self.invocations:
return id_
raise RuntimeError("Unable to find step ID")
def __enter__(self: T) -> T:
    """Activate the pipeline context.

    Only one pipeline may be active at a time, so entering while another
    pipeline holds the context is a user error.

    Raises:
        RuntimeError: If a different pipeline is already active.

    Returns:
        The pipeline instance.
    """
    if Pipeline.ACTIVE_PIPELINE:
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{Pipeline.ACTIVE_PIPELINE.name} is already active."
        )

    Pipeline.ACTIVE_PIPELINE = self
    return self
def __exit__(self, *args: Any) -> None:
"""Deactivates the pipeline context.
Args:
*args: The arguments passed to the context exit handler.
"""
Pipeline.ACTIVE_PIPELINE = None
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Copies the pipeline and applies the given configurations.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    new_pipeline = self.copy()
    new_pipeline.configure(**kwargs)

    # Only explicitly provided run options are stored; `None` values mean
    # "keep whatever was configured before".
    forwarded_run_args = dict_utils.remove_none_values(
        {
            "run_name": run_name,
            "schedule": schedule,
            "build": build,
            "step_configurations": step_configurations,
            "config_path": config_path,
            "unlisted": unlisted,
            "prevent_build_reuse": prevent_build_reuse,
        }
    )
    new_pipeline._run_args.update(forwarded_run_args)
    return new_pipeline
def copy(self) -> "Pipeline":
"""Copies the pipeline.
Returns:
The pipeline copy.
"""
return copy.deepcopy(self)
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Handle a call of the pipeline.
This method does one of two things:
* If there is an active pipeline context, it calls the pipeline
entrypoint function within that context and the step invocations
will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling
the entrypoint function.
Args:
*args: Entrypoint function arguments.
**kwargs: Entrypoint function keyword arguments.
Returns:
The outputs of the entrypoint function call.
"""
if Pipeline.ACTIVE_PIPELINE:
# Calling a pipeline inside a pipeline, we return the potential
# outputs of the entrypoint function
# TODO: This currently ignores the configuration of the pipeline
# and instead applies the configuration of the previously active
# pipeline. Is this what we want?
return self.entrypoint(*args, **kwargs)
self.prepare(*args, **kwargs)
return self._run(**self._run_args)
def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
    """Calls the pipeline entrypoint function with the given arguments.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Raises:
        ValueError: If an input argument is missing or not JSON
            serializable.
    """
    try:
        # Validate the call arguments against the entrypoint signature;
        # per the error below, only JSON-serializable inputs are accepted.
        validated_args = pydantic_utils.validate_function_args(
            self.entrypoint,
            {"arbitrary_types_allowed": False, "smart_union": True},
            *args,
            **kwargs,
        )
    except ValidationError as e:
        raise ValueError(
            "Invalid or missing inputs for pipeline entrypoint function. "
            "Only JSON serializable inputs are allowed as pipeline inputs."
        ) from e
    # Store the validated parameters before running the entrypoint so the
    # invocations it creates can reference them.
    self._parameters = validated_args
    self.entrypoint(**validated_args)
def _prepare_if_possible(self) -> None:
    """Prepare the pipeline if it has not been prepared yet.

    Raises:
        RuntimeError: If the pipeline is not prepared and the preparation
            requires parameters.
    """
    # Already prepared: nothing to do.
    if self.is_prepared:
        return

    # Preparation without arguments is only possible when the entrypoint
    # takes no required parameters.
    if self.requires_parameters:
        raise RuntimeError(
            f"Failed while trying to prepare pipeline {self.name}. "
            "The entrypoint function of the pipeline requires "
            "arguments. Please prepare the pipeline by calling "
            "`pipeline_instance.prepare(...)` and try again."
        )

    self.prepare()
configuration: PipelineConfiguration
property
readonly
The configuration of the pipeline.
Returns:
Type | Description |
---|---|
PipelineConfiguration |
The configuration of the pipeline. |
enable_cache: Optional[bool]
property
readonly
If caching is enabled for the pipeline.
Returns:
Type | Description |
---|---|
Optional[bool] |
If caching is enabled for the pipeline. |
invocations: Dict[str, zenml.steps.step_invocation.StepInvocation]
property
readonly
Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been called.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.step_invocation.StepInvocation] |
The step invocations. |
is_prepared: bool
property
readonly
If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.
Returns:
Type | Description |
---|---|
bool |
If the pipeline is prepared. |
model: PipelineResponseModel
property
readonly
Gets the registered pipeline model for this instance.
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The registered pipeline model. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the pipeline has not been registered yet. |
name: str
property
readonly
The name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
requires_parameters: bool
property
readonly
If the pipeline entrypoint requires parameters.
Returns:
Type | Description |
---|---|
bool |
If the pipeline entrypoint requires parameters. |
source_code: str
property
readonly
The source code of this pipeline.
Returns:
Type | Description |
---|---|
str |
The source code of this pipeline. |
source_object: Any
property
readonly
The source object of this pipeline.
Returns:
Type | Description |
---|---|
Any |
The source object of this pipeline. |
__call__(self, *args, **kwargs)
special
Handle a call of the pipeline.
This method does one of two things: * If there is an active pipeline context, it calls the pipeline entrypoint function within that context and the step invocations will be added to the active pipeline. * If no pipeline is active, it activates this pipeline before calling the entrypoint function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Entrypoint function arguments. |
() |
**kwargs |
Any |
Entrypoint function keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
Any |
The outputs of the entrypoint function call. |
Source code in zenml/new/pipelines/pipeline.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Handle a call of the pipeline.

    This method does one of two things:
    * If there is an active pipeline context, it calls the pipeline
      entrypoint function within that context and the step invocations
      will be added to the active pipeline.
    * If no pipeline is active, it activates this pipeline before calling
      the entrypoint function.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The outputs of the entrypoint function call.
    """
    if Pipeline.ACTIVE_PIPELINE:
        # Calling a pipeline inside a pipeline, we return the potential
        # outputs of the entrypoint function
        # TODO: This currently ignores the configuration of the pipeline
        # and instead applies the configuration of the previously active
        # pipeline. Is this what we want?
        return self.entrypoint(*args, **kwargs)

    # No active pipeline: prepare this pipeline with the given arguments,
    # then trigger a run with any run options stored via `with_options`.
    self.prepare(*args, **kwargs)
    return self._run(**self._run_args)
__enter__(self)
special
Activate the pipeline context.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If a different pipeline is already active. |
Returns:
Type | Description |
---|---|
~T |
The pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
def __enter__(self: T) -> T:
    """Activate the pipeline context.

    Raises:
        RuntimeError: If a different pipeline is already active.

    Returns:
        The pipeline instance.
    """
    if not Pipeline.ACTIVE_PIPELINE:
        # Become the active pipeline: step calls inside the context are
        # registered as invocations on this instance.
        Pipeline.ACTIVE_PIPELINE = self
        return self

    raise RuntimeError(
        "Unable to enter pipeline context. A different pipeline "
        f"{Pipeline.ACTIVE_PIPELINE.name} is already active."
    )
__exit__(self, *args)
special
Deactivates the pipeline context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
The arguments passed to the context exit handler. |
() |
Source code in zenml/new/pipelines/pipeline.py
def __exit__(self, *args: Any) -> None:
    """Deactivate the pipeline context.

    Args:
        *args: The arguments passed to the context exit handler.
    """
    # Clearing the class-level attribute deactivates this pipeline.
    Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_config=None)
special
Initializes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the pipeline. |
required |
entrypoint |
~F |
The entrypoint function of the pipeline. |
required |
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can
be a function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. |
None |
model_config |
Optional[ModelConfig] |
Model(Version) configuration for this step as |
None |
Source code in zenml/new/pipelines/pipeline.py
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model_config: Optional["ModelConfig"] = None,
) -> None:
    """Initializes a pipeline.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model_config: Model(Version) configuration for this step as `ModelConfig` instance.
    """
    # Step invocations get registered here while the entrypoint runs
    # inside the pipeline context.
    self._invocations: Dict[str, StepInvocation] = {}
    # Run options stored by `with_options(...)` and consumed when the
    # pipeline is called/run.
    self._run_args: Dict[str, Any] = {}
    self._configuration = PipelineConfiguration(
        name=name,
    )
    # Route all remaining options through `configure` so validation and
    # merging live in a single place.
    self.configure(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        enable_step_logs=enable_step_logs,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
        model_config=model_config,
    )
    self.entrypoint = entrypoint
    # Validated entrypoint parameters, populated by `_call_entrypoint`.
    self._parameters: Dict[str, Any] = {}
add_step_invocation(self, step, input_artifacts, external_artifacts, parameters, upstream_steps, custom_id=None, allow_id_suffix=True)
Adds a step invocation to the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
BaseStep |
The step for which to add an invocation. |
required |
input_artifacts |
Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact] |
The input artifacts for the invocation. |
required |
external_artifacts |
Dict[str, ExternalArtifact] |
The external artifacts for the invocation. |
required |
parameters |
Dict[str, Any] |
The parameters for the invocation. |
required |
upstream_steps |
Set[str] |
The upstream steps for the invocation. |
required |
custom_id |
Optional[str] |
Custom ID to use for the invocation. |
None |
allow_id_suffix |
bool |
Whether a suffix can be appended to the invocation ID. |
True |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the method is called on an inactive pipeline. |
RuntimeError |
If the invocation was called with an artifact from a different pipeline. |
Returns:
Type | Description |
---|---|
str |
The step invocation ID. |
Source code in zenml/new/pipelines/pipeline.py
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[str, "ExternalArtifact"],
    parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Adds a step invocation to the pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        parameters: The parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    # Invocations may only be registered on the currently active pipeline
    # so they end up on the instance whose entrypoint is being traced.
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )
    # Reject input artifacts produced by a different pipeline instance.
    for artifact in input_artifacts.values():
        if artifact.pipeline is not self:
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {artifact.pipeline.name}."
            )
    # Compute a unique invocation ID (a suffix may be appended to the
    # custom ID when `allow_id_suffix` is set).
    invocation_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    invocation = StepInvocation(
        id=invocation_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        parameters=parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    self._invocations[invocation_id] = invocation
    return invocation_id
build(self, settings=None, step_configurations=None, config_path=None)
Builds Docker images for the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings |
Optional[Mapping[str, SettingsOrDict]] |
Settings for the pipeline. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a
|
None |
Returns:
Type | Description |
---|---|
Optional[PipelineBuildResponseModel] |
The build output. |
Source code in zenml/new/pipelines/pipeline.py
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds Docker images for the pipeline.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        # Make sure the entrypoint has been traced before compiling.
        self._prepare_if_possible()
        deployment, pipeline_spec, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        # Register (or fetch) the pipeline so the build can reference it.
        pipeline_id = self._register(pipeline_spec=pipeline_spec).id

        # Detect an active local code repository and verify it against the
        # deployment; presumably this lets builds download code instead of
        # baking it into the images — see `build_utils` for details.
        local_repo = code_repository_utils.find_active_code_repository()
        code_repository = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=local_repo
        )

        return build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_config=None, merge=True)
Configures the pipeline.
Configuration merging example:
* merge==True
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can
be a function with a single argument of type |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can
be a function with no arguments, or a source path to such a
function (e.g. |
None |
merge |
bool |
If |
True |
model_config |
Optional[ModelConfig] |
Model(Version) configuration for this step as |
None |
Returns:
Type | Description |
---|---|
~T |
The pipeline instance that this method was called on. |
Source code in zenml/new/pipelines/pipeline.py
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model_config: Optional["ModelConfig"] = None,
    merge: bool = True,
) -> T:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        model_config: Model(Version) configuration for this step as `ModelConfig` instance.

    Returns:
        The pipeline instance that this method was called on.
    """
    failure_hook_source = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        failure_hook_source = resolve_and_validate_hook(on_failure)

    success_hook_source = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        success_hook_source = resolve_and_validate_hook(on_success)

    if model_config:
        model_config.suppress_warnings = True

    # Drop unset (None) options so they don't clobber existing
    # configuration values during the update below.
    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model_config_model": ModelConfigModel.parse_obj(
                model_config.dict()
            )
            if model_config is not None
            else None,
        }
    )
    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
copy(self)
Copies the pipeline.
Returns:
Type | Description |
---|---|
Pipeline |
The pipeline copy. |
Source code in zenml/new/pipelines/pipeline.py
def copy(self) -> "Pipeline":
    """Copies the pipeline.

    Returns:
        The pipeline copy.
    """
    # Deep copy so the duplicate shares no mutable state (invocations,
    # configuration, run args) with this instance.
    return copy.deepcopy(self)
delete_running_versions_without_recovery(self, new_version_requests)
Delete the running model versions without restoring them after a failure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_version_requests |
Dict[str, zenml.new.pipelines.model_utils.NewModelVersionRequest] |
Dict of models requesting new versions and their definition points. |
required |
Source code in zenml/new/pipelines/pipeline.py
def delete_running_versions_without_recovery(
    self, new_version_requests: Dict[str, NewModelVersionRequest]
) -> None:
    """Delete the running model versions without restoring them after a failure.

    Args:
        new_version_requests: Dict of models requesting new versions and
            their definition points.
    """
    for model_name, request in new_version_requests.items():
        config = request.model_config
        # Only versions explicitly marked for deletion on failure, and
        # only when a concrete version is set.
        if not config.delete_new_version_on_failure:
            continue
        if config.version is None:
            continue
        # Resolve the running version by its name/number, then delete it.
        model = Client().get_model_version(
            model_name_or_id=model_name,
            model_version_name_or_number_or_id=config.version,
        )
        Client().delete_model_version(
            model_name_or_id=model_name,
            model_version_name_or_id=model.id,
        )
from_model(model)
classmethod
Creates a pipeline instance from a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
PipelineResponseModel |
The model to load the pipeline instance from. |
required |
Returns:
Type | Description |
---|---|
Pipeline |
The pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
@classmethod
def from_model(cls, model: "PipelineResponseModel") -> "Pipeline":
    """Creates a pipeline instance from a model.

    Args:
        model: The model to load the pipeline instance from.

    Returns:
        The pipeline instance.
    """
    # Imported locally — presumably to avoid an import cycle at module
    # load time.
    from zenml.new.pipelines.deserialization_utils import load_pipeline

    pipeline_instance = load_pipeline(model=model)
    return pipeline_instance
get_new_version_requests(self, deployment)
Get the running versions of the models that are used in the pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The pipeline deployment configuration. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.new.pipelines.model_utils.NewModelVersionRequest] |
A dict of new model version request objects. |
Source code in zenml/new/pipelines/pipeline.py
def get_new_version_requests(
    self, deployment: "PipelineDeploymentBaseModel"
) -> Dict[str, NewModelVersionRequest]:
    """Get the running versions of the models that are used in the pipeline run.

    Args:
        deployment: The pipeline deployment configuration.

    Returns:
        A dict of new model version request objects.
    """
    # Requests keyed by model name; missing keys start as an empty request.
    new_versions_requested: Dict[
        str, NewModelVersionRequest
    ] = defaultdict(NewModelVersionRequest)
    # Track whether every step carries its own model config; only then is
    # the pipeline-level model config irrelevant.
    all_steps_have_own_config = True
    for step in deployment.step_configurations.values():
        step_model_config = step.config.model_config_model
        all_steps_have_own_config = (
            all_steps_have_own_config
            and step.config.model_config_model is not None
        )
        if (
            step_model_config
            and step_model_config.create_new_model_version
        ):
            # Record (or extend) the new-version request for this model,
            # noting the step that asked for it.
            new_versions_requested[step_model_config.name].update_request(
                step_model_config,
                NewModelVersionRequest.Requester(
                    source="step", name=step.config.name
                ),
            )
    if not all_steps_have_own_config:
        # At least one step falls back to the pipeline-level model config,
        # so it may also request a new version.
        pipeline_model_config = (
            deployment.pipeline_configuration.model_config_model
        )
        if (
            pipeline_model_config
            and pipeline_model_config.create_new_model_version
        ):
            new_versions_requested[
                pipeline_model_config.name
            ].update_request(
                pipeline_model_config,
                NewModelVersionRequest.Requester(
                    source="pipeline", name=self.name
                ),
            )
    elif deployment.pipeline_configuration.model_config_model is not None:
        # A pipeline-level model config exists, but every step overrides
        # it, so it never takes effect — warn the user.
        logger.warning(
            f"ModelConfig of pipeline `{self.name}` is overridden in all steps. "
        )

    self._validate_new_version_requests(new_versions_requested)

    return new_versions_requested
get_runs(self, **kwargs)
(Deprecated) Get runs of this pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Further arguments for filtering or pagination that are
passed to |
{} |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_run_models.PipelineRunResponseModel] |
List of runs of this pipeline. |
Source code in zenml/new/pipelines/pipeline.py
def get_runs(self, **kwargs: Any) -> List[PipelineRunResponseModel]:
    """(Deprecated) Get runs of this pipeline.

    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.

    Returns:
        List of runs of this pipeline.
    """
    # Deprecated entry point: warn, then delegate to the pipeline model.
    logger.warning(
        "`Pipeline.get_runs()` is deprecated and will be removed in a "
        "future version. Please use `Pipeline.model.get_runs()` instead."
    )
    runs = self.model.get_runs(**kwargs)
    return runs
log_pipeline_deployment_metadata(deployment_model)
staticmethod
Displays logs based on the deployment model upon running a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment_model |
PipelineDeploymentResponseModel |
The model for the pipeline deployment |
required |
Source code in zenml/new/pipelines/pipeline.py
@staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponseModel,
) -> None:
    """Displays logs based on the deployment model upon running a pipeline.

    Purely informational, best-effort logging: any failure here is
    swallowed and must never break the pipeline run.

    Args:
        deployment_model: The model for the pipeline deployment
    """
    try:
        # Log about the schedule/run
        if deployment_model.schedule:
            logger.info(
                "Scheduling a run with the schedule: "
                f"`{deployment_model.schedule.name}`"
            )
        else:
            logger.info("Executing a new run.")

        # Log about the caching status
        if deployment_model.pipeline_configuration.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{deployment_model.pipeline_configuration.name}`."
            )

        # Log about the used builds
        if deployment_model.build:
            logger.info("Using a build:")
            logger.info(
                " Image(s): "
                f"{', '.join([i.image for i in deployment_model.build.images.values()])}"
            )

            # Log about version mismatches between local and build
            from zenml import __version__

            if deployment_model.build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{deployment_model.build.zenml_version}"
                )

            import platform

            if (
                deployment_model.build.python_version
                != platform.python_version()
            ):
                logger.info(
                    f"Python version (different than the local version): "
                    f"{deployment_model.build.python_version}"
                )

        # Log about the user, stack and components
        logger.info(f"Using user: `{deployment_model.user.name}`")

        if deployment_model.stack is not None:
            logger.info(f"Using stack: `{deployment_model.stack.name}`")

            for (
                component_type,
                component_models,
            ) in deployment_model.stack.components.items():
                logger.info(
                    f" {component_type.value}: `{component_models[0].name}`"
                )
    except Exception as e:
        # Best-effort: metadata logging must not fail the run itself.
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
prepare(self, *args, **kwargs)
Prepares the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Pipeline entrypoint input arguments. |
() |
**kwargs |
Any |
Pipeline entrypoint input keyword arguments. |
{} |
Source code in zenml/new/pipelines/pipeline.py
def prepare(self, *args: Any, **kwargs: Any) -> None:
    """Prepare the pipeline by tracing its entrypoint function.

    Args:
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.
    """
    # Reset any state left over from a previous preparation.
    self._parameters = {}
    self._invocations = {}

    # Entering the context makes this instance the active pipeline, so
    # every step called while the entrypoint executes is registered as an
    # invocation on this instance.
    with self:
        self._call_entrypoint(*args, **kwargs)
register(self)
Register the pipeline in the server.
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The registered pipeline model. |
Source code in zenml/new/pipelines/pipeline.py
def register(self) -> "PipelineResponseModel":
    """Register the pipeline in the server.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    # Custom configurations are not stored with the registered pipeline,
    # so warn the user that they will be lost when building/running from
    # the CLI without a config file.
    if self.configuration.dict(exclude_defaults=True, exclude={"name"}):
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    pipeline_spec = Compiler().compile_spec(self)
    return self._register(pipeline_spec=pipeline_spec)
register_running_versions(self, new_version_requests)
Registers the running versions of the models used in the given pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_version_requests |
Dict[str, zenml.new.pipelines.model_utils.NewModelVersionRequest] |
Dict of models requesting new versions and their definition points. |
required |
Source code in zenml/new/pipelines/pipeline.py
def register_running_versions(
    self, new_version_requests: Dict[str, NewModelVersionRequest]
) -> None:
    """Registers the running versions of the models used in the given pipeline run.

    Args:
        new_version_requests: Dict of models requesting new versions and
            their definition points.
    """
    for model_name, request in new_version_requests.items():
        # Only versions that would be deleted on failure need their
        # running-version name updated.
        if not request.model_config.delete_new_version_on_failure:
            continue
        mv = Client().get_model_version(
            model_name_or_id=model_name,
            model_version_name_or_number_or_id=request.model_config.version,
        )
        mv._update_default_running_version_name()
resolve(self)
Resolves the pipeline.
Returns:
Type | Description |
---|---|
Source |
The pipeline source. |
Source code in zenml/new/pipelines/pipeline.py
def resolve(self) -> "Source":
    """Resolve this pipeline into a source reference.

    Returns:
        The pipeline source.
    """
    # Validation is skipped because the entrypoint is a decorated
    # function rather than a plain importable attribute.
    source = source_utils.resolve(self.entrypoint, skip_validation=True)
    return source
update_new_versions_requests(self, deployment, new_version_requests)
Update model configurations that are used in the pipeline run.
This method updates `create_new_model_version` for all model configurations in the pipeline whose model name has an existing request to create a new model version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBaseModel |
The pipeline deployment configuration. |
required |
new_version_requests |
Dict[str, zenml.new.pipelines.model_utils.NewModelVersionRequest] |
Dict of models requesting new versions and their definition points. |
required |
Returns:
Type | Description |
---|---|
PipelineDeploymentBaseModel |
Updated pipeline deployment configuration. |
Source code in zenml/new/pipelines/pipeline.py
def update_new_versions_requests(
    self,
    deployment: "PipelineDeploymentBaseModel",
    new_version_requests: Dict[str, NewModelVersionRequest],
) -> "PipelineDeploymentBaseModel":
    """Update model configurations that are used in the pipeline run.

    This method updates `create_new_model_version` for all model
    configurations in the pipeline whose model name has an existing
    request to create a new model version.

    Args:
        deployment: The pipeline deployment configuration.
        new_version_requests: Dict of models requesting new versions and their definition points.

    Returns:
        Updated pipeline deployment configuration.
    """
    # Align every step-level model config with the agreed-upon new
    # version so all steps target the same model version.
    for step_name in deployment.step_configurations:
        step_model_config = deployment.step_configurations[
            step_name
        ].config.model_config_model
        if (
            step_model_config is not None
            and step_model_config.name in new_version_requests
        ):
            step_model_config.version = new_version_requests[
                step_model_config.name
            ].model_config.version
            step_model_config.create_new_model_version = True
    # Apply the same alignment to the pipeline-level model config.
    pipeline_model_config = (
        deployment.pipeline_configuration.model_config_model
    )
    if (
        pipeline_model_config is not None
        and pipeline_model_config.name in new_version_requests
    ):
        pipeline_model_config.version = new_version_requests[
            pipeline_model_config.name
        ].model_config.version
        pipeline_model_config.create_new_model_version = True
    return deployment
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)
Copies the pipeline and applies the given configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
schedule |
Optional[zenml.config.schedule.Schedule] |
Optional schedule to use for the run. |
None |
build |
Union[str, UUID, PipelineBuildBaseModel] |
Optional build to use for the run. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a
|
None |
unlisted |
bool |
Whether the pipeline run should be unlisted (not assigned to any pipeline). |
False |
prevent_build_reuse |
bool |
Whether to prevent the reuse of a build. |
False |
**kwargs |
Any |
Pipeline configuration options. These will be passed
to the |
{} |
Returns:
Type | Description |
---|---|
Pipeline |
The copied pipeline instance. |
Source code in zenml/new/pipelines/pipeline.py
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Copies the pipeline and applies the given configurations.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    # Work on a copy so the original pipeline object stays untouched.
    copied_pipeline = self.copy()
    copied_pipeline.configure(**kwargs)

    requested_run_args = {
        "run_name": run_name,
        "schedule": schedule,
        "build": build,
        "step_configurations": step_configurations,
        "config_path": config_path,
        "unlisted": unlisted,
        "prevent_build_reuse": prevent_build_reuse,
    }
    # Drop unset (None) options so they don't clobber existing run args.
    copied_pipeline._run_args.update(
        dict_utils.remove_none_values(requested_run_args)
    )
    return copied_pipeline
write_run_configuration_template(self, path, stack=None)
Writes a run configuration yaml template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path where the template will be written. |
required |
stack |
Optional[Stack] |
The stack for which the template should be generated. If not given, the active stack will be used. |
None |
Source code in zenml/new/pipelines/pipeline.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    # Fall back to the client's active stack when none is given.
    stack = stack or Client().active_stack
    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    # Split the generated settings templates by the configuration
    # level(s) at which each settings class may be applied. A class can
    # appear in both dicts if it supports both levels.
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = fields

    steps = {}
    for step_name, invocation in self.invocations.items():
        step = invocation.step
        # Only legacy parameter classes yield a parameters template;
        # otherwise leave the parameters section empty.
        parameters = (
            pydantic_utils.TemplateGenerator(
                step.entrypoint_definition.legacy_params.annotation
            ).run()
            if step.entrypoint_definition.legacy_params
            else {}
        )
        outputs = {
            name: PartialArtifactConfiguration()
            for name in step.entrypoint_definition.outputs
        }
        step_template = StepConfigurationUpdate(
            parameters=parameters,
            settings=step_settings,
            outputs=outputs,
        )
        steps[step_name] = step_template

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    yaml_string = yaml.dump(template)
    # Comment out every value so the template documents the available
    # options without overriding any defaults when used as-is.
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)
    with open(path, "w") as f:
        f.write(yaml_string)
pipeline_context
Pipeline context class.
PipelineContext
Provides pipeline configuration during its composition.
Usage example:
from zenml import get_pipeline_context
...
@pipeline(
extra={
"complex_parameter": [
("sklearn.tree", "DecisionTreeClassifier"),
("sklearn.ensemble", "RandomForestClassifier"),
]
}
)
def my_pipeline():
context = get_pipeline_context()
after = []
search_steps_prefix = "hp_tuning_search_"
for i, model_search_configuration in enumerate(
context.extra["complex_parameter"]
):
step_name = f"{search_steps_prefix}{i}"
cross_validation(
model_package=model_search_configuration[0],
model_class=model_search_configuration[1],
id=step_name
)
after.append(step_name)
select_best_model(
search_steps_prefix=search_steps_prefix,
after=after,
)
Source code in zenml/new/pipelines/pipeline_context.py
class PipelineContext:
    """Provides pipeline configuration during its composition.

    Usage example:
    ```python
    from zenml import get_pipeline_context

    ...

    @pipeline(
        extra={
            "complex_parameter": [
                ("sklearn.tree", "DecisionTreeClassifier"),
                ("sklearn.ensemble", "RandomForestClassifier"),
            ]
        }
    )
    def my_pipeline():
        context = get_pipeline_context()

        after = []
        search_steps_prefix = "hp_tuning_search_"
        for i, model_search_configuration in enumerate(
            context.extra["complex_parameter"]
        ):
            step_name = f"{search_steps_prefix}{i}"
            cross_validation(
                model_package=model_search_configuration[0],
                model_class=model_search_configuration[1],
                id=step_name
            )
            after.append(step_name)
        select_best_model(
            search_steps_prefix=search_steps_prefix,
            after=after,
        )
    ```
    """

    def __init__(self, pipeline_configuration: "PipelineConfiguration"):
        """Initialize the context of the currently composing pipeline.

        Args:
            pipeline_configuration: The configuration of the pipeline
                derived from the Pipeline class.
        """
        # Mirror the relevant configuration fields onto the context so
        # that pipeline code can read them as plain attributes.
        for attribute in (
            "name",
            "enable_cache",
            "enable_artifact_metadata",
            "enable_artifact_visualization",
            "enable_step_logs",
            "settings",
            "extra",
        ):
            setattr(self, attribute, getattr(pipeline_configuration, attribute))
__init__(self, pipeline_configuration)
special
Initialize the context of the currently composing pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_configuration |
PipelineConfiguration |
The configuration of the pipeline derived from Pipeline class. |
required |
Source code in zenml/new/pipelines/pipeline_context.py
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
    """Initialize the context of the currently composing pipeline.

    Args:
        pipeline_configuration: The configuration of the pipeline
            derived from the Pipeline class.
    """
    # Copy the pipeline-level configuration values onto the context.
    config = pipeline_configuration
    self.name = config.name
    self.enable_cache = config.enable_cache
    self.enable_artifact_metadata = config.enable_artifact_metadata
    self.enable_artifact_visualization = config.enable_artifact_visualization
    self.enable_step_logs = config.enable_step_logs
    self.settings = config.settings
    self.extra = config.extra
get_pipeline_context()
Get the context of the currently composing pipeline.
Returns:
Type | Description |
---|---|
PipelineContext |
The context of the currently composing pipeline. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no active pipeline is found. |
RuntimeError |
If inside a running step. |
Source code in zenml/new/pipelines/pipeline_context.py
def get_pipeline_context() -> "PipelineContext":
    """Get the context of the currently composing pipeline.

    Returns:
        The context of the currently composing pipeline.

    Raises:
        RuntimeError: If no active pipeline is found.
        RuntimeError: If inside a running step.
    """
    from zenml.new.pipelines.pipeline import Pipeline

    active_pipeline = Pipeline.ACTIVE_PIPELINE
    if active_pipeline is not None:
        return PipelineContext(
            pipeline_configuration=active_pipeline.configuration
        )

    # No composing pipeline: distinguish "called inside a running step"
    # from "no pipeline at all" by probing for an active step context.
    try:
        from zenml.new.steps.step_context import get_step_context

        get_step_context()
    except RuntimeError:
        raise RuntimeError("No active pipeline found.")
    else:
        raise RuntimeError(
            "Inside a step use `from zenml import get_step_context` instead."
        )
pipeline_decorator
ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model_config=None)
Decorator to create a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Whether to use caching or not. |
None |
enable_artifact_metadata |
Optional[bool] |
Whether to enable artifact metadata or not. |
None |
enable_step_logs |
Optional[bool] |
If step logs should be enabled for this pipeline. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a
function with a single argument of type `BaseException`, or a source
path to such a function (e.g. `module.my_function`). |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a
function with no arguments, or a source path to such a function
(e.g. `module.my_function`). |
None |
model_config |
Optional[ModelConfig] |
Model(Version) configuration for this step as a `ModelConfig` instance. |
None |
Returns:
Type | Description |
---|---|
Union[Pipeline, Callable[[F], Pipeline]] |
A pipeline instance. |
Source code in zenml/new/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model_config: Optional["ModelConfig"] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Decorator to create a pipeline.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be
            a function with a single argument of type `BaseException`, or a
            source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be
            a function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model_config: Model(Version) configuration for this step as
            `ModelConfig` instance.

    Returns:
        A pipeline instance.
    """

    def decorator(func: "F") -> "Pipeline":
        # Imported lazily to avoid a circular import at module load time.
        from zenml.new.pipelines.pipeline import Pipeline

        pipeline_instance = Pipeline(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model_config=model_config,
            entrypoint=func,
        )
        # Preserve the decorated function's docstring on the pipeline.
        pipeline_instance.__doc__ = func.__doc__
        return pipeline_instance

    # Support both bare `@pipeline` and parameterized `@pipeline(...)` usage.
    if _func is None:
        return decorator
    return decorator(_func)
steps
special
decorated_step
Internal BaseStep subclass used by the step decorator.
log_artifact_metadata
Utility functions for logging artifact metadata.
log_artifact_metadata(output_name=None, **kwargs)
Log artifact metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
The output name of the artifact to log metadata for. Can be omitted if there is only one output artifact. |
None |
**kwargs |
Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize] |
Metadata to log. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the function is called outside of a step. |
ValueError |
If no output name is provided and there is more than one output, or if the output name does not exist. |
Source code in zenml/new/steps/log_artifact_metadata.py
def log_artifact_metadata(
    output_name: Optional[str] = None,
    **kwargs: MetadataType,
) -> None:
    """Log artifact metadata.

    Args:
        output_name: The output name of the artifact to log metadata for. Can
            be omitted if there is only one output artifact.
        **kwargs: Metadata to log.

    Raises:
        RuntimeError: If the function is called outside of a step.
        ValueError: If no output name is provided and there is more than one
            output, or if the output name does not exist.
    """
    # Nothing to log.
    if not kwargs:
        return

    try:
        context = get_step_context()
    except StepContextError:
        raise RuntimeError("Cannot log artifact metadata outside of a step.")

    try:
        context.add_output_metadata(output_name=output_name, **kwargs)
    except StepContextError as error:
        # Surface output-resolution problems as a ValueError to callers.
        raise ValueError(error)
log_deployment_metadata(output_name=None, description=None, predict_url=None, explain_url=None, healthcheck_url=None, deployer_ui_url=None, **kwargs)
Log metadata for a deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
The output name of the artifact to log metadata for. Can be omitted if there is only one output artifact. |
None |
description |
Optional[str] |
A description of the deployment. |
None |
predict_url |
Optional[str] |
The predict URL of the deployment. |
None |
explain_url |
Optional[str] |
The explain URL of the deployment. |
None |
healthcheck_url |
Optional[str] |
The healthcheck URL of the deployment. |
None |
deployer_ui_url |
Optional[str] |
The deployer UI URL of the deployment. |
None |
**kwargs |
Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize] |
Other metadata to log. |
{} |
Source code in zenml/new/steps/log_artifact_metadata.py
def log_deployment_metadata(
    output_name: Optional[str] = None,
    description: Optional[str] = None,
    predict_url: Optional[str] = None,
    explain_url: Optional[str] = None,
    healthcheck_url: Optional[str] = None,
    deployer_ui_url: Optional[str] = None,
    **kwargs: MetadataType,
) -> None:
    """Log metadata for a deployment.

    Args:
        output_name: The output name of the artifact to log metadata for. Can
            be omitted if there is only one output artifact.
        description: A description of the deployment.
        predict_url: The predict URL of the deployment.
        explain_url: The explain URL of the deployment.
        healthcheck_url: The healthcheck URL of the deployment.
        deployer_ui_url: The deployer UI URL of the deployment.
        **kwargs: Other metadata to log.
    """
    # Fold each provided (truthy) well-known field into the metadata dict.
    well_known_fields = (
        ("description", description),
        ("predict_url", predict_url),
        ("explain_url", explain_url),
        ("healthcheck_url", healthcheck_url),
        ("deployer_ui_url", deployer_ui_url),
    )
    for field_name, field_value in well_known_fields:
        if field_value:
            kwargs[field_name] = field_value

    log_artifact_metadata(
        output_name=output_name,
        **kwargs,
    )
log_model_object_metadata(output_name=None, description=None, metrics=None, hyperparameters=None, **kwargs)
Log metadata for a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
The output name of the artifact to log metadata for. Can be omitted if there is only one output artifact. |
None |
description |
Optional[str] |
A description of the model. |
None |
metrics |
Optional[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]] |
The metrics to log. |
None |
hyperparameters |
Optional[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]] |
The hyperparameters to log. |
None |
**kwargs |
Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize] |
Other metadata to log. |
{} |
Source code in zenml/new/steps/log_artifact_metadata.py
def log_model_object_metadata(
    output_name: Optional[str] = None,
    description: Optional[str] = None,
    metrics: Optional[Dict[str, MetadataType]] = None,
    hyperparameters: Optional[Dict[str, MetadataType]] = None,
    **kwargs: MetadataType,
) -> None:
    """Log metadata for a model.

    Args:
        output_name: The output name of the artifact to log metadata for. Can
            be omitted if there is only one output artifact.
        description: A description of the model.
        metrics: The metrics to log.
        hyperparameters: The hyperparameters to log.
        **kwargs: Other metadata to log.
    """
    # Fold each provided (truthy) well-known field into the metadata dict.
    for key, value in (
        ("description", description),
        ("metrics", metrics),
        ("hyperparameters", hyperparameters),
    ):
        if value:
            kwargs[key] = value

    log_artifact_metadata(
        output_name=output_name,
        **kwargs,
    )
step_context
Step context class.
StepContext
Provides additional context inside a step function.
This singleton class is used to access information about the current run, step run, or its outputs inside a step function.
Usage example:
from zenml.steps import get_step_context
@step
def my_trainer_step() -> Any:
context = get_step_context()
# get info about the current pipeline run
current_pipeline_run = context.pipeline_run
# get info about the current step run
current_step_run = context.step_run
# get info about the future output artifacts of this step
output_artifact_uri = context.get_output_artifact_uri()
...
Source code in zenml/new/steps/step_context.py
class StepContext(metaclass=SingletonMetaClass):
    """Provides additional context inside a step function.

    This singleton class is used to access information about the current run,
    step run, or its outputs inside a step function.

    Usage example:
    ```python
    from zenml.steps import get_step_context

    @step
    def my_trainer_step() -> Any:
        context = get_step_context()

        # get info about the current pipeline run
        current_pipeline_run = context.pipeline_run

        # get info about the current step run
        current_step_run = context.step_run

        # get info about the future output artifacts of this step
        output_artifact_uri = context.get_output_artifact_uri()

        ...
    ```
    """

    def __init__(
        self,
        pipeline_run: "PipelineRunResponseModel",
        step_run: "StepRunResponseModel",
        output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
        output_artifact_uris: Mapping[str, str],
        output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
        step_run_info: "StepRunInfo",
        cache_enabled: bool,
    ) -> None:
        """Initialize the context of the currently running step.

        Args:
            pipeline_run: The model of the current pipeline run.
            step_run: The model of the current step run.
            output_materializers: The output materializers of the step that
                this context is used in.
            output_artifact_uris: The output artifacts of the step that this
                context is used in.
            output_artifact_configs: The outputs' ArtifactConfigs of the step
                that this context is used in.
            step_run_info: (Deprecated) info about the currently running step.
            cache_enabled: (Deprecated) Whether caching is enabled for the
                step.

        Raises:
            StepContextError: If the keys of the output materializers and
                output artifacts do not match.
        """
        from zenml.client import Client

        self.pipeline_run = pipeline_run
        self.step_run = step_run
        self._step_run_info = step_run_info
        self._cache_enabled = cache_enabled

        # Get the stack that we are running in
        self._stack = Client().active_stack

        self.step_name = self.step_run.name

        # set outputs: materializers and artifact URIs must describe the
        # exact same set of outputs, otherwise the context is inconsistent.
        if output_materializers.keys() != output_artifact_uris.keys():
            raise StepContextError(
                f"Mismatched keys in output materializers and output artifact "
                f"URIs for step '{self.step_name}'. Output materializer "
                f"keys: {set(output_materializers)}, output artifact URI "
                f"keys: {set(output_artifact_uris)}"
            )
        self._outputs = {
            key: StepContextOutput(
                materializer_classes=output_materializers[key],
                artifact_uri=output_artifact_uris[key],
                artifact_config=output_artifact_configs[key],
            )
            for key in output_materializers.keys()
        }

    def _get_output(
        self, output_name: Optional[str] = None
    ) -> "StepContextOutput":
        """Returns the output object for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                materializer and URI.

        Returns:
            The `StepContextOutput` (materializer classes, artifact URI and
            artifact config) for the given output.

        Raises:
            StepContextError: If the step has no outputs, no output for
                the given `output_name` or if no `output_name` was given but
                the step has multiple outputs.
        """
        output_count = len(self._outputs)
        if output_count == 0:
            raise StepContextError(
                f"Unable to get step output for step '{self.step_name}': "
                f"This step does not have any outputs."
            )
        if not output_name and output_count > 1:
            raise StepContextError(
                f"Unable to get step output for step '{self.step_name}': "
                f"This step has multiple outputs ({set(self._outputs)}), "
                f"please specify which output to return."
            )
        if output_name:
            if output_name not in self._outputs:
                raise StepContextError(
                    f"Unable to get step output '{output_name}' for "
                    f"step '{self.step_name}'. This step does not have an "
                    f"output with the given name, please specify one of the "
                    f"available outputs: {set(self._outputs)}."
                )
            return self._outputs[output_name]
        else:
            # Exactly one output at this point: fall back to it.
            return next(iter(self._outputs.values()))

    @property
    def pipeline(self) -> "PipelineResponseModel":
        """Returns the current pipeline.

        Returns:
            The current pipeline or None.

        Raises:
            StepContextError: If the pipeline run does not have a pipeline.
        """
        if self.pipeline_run.pipeline:
            return self.pipeline_run.pipeline
        raise StepContextError(
            f"Unable to get pipeline in step '{self.step_name}' of pipeline "
            f"run '{self.pipeline_run.id}': This pipeline run does not have "
            f"a pipeline associated with it."
        )

    @property
    def model_config(self) -> "ModelConfig":
        """Returns configured ModelConfig.

        Order of resolution to search for ModelConfig is:
            1. ModelConfig from @step
            2. ModelConfig from @pipeline

        Returns:
            The `ModelConfig` object associated with the current step.

        Raises:
            StepContextError: If the `ModelConfig` object is not set in
                `@step` or `@pipeline`.
        """
        # Step-level config takes precedence over pipeline-level config.
        if self.step_run.config.model_config is not None:
            return self.step_run.config.model_config
        if self.pipeline_run.config.model_config is not None:
            return self.pipeline_run.config.model_config
        raise StepContextError(
            f"Unable to get ModelConfig in step '{self.step_name}' of pipeline "
            f"run '{self.pipeline_run.id}': It was not set in `@step` or `@pipeline`."
        )

    @property
    def stack(self) -> Optional["Stack"]:
        """(Deprecated) Returns the current active stack.

        Returns:
            The current active stack or None.
        """
        logger.warning(
            "`StepContext.stack` is deprecated and will be removed in a "
            "future release. Please use `Client().active_stack` instead."
        )
        return self._stack

    @property
    def pipeline_name(self) -> str:
        """(Deprecated) Returns the current pipeline name.

        Returns:
            The current pipeline name or None.

        Raises:
            StepContextError: If the pipeline run does not have a pipeline.
        """
        logger.warning(
            "`StepContext.pipeline_name` is deprecated and will be removed in "
            "a future release. Please use `StepContext.pipeline.name` instead."
        )
        # NOTE(review): `self.pipeline` raises StepContextError itself when
        # no pipeline is associated, so this guard appears unreachable — the
        # documented exception is raised by the property access. Confirm.
        if not self.pipeline:
            raise StepContextError(
                f"Unable to get pipeline name in step '{self.step_name}' of "
                f"pipeline run '{self.pipeline_run.name}': The pipeline run "
                f"does not have a pipeline associated with it."
            )
        return self.pipeline.name

    @property
    def run_name(self) -> Optional[str]:
        """(Deprecated) Returns the current run name.

        Returns:
            The current run name or None.
        """
        logger.warning(
            "`StepContext.run_name` is deprecated and will be removed in a "
            "future release. Please use `StepContext.pipeline_run.name` "
            "instead."
        )
        return self.pipeline_run.name

    @property
    def parameters(self) -> Dict[str, Any]:
        """(Deprecated) The step parameters.

        Returns:
            The step parameters.
        """
        logger.warning(
            "`StepContext.parameters` is deprecated and will be removed in "
            "a future release. Please use "
            "`StepContext.step_run.config.parameters` instead."
        )
        return self.step_run.config.parameters

    @property
    def step_run_info(self) -> "StepRunInfo":
        """(Deprecated) Info about the currently running step.

        Returns:
            Info about the currently running step.
        """
        logger.warning(
            "`StepContext.step_run_info` is deprecated and will be removed in "
            "a future release. Please use `StepContext.step_run` or "
            "`StepContext.pipeline_run` to access information about the "
            "current run instead."
        )
        return self._step_run_info

    @property
    def cache_enabled(self) -> bool:
        """(Deprecated) Returns whether cache is enabled for the step.

        Returns:
            True if cache is enabled for the step, otherwise False.
        """
        logger.warning(
            "`StepContext.cache_enabled` is deprecated and will be removed in "
            "a future release."
        )
        return self._cache_enabled

    def get_output_materializer(
        self,
        output_name: Optional[str] = None,
        custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
        data_type: Optional[Type[Any]] = None,
    ) -> "BaseMaterializer":
        """Returns a materializer for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                materializer. If no name is given and the step only has a
                single output, the materializer of this output will be
                returned. If the step has multiple outputs, an exception
                will be raised.
            custom_materializer_class: If given, this `BaseMaterializer`
                subclass will be initialized with the output artifact instead
                of the materializer that was registered for this step output.
            data_type: If the output annotation is of type `Union` and the step
                therefore has multiple materializers configured, you can provide
                a data type for the output which will be used to select the
                correct materializer. If not provided, the first materializer
                will be used.

        Returns:
            A materializer initialized with the output artifact for
            the given output.
        """
        from zenml.utils import materializer_utils

        output = self._get_output(output_name)
        materializer_classes = output.materializer_classes
        artifact_uri = output.artifact_uri

        # Selection precedence: explicit custom class > single/first
        # registered class > data-type based selection among candidates.
        if custom_materializer_class:
            materializer_class = custom_materializer_class
        elif len(materializer_classes) == 1 or not data_type:
            materializer_class = materializer_classes[0]
        else:
            materializer_class = materializer_utils.select_materializer(
                data_type=data_type, materializer_classes=materializer_classes
            )

        return materializer_class(artifact_uri)

    def get_output_artifact_uri(
        self, output_name: Optional[str] = None
    ) -> str:
        """Returns the artifact URI for a given step output.

        Args:
            output_name: Optional name of the output for which to get the URI.
                If no name is given and the step only has a single output,
                the URI of this output will be returned. If the step has
                multiple outputs, an exception will be raised.

        Returns:
            Artifact URI for the given output.
        """
        return self._get_output(output_name).artifact_uri

    def get_output_metadata(
        self, output_name: Optional[str] = None
    ) -> Dict[str, "MetadataType"]:
        """Returns the metadata for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be returned. If the
                step has multiple outputs, an exception will be raised.

        Returns:
            Metadata for the given output.
        """
        return self._get_output(output_name).metadata or {}

    def add_output_metadata(
        self, output_name: Optional[str] = None, **metadata: "MetadataType"
    ) -> None:
        """Adds metadata for a given step output.

        Args:
            output_name: Optional name of the output for which to add the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be added. If the
                step has multiple outputs, an exception will be raised.
            **metadata: The metadata to add.
        """
        output = self._get_output(output_name)
        # Lazily initialize the metadata dict; merge new keys into it.
        if not output.metadata:
            output.metadata = {}
        output.metadata.update(**metadata)

    def _set_artifact_config(
        self,
        artifact_config: "ArtifactConfig",
        output_name: Optional[str] = None,
    ) -> None:
        """Adds artifact config for a given step output.

        Args:
            artifact_config: The artifact config of the output to set.
            output_name: Optional name of the output for which to set the
                output signature. If no name is given and the step only has a
                single output, the metadata of this output will be added. If
                the step has multiple outputs, an exception will be raised.

        Raises:
            EntityExistsError: If the output already has an output signature.
        """
        output = self._get_output(output_name)

        # An artifact config may only be set once per output.
        if output.artifact_config is None:
            output.artifact_config = artifact_config
        else:
            raise EntityExistsError(
                f"Output with name '{output_name}' already has artifact config."
            )
cache_enabled: bool
property
readonly
(Deprecated) Returns whether cache is enabled for the step.
Returns:
Type | Description |
---|---|
bool |
True if cache is enabled for the step, otherwise False. |
model_config: ModelConfig
property
readonly
Returns configured ModelConfig.
Order of resolution to search for ModelConfig is: 1. ModelConfig from @step 2. ModelConfig from @pipeline
Returns:
Type | Description |
---|---|
ModelConfig |
The |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the |
parameters: Dict[str, Any]
property
readonly
(Deprecated) The step parameters.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The step parameters. |
pipeline: PipelineResponseModel
property
readonly
Returns the current pipeline.
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The current pipeline or None. |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the pipeline run does not have a pipeline. |
pipeline_name: str
property
readonly
(Deprecated) Returns the current pipeline name.
Returns:
Type | Description |
---|---|
str |
The current pipeline name or None. |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the pipeline run does not have a pipeline. |
run_name: Optional[str]
property
readonly
(Deprecated) Returns the current run name.
Returns:
Type | Description |
---|---|
Optional[str] |
The current run name or None. |
stack: Optional[Stack]
property
readonly
(Deprecated) Returns the current active stack.
Returns:
Type | Description |
---|---|
Optional[Stack] |
The current active stack or None. |
step_run_info: StepRunInfo
property
readonly
(Deprecated) Info about the currently running step.
Returns:
Type | Description |
---|---|
StepRunInfo |
Info about the currently running step. |
__init__(self, pipeline_run, step_run, output_materializers, output_artifact_uris, output_artifact_configs, step_run_info, cache_enabled)
special
Initialize the context of the currently running step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunResponseModel |
The model of the current pipeline run. |
required |
step_run |
StepRunResponseModel |
The model of the current step run. |
required |
output_materializers |
Mapping[str, Sequence[Type[BaseMaterializer]]] |
The output materializers of the step that this context is used in. |
required |
output_artifact_uris |
Mapping[str, str] |
The output artifacts of the step that this context is used in. |
required |
output_artifact_configs |
Mapping[str, Optional[ArtifactConfig]] |
The outputs' ArtifactConfigs of the step that this context is used in. |
required |
step_run_info |
StepRunInfo |
(Deprecated) info about the currently running step. |
required |
cache_enabled |
bool |
(Deprecated) Whether caching is enabled for the step. |
required |
Exceptions:
Type | Description |
---|---|
StepContextError |
If the keys of the output materializers and output artifacts do not match. |
Source code in zenml/new/steps/step_context.py
def __init__(
    self,
    pipeline_run: "PipelineRunResponseModel",
    step_run: "StepRunResponseModel",
    output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
    output_artifact_uris: Mapping[str, str],
    output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
    step_run_info: "StepRunInfo",
    cache_enabled: bool,
) -> None:
    """Initialize the context of the currently running step.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        output_materializers: The output materializers of the step that
            this context is used in.
        output_artifact_uris: The output artifacts of the step that this
            context is used in.
        output_artifact_configs: The outputs' ArtifactConfigs of the step
            that this context is used in.
        step_run_info: (Deprecated) info about the currently running step.
        cache_enabled: (Deprecated) Whether caching is enabled for the step.

    Raises:
        StepContextError: If the keys of the output materializers and
            output artifacts do not match.
    """
    from zenml.client import Client

    self.pipeline_run = pipeline_run
    self.step_run = step_run
    self._step_run_info = step_run_info
    self._cache_enabled = cache_enabled

    # Resolve the stack that this step is currently executing on.
    self._stack = Client().active_stack

    self.step_name = self.step_run.name

    # Materializers and artifact URIs must describe exactly the same
    # set of outputs, otherwise the context would be inconsistent.
    if output_materializers.keys() != output_artifact_uris.keys():
        raise StepContextError(
            f"Mismatched keys in output materializers and output artifact "
            f"URIs for step '{self.step_name}'. Output materializer "
            f"keys: {set(output_materializers)}, output artifact URI "
            f"keys: {set(output_artifact_uris)}"
        )

    # Bundle the per-output information into one record per output name.
    outputs: Dict[str, StepContextOutput] = {}
    for output_name in output_materializers:
        outputs[output_name] = StepContextOutput(
            materializer_classes=output_materializers[output_name],
            artifact_uri=output_artifact_uris[output_name],
            artifact_config=output_artifact_configs[output_name],
        )
    self._outputs = outputs
add_output_metadata(self, output_name=None, **metadata)
Adds metadata for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised. |
None |
**metadata |
MetadataType |
The metadata to add. |
{} |
Source code in zenml/new/steps/step_context.py
def add_output_metadata(
    self, output_name: Optional[str] = None, **metadata: "MetadataType"
) -> None:
    """Adds metadata for a given step output.

    Args:
        output_name: Optional name of the output for which to add the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be added. If the
            step has multiple outputs, an exception will be raised.
        **metadata: The metadata to add.
    """
    target = self._get_output(output_name)
    # Start from the existing metadata dict, or a fresh one if unset.
    merged = target.metadata or {}
    merged.update(metadata)
    target.metadata = merged
get_output_artifact_uri(self, output_name=None)
Returns the artifact URI for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
Returns:
Type | Description |
---|---|
str |
Artifact URI for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_artifact_uri(
    self, output_name: Optional[str] = None
) -> str:
    """Returns the artifact URI for a given step output.

    Args:
        output_name: Optional name of the output for which to get the URI.
            If no name is given and the step only has a single output,
            the URI of this output will be returned. If the step has
            multiple outputs, an exception will be raised.

    Returns:
        Artifact URI for the given output.
    """
    output = self._get_output(output_name)
    return output.artifact_uri
get_output_materializer(self, output_name=None, custom_materializer_class=None, data_type=None)
Returns a materializer for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
custom_materializer_class |
Optional[Type[BaseMaterializer]] |
If given, this `BaseMaterializer` subclass will be initialized with the output artifact instead of the materializer that was registered for this step output. |
None |
data_type |
Optional[Type[Any]] |
If the output annotation is of type `Union` and the step therefore has multiple materializers configured, you can provide a data type for the output which will be used to select the correct materializer. If not provided, the first materializer will be used. |
None |
Returns:
Type | Description |
---|---|
BaseMaterializer |
A materializer initialized with the output artifact for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_materializer(
    self,
    output_name: Optional[str] = None,
    custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
    data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
    """Returns a materializer for a given step output.

    Args:
        output_name: Optional name of the output for which to get the
            materializer. If no name is given and the step only has a
            single output, the materializer of this output will be
            returned. If the step has multiple outputs, an exception
            will be raised.
        custom_materializer_class: If given, this `BaseMaterializer`
            subclass will be initialized with the output artifact instead
            of the materializer that was registered for this step output.
        data_type: If the output annotation is of type `Union` and the step
            therefore has multiple materializers configured, you can provide
            a data type for the output which will be used to select the
            correct materializer. If not provided, the first materializer
            will be used.

    Returns:
        A materializer initialized with the output artifact for
        the given output.
    """
    from zenml.utils import materializer_utils

    output = self._get_output(output_name)
    candidates = output.materializer_classes
    uri = output.artifact_uri

    if custom_materializer_class is not None:
        # An explicit override always wins.
        chosen = custom_materializer_class
    elif data_type is not None and len(candidates) > 1:
        # Multiple registered materializers: pick the one matching the
        # requested data type.
        chosen = materializer_utils.select_materializer(
            data_type=data_type, materializer_classes=candidates
        )
    else:
        # Single materializer (or no data type given): use the first one.
        chosen = candidates[0]

    return chosen(uri)
get_output_metadata(self, output_name=None)
Returns the metadata for a given step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_name |
Optional[str] |
Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised. |
None |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
Metadata for the given output. |
Source code in zenml/new/steps/step_context.py
def get_output_metadata(
    self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
    """Returns the metadata for a given step output.

    Args:
        output_name: Optional name of the output for which to get the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Metadata for the given output.
    """
    stored = self._get_output(output_name).metadata
    # Fall back to an empty dict when no metadata has been recorded yet.
    return stored if stored else {}
StepContextOutput
Represents a step output in the step context.
Source code in zenml/new/steps/step_context.py
class StepContextOutput:
    """Container for the per-output information of a step context."""

    # The materializer classes registered for this output.
    materializer_classes: Sequence[Type["BaseMaterializer"]]
    # The URI where the output artifact will be stored.
    artifact_uri: str
    # Metadata attached to the output; starts out unset.
    metadata: Optional[Dict[str, "MetadataType"]] = None
    # The ArtifactConfig object of the output, if any.
    artifact_config: Optional["ArtifactConfig"]

    def __init__(
        self,
        materializer_classes: Sequence[Type["BaseMaterializer"]],
        artifact_uri: str,
        artifact_config: Optional["ArtifactConfig"],
    ):
        """Initialize the step output.

        Args:
            materializer_classes: The materializer classes for the output.
            artifact_uri: The artifact URI for the output.
            artifact_config: The ArtifactConfig object of the output.
        """
        self.artifact_config = artifact_config
        self.artifact_uri = artifact_uri
        self.materializer_classes = materializer_classes
__init__(self, materializer_classes, artifact_uri, artifact_config)
special
Initialize the step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
materializer_classes |
Sequence[Type[BaseMaterializer]] |
The materializer classes for the output. |
required |
artifact_uri |
str |
The artifact URI for the output. |
required |
artifact_config |
Optional[ArtifactConfig] |
The ArtifactConfig object of the output. |
required |
Source code in zenml/new/steps/step_context.py
def __init__(
    self,
    materializer_classes: Sequence[Type["BaseMaterializer"]],
    artifact_uri: str,
    artifact_config: Optional["ArtifactConfig"],
):
    """Initialize the step output.

    Args:
        materializer_classes: The materializer classes for the output.
        artifact_uri: The artifact URI for the output.
        artifact_config: The ArtifactConfig object of the output.
    """
    # Plain attribute assignment; `metadata` keeps its class-level default.
    self.artifact_config = artifact_config
    self.artifact_uri = artifact_uri
    self.materializer_classes = materializer_classes
get_step_context()
Get the context of the currently running step.
Returns:
Type | Description |
---|---|
StepContext |
The context of the currently running step. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no step is currently running. |
Source code in zenml/new/steps/step_context.py
def get_step_context() -> "StepContext":
    """Get the context of the currently running step.

    Returns:
        The context of the currently running step.

    Raises:
        RuntimeError: If no step is currently running.
    """
    # Outside of a step function there is no context to hand out.
    if not StepContext._exists():
        raise RuntimeError(
            "The step context is only available inside a step function."
        )
    return StepContext()  # type: ignore
step_decorator
Step decorator function.
step(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, experiment_tracker=None, step_operator=None, output_materializers=None, settings=None, extra=None, on_failure=None, on_success=None, model_config=None)
Decorator to create a ZenML step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the step. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default. |
None |
enable_artifact_metadata |
Optional[bool] |
Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default. |
None |
enable_artifact_visualization |
Optional[bool] |
Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default. |
None |
enable_step_logs |
Optional[bool] |
Specify whether step logs are enabled for this step. |
None |
experiment_tracker |
Optional[str] |
The experiment tracker to use for this step. |
None |
step_operator |
Optional[str] |
The step operator to use for this step. |
None |
output_materializers |
Optional[OutputMaterializersSpecification] |
Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this step. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this step. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). |
None |
on_success |
Optional[HookSpecification] |
Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). |
None |
model_config |
Optional[ModelConfig] |
Model(Version) configuration for this step as `ModelConfig` instance. |
None |
Returns:
Type | Description |
---|---|
Union[BaseStep, Callable[[F], BaseStep]] |
The step instance. |
Source code in zenml/new/steps/step_decorator.py
def step(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional["OutputMaterializersSpecification"] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model_config: Optional["ModelConfig"] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
    """Decorator to create a ZenML step.

    Args:
        _func: The decorated function.
        name: The name of the step. If left empty, the name of the decorated
            function will be used as a fallback.
        enable_cache: Specify whether caching is enabled for this step. If no
            value is passed, caching is enabled by default.
        enable_artifact_metadata: Specify whether metadata is enabled for this
            step. If no value is passed, metadata is enabled by default.
        enable_artifact_visualization: Specify whether visualization is enabled
            for this step. If no value is passed, visualization is enabled by
            default.
        enable_step_logs: Specify whether step logs are enabled for this step.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model_config: Model(Version) configuration for this step as `ModelConfig` instance.

    Returns:
        The step instance.
    """

    def _wrap(func: "F") -> "BaseStep":
        # Imported lazily so the decorator module stays cheap to import.
        from zenml.new.steps.decorated_step import _DecoratedStep

        # Build a dynamic BaseStep subclass whose entrypoint is the
        # decorated function, preserving its module and docstring.
        class_attributes = {
            "entrypoint": staticmethod(func),
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        step_class: Type["BaseStep"] = type(
            func.__name__,
            (_DecoratedStep,),
            class_attributes,
        )

        return step_class(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            experiment_tracker=experiment_tracker,
            step_operator=step_operator,
            output_materializers=output_materializers,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model_config=model_config,
        )

    # Called with arguments (`@step(...)`): return the decorator.
    # Called directly (`@step`): decorate the function immediately.
    return _wrap if _func is None else _wrap(_func)