Artifacts
zenml.artifacts
special
external_artifact
External artifact definition.
ExternalArtifact (ExternalArtifactConfiguration)
pydantic-model
External artifacts can be used to provide values as input to ZenML steps.
ZenML steps accept either artifacts (=outputs of other steps), parameters (raw, JSON serializable values) or external artifacts. External artifacts can be used to provide any value as input to a step without needing to write an additional step that returns this value.
This class can be configured using the following parameters:
- value: The artifact value (any python object), that will be uploaded to the artifact store.
- id: The ID of an artifact that is already registered in ZenML.
- pipeline_name & artifact_name: Name of a pipeline and artifact to search in latest run.
- model_name & model_version & model_artifact_name & model_artifact_version: Name of a model, model version, model artifact and artifact version to search.
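The reference modes listed above are mutually exclusive. A minimal sketch of that rule in plain Python follows; `check_reference_modes` is a hypothetical helper written for illustration, not part of the ZenML API, and it is a simplified version of the checks performed by the class validator.

```python
from typing import Any, Optional


def check_reference_modes(
    value: Optional[Any] = None,
    id: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    artifact_name: Optional[str] = None,
    model_artifact_name: Optional[str] = None,
) -> str:
    """Return the name of the single reference mode in use.

    Hypothetical, simplified stand-in for the validation done by
    `ExternalArtifact`; it does not call into ZenML.
    """
    # The pipeline/artifact pair is only valid when both names are given.
    if (pipeline_name is None) != (artifact_name is None):
        raise ValueError(
            "`pipeline_name` and `artifact_name` must be provided together."
        )
    modes = {
        "value": value is not None,
        "id": id is not None,
        "pipeline": pipeline_name is not None,
        "model": model_artifact_name is not None,
    }
    active = [name for name, used in modes.items() if used]
    if len(active) > 1:
        raise ValueError(f"Only one reference mode may be used, got {active}.")
    if not active:
        raise ValueError("One reference mode must be provided.")
    return active[0]
```

For instance, `check_reference_modes(value=[1, 2, 3])` selects the value mode, while passing both `value` and `id` raises a `ValueError`.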
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Optional[Any] | The artifact value. | None |
id | Optional[UUID] | The ID of an artifact that should be referenced by this external artifact. | None |
pipeline_name | Optional[str] | Name of a pipeline to search for artifact in latest run. | None |
artifact_name | Optional[str] | Name of an artifact to be searched in latest pipeline run. | None |
model_name | Optional[str] | Name of a model to search for artifact in (if None - derived from step context). | None |
model_version | Optional[Union[str, int, ModelStages]] | Version of a model to search for artifact in (if None - derived from step context). | None |
model_artifact_name | Optional[str] | Name of a model artifact to search for. | None |
model_artifact_version | Optional[str] | Version of a model artifact to search for. | None |
materializer | Optional[MaterializerClassOrSource] | The materializer to use for saving the artifact value to the artifact store. Only used when `value` is provided. | None |
store_artifact_metadata | bool | Whether metadata for the artifact should be stored. Only used when `value` is provided. | True |
store_artifact_visualizations | bool | Whether visualizations for the artifact should be stored. Only used when `value` is provided. | True |
Examples:
from zenml import step, pipeline
from zenml.artifacts.external_artifact import ExternalArtifact
import numpy as np

@step
def my_step(value: np.ndarray) -> None:
    print(value)

my_array = np.array([1, 2, 3])

@pipeline
def my_pipeline():
    # Pydantic models only accept keyword arguments, so pass `value=` explicitly.
    my_step(value=ExternalArtifact(value=my_array))
Source code in zenml/artifacts/external_artifact.py
class ExternalArtifact(ExternalArtifactConfiguration):
    """External artifacts can be used to provide values as input to ZenML steps.

    ZenML steps accept either artifacts (=outputs of other steps), parameters
    (raw, JSON serializable values) or external artifacts. External artifacts
    can be used to provide any value as input to a step without needing to
    write an additional step that returns this value.

    This class can be configured using the following parameters:
    - value: The artifact value (any python object), that will be uploaded to
        the artifact store.
    - id: The ID of an artifact that is already registered in ZenML.
    - pipeline_name & artifact_name: Name of a pipeline and artifact to search
        in latest run.
    - model_name & model_version & model_artifact_name & model_artifact_version:
        Name of a model, model version, model artifact and artifact version to
        search.

    Args:
        value: The artifact value.
        id: The ID of an artifact that should be referenced by this external
            artifact.
        pipeline_name: Name of a pipeline to search for artifact in latest run.
        artifact_name: Name of an artifact to be searched in latest pipeline
            run.
        model_name: Name of a model to search for artifact in (if None -
            derived from step context).
        model_version: Version of a model to search for artifact in (if None -
            derived from step context).
        model_artifact_name: Name of a model artifact to search for.
        model_artifact_version: Version of a model artifact to search for.
        materializer: The materializer to use for saving the artifact value
            to the artifact store. Only used when `value` is provided.
        store_artifact_metadata: Whether metadata for the artifact should
            be stored. Only used when `value` is provided.
        store_artifact_visualizations: Whether visualizations for the
            artifact should be stored. Only used when `value` is provided.

    Example:
    ```
    from zenml import step, pipeline
    from zenml.artifacts.external_artifact import ExternalArtifact
    import numpy as np

    @step
    def my_step(value: np.ndarray) -> None:
        print(value)

    my_array = np.array([1, 2, 3])

    @pipeline
    def my_pipeline():
        my_step(value=ExternalArtifact(value=my_array))
    ```
    """

    value: Optional[Any] = None
    materializer: Optional[MaterializerClassOrSource] = None
    store_artifact_metadata: bool = True
    store_artifact_visualizations: bool = True

    @root_validator
    def _validate_all(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        value = values.get("value", None)
        id = values.get("id", None)
        pipeline_name = values.get("pipeline_name", None)
        artifact_name = values.get("artifact_name", None)
        model_name = values.get("model_name", None)
        model_version = values.get("model_version", None)
        model_artifact_name = values.get("model_artifact_name", None)

        if (value is not None) + (id is not None) + (
            pipeline_name is not None and artifact_name is not None
        ) + (model_artifact_name is not None) > 1:
            raise ValueError(
                "Only a value, an ID, pipeline/artifact name pair or "
                "model name/model version/model artifact name group can be "
                "provided when creating an external artifact."
            )
        elif all(
            v is None
            for v in [
                value,
                id,
                pipeline_name or artifact_name,
                model_name or model_version or model_artifact_name,
            ]
        ):
            raise ValueError(
                "Either a value, an ID, pipeline/artifact name pair or "
                "model name/model version/model artifact name group must be "
                "provided when creating an external artifact."
            )
        elif (pipeline_name is None) != (artifact_name is None):
            raise ValueError(
                "`pipeline_name` and `artifact_name` can be only provided "
                "together when creating an external artifact."
            )
        return values

    def upload_by_value(self) -> UUID:
        """Uploads the artifact by value.

        Returns:
            The uploaded artifact ID.

        Raises:
            RuntimeError: If artifact URI already exists.
        """
        from zenml.client import Client
        from zenml.utils.artifact_utils import upload_artifact

        client = Client()
        artifact_store_id = client.active_stack.artifact_store.id

        logger.info("Uploading external artifact...")
        artifact_name = f"external_{uuid4()}"
        materializer_class = self._get_materializer_class(value=self.value)

        uri = os.path.join(
            client.active_stack.artifact_store.path,
            "external_artifacts",
            artifact_name,
        )
        if fileio.exists(uri):
            raise RuntimeError(f"Artifact URI '{uri}' already exists.")
        fileio.makedirs(uri)

        materializer = materializer_class(uri)

        artifact_id: UUID = upload_artifact(
            name=artifact_name,
            data=self.value,
            materializer=materializer,
            artifact_store_id=artifact_store_id,
            extract_metadata=self.store_artifact_metadata,
            include_visualizations=self.store_artifact_visualizations,
        )

        # To avoid duplicate uploads, switch to referencing the uploaded
        # artifact by ID
        self.id = artifact_id
        # clean-up state after upload done
        self.value = None
        logger.info("Finished uploading external artifact %s.", artifact_id)

        return self.id

    @property
    def config(self) -> ExternalArtifactConfiguration:
        """Returns the lightweight config, without properties that are hard to serialize to JSON.

        Returns:
            The config object to be evaluated at runtime by the step interface.
        """
        return ExternalArtifactConfiguration(
            id=self.id,
            pipeline_name=self.pipeline_name,
            artifact_name=self.artifact_name,
            model_name=self.model_name,
            model_version=self.model_version,
            model_artifact_name=self.model_artifact_name,
            model_artifact_version=self.model_artifact_version,
            model_artifact_pipeline_name=self.model_artifact_pipeline_name,
            model_artifact_step_name=self.model_artifact_step_name,
        )

    def _get_materializer_class(self, value: Any) -> Type[BaseMaterializer]:
        """Gets a materializer class for a value.

        If a custom materializer is defined for this artifact it will be
        returned. Otherwise it will get the materializer class from the
        registry, falling back to the Cloudpickle materializer if no concrete
        materializer is registered for the type of value.

        Args:
            value: The value for which to get the materializer class.

        Returns:
            The materializer class.
        """
        from zenml.materializers.materializer_registry import (
            materializer_registry,
        )
        from zenml.utils import source_utils

        if isinstance(self.materializer, type):
            return self.materializer
        elif self.materializer:
            return source_utils.load_and_validate_class(
                self.materializer, expected_class=BaseMaterializer
            )
        else:
            return materializer_registry[type(value)]
config: ExternalArtifactConfiguration
property
readonly
Returns the lightweight config, without properties that are hard to serialize to JSON.
Returns:
Type | Description |
---|---|
ExternalArtifactConfiguration | The config object to be evaluated at runtime by the step interface. |
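The idea behind a lightweight config can be sketched with plain dataclasses: the full object carries an arbitrary Python value, while the config keeps only the reference fields and therefore serializes cleanly. `ReferenceConfig` and `FullArtifact` are hypothetical stand-ins invented for this illustration and cover only a subset of the real fields.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class ReferenceConfig:
    """Hypothetical stand-in for `ExternalArtifactConfiguration`."""

    id: Optional[str] = None
    pipeline_name: Optional[str] = None
    artifact_name: Optional[str] = None


@dataclass
class FullArtifact(ReferenceConfig):
    """Hypothetical stand-in for `ExternalArtifact`; adds the raw value."""

    value: Optional[Any] = None

    @property
    def config(self) -> ReferenceConfig:
        # Copy only the reference fields; the raw value is left behind.
        return ReferenceConfig(
            id=self.id,
            pipeline_name=self.pipeline_name,
            artifact_name=self.artifact_name,
        )


# `object()` is not JSON serializable, but the config does not carry it.
artifact = FullArtifact(id="1234", value=object())
print(json.dumps(asdict(artifact.config)))
```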
upload_by_value(self)
Uploads the artifact by value.
Returns:
Type | Description |
---|---|
UUID | The uploaded artifact ID. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If artifact URI already exists. |
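The URI handling in `upload_by_value` can be tried out on a local directory. The sketch below is a stand-in under stated assumptions: it uses `os` and `tempfile` instead of ZenML's `fileio` and the active stack's artifact store, and `reserve_external_artifact_dir` is a hypothetical helper name.

```python
import os
import tempfile
from uuid import uuid4


def reserve_external_artifact_dir(store_path: str) -> str:
    """Create and return a fresh directory for one external artifact.

    Local-filesystem stand-in: the real method uses ZenML's `fileio`
    and the artifact store path of the active stack instead.
    """
    artifact_name = f"external_{uuid4()}"
    uri = os.path.join(store_path, "external_artifacts", artifact_name)
    # Refuse to overwrite anything that is already stored at this URI.
    if os.path.exists(uri):
        raise RuntimeError(f"Artifact URI '{uri}' already exists.")
    os.makedirs(uri)
    return uri


with tempfile.TemporaryDirectory() as store:
    uri = reserve_external_artifact_dir(store)
    print(os.path.relpath(uri, store))
```

Because the directory name embeds a freshly generated UUID, the existence check acts as a safety net rather than an expected code path.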
external_artifact_config
External artifact definition.
ExternalArtifactConfiguration (BaseModel)
pydantic-model
External artifact configuration.
Lightweight class to pass in the steps for runtime inference.
Source code in zenml/artifacts/external_artifact_config.py
class ExternalArtifactConfiguration(BaseModel):
    """External artifact configuration.

    Lightweight class to pass in the steps for runtime inference.
    """

    id: Optional[UUID] = None
    pipeline_name: Optional[str] = None
    artifact_name: Optional[str] = None
    model_name: Optional[str] = None
    model_version: Optional[Union[str, int, ModelStages]] = None
    model_artifact_name: Optional[str] = None
    model_artifact_version: Optional[str] = None
    model_artifact_pipeline_name: Optional[str] = None
    model_artifact_step_name: Optional[str] = None

    def _get_artifact_from_pipeline_run(self) -> "ArtifactResponse":
        """Get artifact from pipeline run.

        Returns:
            The fetched Artifact.

        Raises:
            RuntimeError: If artifact was not found in pipeline run.
        """
        from zenml.client import Client

        client = Client()

        response = None
        pipeline = client.get_pipeline(
            self.pipeline_name  # type:ignore[arg-type]
        )
        for artifact in pipeline.last_successful_run.artifacts:
            if artifact.name == self.artifact_name:
                response = artifact
                break

        if response is None:
            raise RuntimeError(
                f"Artifact with name `{self.artifact_name}` was not found "
                f"in last successful run of pipeline `{self.pipeline_name}`. "
                "Please check your inputs and try again."
            )

        return response

    def _get_artifact_from_model(
        self, model_version: Optional["ModelVersion"] = None
    ) -> "ArtifactResponse":
        """Get artifact from Model Control Plane.

        Args:
            model_version: The model containing the model version.

        Returns:
            The fetched Artifact.

        Raises:
            RuntimeError: If artifact was not found in model version
            RuntimeError: If `model_artifact_name` is set, but `model_name` is empty and
                model version is missing in @step and @pipeline.
        """
        if self.model_name is None:
            if model_version is None:
                raise RuntimeError(
                    "ExternalArtifact initiated with `model_artifact_name`, "
                    "but no model version was provided and missing in @step or "
                    "@pipeline definitions."
                )
            self.model_name = model_version.name
            self.model_version = model_version.version

        if (
            model_version is None
            or self.model_name != model_version.name
            or self.model_version != model_version.version
        ):
            from zenml.model.model_version import ModelVersion

            model_version = ModelVersion(
                name=self.model_name,
                version=self.model_version,
            )

        for artifact_getter in [
            model_version.get_data_artifact,
            model_version.get_model_artifact,
            model_version.get_endpoint_artifact,
        ]:
            response = artifact_getter(
                name=self.model_artifact_name,  # type: ignore [arg-type]
                version=self.model_artifact_version,
                pipeline_name=self.model_artifact_pipeline_name,
                step_name=self.model_artifact_step_name,
            )
            if response is not None:
                break

        if response is None:
            raise RuntimeError(
                f"Artifact with name `{self.model_artifact_name}` was not found "
                f"in model `{self.model_name}` version `{self.model_version}`. "
                "Please check your inputs and try again."
            )

        return response

    def get_artifact_id(
        self, model_version: Optional["ModelVersion"] = None
    ) -> UUID:
        """Get the artifact.

        - If an artifact is referenced by ID, it will verify that the artifact
          exists and is in the correct artifact store.
        - If an artifact is referenced by pipeline and artifact name pair, it
          will be searched in the artifact store by the referenced pipeline.
        - If an artifact is referenced by model name and model version, it will
          be searched in the artifact store by the referenced model.

        Args:
            model_version: The model version of the step (from step or pipeline).

        Returns:
            The artifact ID.

        Raises:
            RuntimeError: If the artifact store of the referenced artifact
                is not the same as the one in the active stack.
            RuntimeError: If the URI of the artifact already exists.
            RuntimeError: If `model_artifact_name` is set, but `model_name` is empty and
                model version is missing in @step and @pipeline.
            RuntimeError: If no value, id, pipeline/artifact name pair or model name/model version/model
                artifact name group is provided when creating an external artifact.
        """
        from zenml.client import Client

        client = Client()

        if self.id:
            response = client.get_artifact(artifact_id=self.id)
        elif self.pipeline_name and self.artifact_name:
            response = self._get_artifact_from_pipeline_run()
        elif self.model_artifact_name:
            response = self._get_artifact_from_model(model_version)
        else:
            raise RuntimeError(
                "Either an ID, pipeline/artifact name pair or "
                "model name/model version/model artifact name group can be "
                "provided when creating an external artifact configuration.\n"
                "Potential root cause: you instantiated an ExternalArtifact and "
                "called this method before `upload_by_value` was called."
            )

        artifact_store_id = client.active_stack.artifact_store.id
        if response.artifact_store_id != artifact_store_id:
            raise RuntimeError(
                f"The artifact {response.name} (ID: {response.id}) "
                "referenced by an external artifact is not stored in the "
                "artifact store of the active stack. This will lead to "
                "issues loading the artifact. Please make sure to only "
                "reference artifacts stored in your active artifact store."
            )

        self.id = response.id

        return self.id
get_artifact_id(self, model_version=None)
Get the artifact.
- If an artifact is referenced by ID, it will verify that the artifact exists and is in the correct artifact store.
- If an artifact is referenced by pipeline and artifact name pair, it will be searched in the artifact store by the referenced pipeline.
- If an artifact is referenced by model name and model version, it will be searched in the artifact store by the referenced model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_version | Optional[ModelVersion] | The model version of the step (from step or pipeline). | None |
Returns:
Type | Description |
---|---|
UUID | The artifact ID. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the artifact store of the referenced artifact is not the same as the one in the active stack. |
RuntimeError | If the URI of the artifact already exists. |
RuntimeError | If `model_artifact_name` is set, but `model_name` is empty and model version is missing in @step and @pipeline. |
RuntimeError | If no value, id, pipeline/artifact name pair or model name/model version/model artifact name group is provided when creating an external artifact. |
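The lookup follows a fixed priority: an ID wins over a pipeline/artifact name pair, which wins over a model artifact name. A minimal sketch of that dispatch order, with no ZenML calls, is shown here; `pick_lookup_strategy` and the returned labels are hypothetical names used only for illustration.

```python
from typing import Optional


def pick_lookup_strategy(
    id: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    artifact_name: Optional[str] = None,
    model_artifact_name: Optional[str] = None,
) -> str:
    """Return a label for the lookup that would be attempted.

    Hypothetical helper mirroring only the branch order of
    `get_artifact_id`; it performs no actual lookup.
    """
    if id:
        return "by_id"
    if pipeline_name and artifact_name:
        return "by_pipeline_run"
    if model_artifact_name:
        return "by_model"
    # Nothing to resolve, e.g. a value that has not been uploaded yet.
    raise RuntimeError("No artifact reference was provided.")
```

After the artifact is resolved, the real method additionally checks that it lives in the artifact store of the active stack and raises a `RuntimeError` otherwise.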
unmaterialized_artifact
Unmaterialized artifact class.
UnmaterializedArtifact (ArtifactResponse)
pydantic-model
Unmaterialized artifact class.
Typing a step input to have this type will cause ZenML to not materialize the artifact. This is useful for steps that need to access the artifact metadata instead of the actual artifact data.
Usage example:
from zenml import step
from zenml.artifacts.unmaterialized_artifact import UnmaterializedArtifact

@step
def my_step(input_artifact: UnmaterializedArtifact):
    print(input_artifact.uri)
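How a type annotation can switch off loading is easier to see in a toy version. The sketch below is not ZenML's implementation: `RawHandle` and `prepare_inputs` are hypothetical names, and "loading" is simulated with a string. It only illustrates annotation-based dispatch using `typing.get_type_hints`.

```python
from typing import Any, Callable, Dict, get_type_hints


class RawHandle:
    """Hypothetical marker type, playing the role of `UnmaterializedArtifact`."""

    def __init__(self, uri: str) -> None:
        self.uri = uri


def prepare_inputs(
    func: Callable[..., Any], uris: Dict[str, str]
) -> Dict[str, Any]:
    """Build call arguments for `func` from a mapping of input name to URI."""
    hints = get_type_hints(func)
    inputs: Dict[str, Any] = {}
    for name, uri in uris.items():
        if hints.get(name) is RawHandle:
            # Annotated with the marker type: pass the handle, skip loading.
            inputs[name] = RawHandle(uri)
        else:
            # Any other annotation: simulate loading the data from the URI.
            inputs[name] = f"<data loaded from {uri}>"
    return inputs


def my_step(raw: RawHandle, data: str) -> None:
    print(raw.uri, data)


my_step(**prepare_inputs(my_step, {"raw": "s3://bucket/a", "data": "s3://bucket/b"}))
```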
Source code in zenml/artifacts/unmaterialized_artifact.py
class UnmaterializedArtifact(ArtifactResponse):
    """Unmaterialized artifact class.

    Typing a step input to have this type will cause ZenML to not materialize
    the artifact. This is useful for steps that need to access the artifact
    metadata instead of the actual artifact data.

    Usage example:

    ```python
    from zenml import step
    from zenml.artifacts.unmaterialized_artifact import UnmaterializedArtifact

    @step
    def my_step(input_artifact: UnmaterializedArtifact):
        print(input_artifact.uri)
    ```
    """
__json_encoder__(obj)
special
staticmethod
partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.