Kaniko
zenml.integrations.kaniko
special
Kaniko integration for image building.
KanikoIntegration (Integration)
Definition of the Kaniko integration for ZenML.
Source code in zenml/integrations/kaniko/__init__.py
class KanikoIntegration(Integration):
"""Definition of the Kaniko integration for ZenML."""
NAME = KANIKO
REQUIREMENTS = []
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kaniko integration.
Returns:
List of new stack component flavors.
"""
from zenml.integrations.kaniko.flavors import KanikoImageBuilderFlavor
return [KanikoImageBuilderFlavor]
flavors()
classmethod
Declare the stack component flavors for the Kaniko integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new stack component flavors. |
Source code in zenml/integrations/kaniko/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kaniko integration.
Returns:
List of new stack component flavors.
"""
from zenml.integrations.kaniko.flavors import KanikoImageBuilderFlavor
return [KanikoImageBuilderFlavor]
flavors
special
Kaniko integration flavors.
kaniko_image_builder_flavor
Kaniko image builder flavor.
KanikoImageBuilderConfig (BaseImageBuilderConfig)
pydantic-model
Kaniko image builder configuration.
The env
, env_from
, volume_mounts
and volumes
attributes will be
used to generate the container specification. They should be used to
configure secrets and environment variables so that the Kaniko build
container is able to push to the container registry (and optionally access
the artifact store to upload the build context).
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context |
str |
The Kubernetes context in which to run the Kaniko pod. |
kubernetes_namespace |
str |
The Kubernetes namespace in which to run the Kaniko pod. This namespace will not be created and must already exist. |
executor_image |
str |
The image of the Kaniko executor to use. |
env |
List[Dict[str, Any]] |
|
env_from |
List[Dict[str, Any]] |
|
volume_mounts |
List[Dict[str, Any]] |
|
volumes |
List[Dict[str, Any]] |
|
service_account_name |
Optional[str] |
Name of the Kubernetes service account to use. |
store_context_in_artifact_store |
bool |
If |
executor_args |
List[str] |
Additional arguments to forward to the Kaniko executor. See https://github.com/GoogleContainerTools/kaniko#additional-flags for a full list of available arguments. |
Source code in zenml/integrations/kaniko/flavors/kaniko_image_builder_flavor.py
class KanikoImageBuilderConfig(BaseImageBuilderConfig):
"""Kaniko image builder configuration.
The `env`, `env_from`, `volume_mounts` and `volumes` attributes will be
used to generate the container specification. They should be used to
configure secrets and environment variables so that the Kaniko build
container is able to push to the container registry (and optionally access
the artifact store to upload the build context).
Attributes:
kubernetes_context: The Kubernetes context in which to run the Kaniko
pod.
kubernetes_namespace: The Kubernetes namespace in which to run the
Kaniko pod. This namespace will not be created and must already
exist.
executor_image: The image of the Kaniko executor to use.
env: `env` section of the Kubernetes container spec.
env_from: `envFrom` section of the Kubernetes container spec.
volume_mounts: `volumeMounts` section of the Kubernetes container spec.
volumes: `volumes` section of the Kubernetes pod spec.
service_account_name: Name of the Kubernetes service account to use.
store_context_in_artifact_store: If `True`, the build context will be
stored in the artifact store. If `False`, the build context will be
streamed over stdin of the `kubectl` process that runs the build.
In case the artifact store is used, the container running the build
needs read access to the artifact store.
executor_args: Additional arguments to forward to the Kaniko executor.
See https://github.com/GoogleContainerTools/kaniko#additional-flags
for a full list of available arguments.
"""
kubernetes_context: str
kubernetes_namespace: str = "zenml-kaniko"
executor_image: str = DEFAULT_KANIKO_EXECUTOR_IMAGE
env: List[Dict[str, Any]] = []
env_from: List[Dict[str, Any]] = []
volume_mounts: List[Dict[str, Any]] = []
volumes: List[Dict[str, Any]] = []
service_account_name: Optional[str] = None
store_context_in_artifact_store: bool = False
executor_args: List[str] = []
@validator(
"env",
"env_from",
"volume_mounts",
"volumes",
"executor_args",
pre=True,
)
def _convert_json_string(
cls, value: Union[None, str, List[Any]]
) -> Optional[List[Any]]:
"""Converts potential JSON strings passed via the CLI to lists.
Args:
value: The value to convert.
Returns:
The converted value.
Raises:
TypeError: If the value is not a `str`, `List` or `None`.
ValueError: If the value is an invalid json string or a json string
that does not decode into a list.
"""
if isinstance(value, str):
try:
list_ = json.loads(value)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid json string '{value}'") from e
if not isinstance(list_, List):
raise ValueError(
f"Json string '{value}' did not decode into a list."
)
return list_
elif isinstance(value, List) or value is None:
return value
else:
raise TypeError(f"{value} is not a json string or a list.")
KanikoImageBuilderFlavor (BaseImageBuilderFlavor)
Kaniko image builder flavor.
Source code in zenml/integrations/kaniko/flavors/kaniko_image_builder_flavor.py
class KanikoImageBuilderFlavor(BaseImageBuilderFlavor):
"""Kaniko image builder flavor."""
@property
def name(self) -> str:
"""The flavor name.
Returns:
The flavor name.
"""
return KANIKO_IMAGE_BUILDER_FLAVOR
@property
def config_class(self) -> Type[KanikoImageBuilderConfig]:
"""Config class.
Returns:
The config class.
"""
return KanikoImageBuilderConfig
@property
def implementation_class(self) -> Type["KanikoImageBuilder"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.kaniko.image_builders import KanikoImageBuilder
return KanikoImageBuilder
config_class: Type[zenml.integrations.kaniko.flavors.kaniko_image_builder_flavor.KanikoImageBuilderConfig]
property
readonly
Config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kaniko.flavors.kaniko_image_builder_flavor.KanikoImageBuilderConfig] |
The config class. |
implementation_class: Type[KanikoImageBuilder]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[KanikoImageBuilder] |
The implementation class. |
name: str
property
readonly
The flavor name.
Returns:
Type | Description |
---|---|
str |
The flavor name. |
image_builders
special
Kaniko image building.
kaniko_image_builder
Kaniko image builder implementation.
KanikoImageBuilder (BaseImageBuilder)
Kaniko image builder implementation.
Source code in zenml/integrations/kaniko/image_builders/kaniko_image_builder.py
class KanikoImageBuilder(BaseImageBuilder):
"""Kaniko image builder implementation."""
@property
def config(self) -> KanikoImageBuilderConfig:
"""The stack component configuration.
Returns:
The configuration.
"""
return cast(KanikoImageBuilderConfig, self._config)
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a container registry.
Returns:
Stack validator.
"""
def _validate_remote_components(
stack: "Stack",
) -> Tuple[bool, str]:
assert stack.container_registry
if stack.container_registry.config.is_local:
return False, (
"The Kaniko image builder builds Docker images in a "
"Kubernetes cluster and isn't able to push the resulting "
"image to a local container registry running on your "
"machine. Please update your stack to include a remote "
"container registry and try again."
)
if (
self.config.store_context_in_artifact_store
and stack.artifact_store.config.is_local
):
return False, (
"The Kaniko image builder is configured to upload the "
"build context to the artifact store. This only works with "
"remote artifact stores so that the Kaniko build pod is "
"able to read from it. Please update your stack to include "
"a remote artifact store and try again."
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_remote_components,
)
def build(
self,
image_name: str,
build_context: "BuildContext",
docker_build_options: Dict[str, Any],
container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
"""Builds and pushes a Docker image.
Args:
image_name: Name of the image to build and push.
build_context: The build context to use for the image.
docker_build_options: Docker build options.
container_registry: Optional container registry to push to.
Returns:
The Docker image repo digest.
Raises:
RuntimeError: If no container registry is passed.
RuntimeError: If the upload to the artifact store has failed.
"""
self._check_prerequisites()
if not container_registry:
raise RuntimeError(
"Unable to use the Kaniko image builder without a container "
"registry."
)
pod_name = self._generate_pod_name()
logger.info(
"Using Kaniko to build image `%s` in pod `%s`.",
image_name,
pod_name,
)
if self.config.store_context_in_artifact_store:
try:
kaniko_context = self._upload_build_context(build_context)
except Exception:
raise RuntimeError(
"Uploading the Kaniko build context to the artifact store "
"failed. Please make sure you have permissions to write "
"to the artifact store or update the Kaniko image builder "
"to stream the build context using stdin by running:\n"
f" `zenml image-builder update {self.name}` "
"--store_context_in_artifact_store=False`"
)
else:
kaniko_context = "tar://stdin"
spec_overrides = self._generate_spec_overrides(
pod_name=pod_name, image_name=image_name, context=kaniko_context
)
self._run_kaniko_build(
pod_name=pod_name,
spec_overrides=spec_overrides,
build_context=build_context,
)
image_name_with_sha = self._read_pod_output(pod_name=pod_name)
self._verify_image_name(
image_name_with_tag=image_name,
image_name_with_sha=image_name_with_sha,
)
self._delete_pod(pod_name=pod_name)
return image_name_with_sha
def _generate_spec_overrides(
self, pod_name: str, image_name: str, context: str
) -> Dict[str, Any]:
"""Generates Kubernetes spec overrides for the Kaniko build Pod.
These values are used to override the default specification of the
Kubernetes pod that is running the Kaniko build. This can be used to
specify arguments for the Kaniko executor as well as providing
environment variables and/or volume mounts.
Args:
pod_name: Name of the pod.
image_name: Name of the image that should be built.
context: The Kaniko executor context argument.
Returns:
Dictionary of spec override values.
"""
args = [
"--dockerfile=Dockerfile",
f"--context={context}",
f"--destination={image_name}",
# Use the image name with repo digest as the Pod termination
# message. We use this later to read the image name using kubectl.
"--image-name-with-digest-file=/dev/termination-log",
] + self.config.executor_args
optional_container_args: Dict[str, Any] = {}
if self.config.service_account_name:
optional_container_args[
"serviceAccountName"
] = self.config.service_account_name
return {
"apiVersion": "v1",
"spec": {
"containers": [
{
"name": pod_name,
"image": self.config.executor_image,
"stdin": True,
"stdinOnce": True,
"args": args,
"env": self.config.env,
"envFrom": self.config.env_from,
"volumeMounts": self.config.volume_mounts,
**optional_container_args,
}
],
"volumes": self.config.volumes,
},
}
def _run_kaniko_build(
self,
pod_name: str,
spec_overrides: Dict[str, Any],
build_context: "BuildContext",
) -> None:
"""Runs the Kaniko build in Kubernetes.
Args:
pod_name: Name of the Pod that should be created to run the build.
spec_overrides: Pod spec override values.
build_context: The build context.
Raises:
RuntimeError: If the process running the Kaniko build failed.
"""
command = [
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
self.config.kubernetes_namespace,
"run",
pod_name,
"--stdin",
"true",
"--restart",
"Never",
"--image",
self.config.executor_image,
"--overrides",
json.dumps(spec_overrides),
]
logger.debug("Running Kaniko build with command: %s", command)
with subprocess.Popen(
command,
stdin=subprocess.PIPE,
) as p:
if not self.config.store_context_in_artifact_store:
self._write_build_context(
process=p, build_context=build_context
)
try:
return_code = p.wait()
except:
p.kill()
raise
if return_code:
raise RuntimeError(
"The process that runs the Kaniko build Pod failed. Check the "
"log messages above for more information."
)
@staticmethod
def _upload_build_context(build_context: "BuildContext") -> str:
"""Uploads a build context to the artifact store.
Args:
build_context: The build context to upload.
Returns:
The path of the uploaded build context.
"""
artifact_store = Client().active_stack.artifact_store
hash_ = hashlib.sha1()
with tempfile.NamedTemporaryFile(mode="w+b") as f:
build_context.write_archive(f, gzip=True)
while True:
data = f.read(64 * 1024)
if not data:
break
hash_.update(data)
filename = f"{hash_.hexdigest()}.tar.gz"
filepath = f"{artifact_store.path}/kaniko-contexts/{filename}"
if not fileio.exists(filepath):
logger.info(
"Uploading Kaniko build context to `%s`.", filepath
)
fileio.copy(f.name, filepath)
else:
logger.info("Build context already exists, not uploading.")
return filepath
@staticmethod
def _write_build_context(
process: BytePopen, build_context: "BuildContext"
) -> None:
"""Writes the build context to the process stdin.
Args:
process: The process to which the context will be written.
build_context: The build context to write.
"""
logger.debug("Writing build context to process stdin.")
assert process.stdin
with process.stdin as _, tempfile.TemporaryFile(mode="w+b") as f:
build_context.write_archive(f, gzip=True)
while True:
data = f.read(1024)
if not data:
break
process.stdin.write(data)
@staticmethod
def _generate_pod_name() -> str:
"""Generates a random name for the Pod that runs the Kaniko build.
Returns:
The Pod name.
"""
return f"kaniko-build-{random.Random().getrandbits(32):08x}"
def _read_pod_output(self, pod_name: str) -> str:
"""Reads the Pod output message.
Args:
pod_name: Name of the Pod of which to read the output message.
Returns:
The Pod output message.
"""
command = [
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
self.config.kubernetes_namespace,
"get",
"pod",
pod_name,
"-o",
'jsonpath="{.status.containerStatuses[0].state.terminated.message}"',
]
output = subprocess.check_output(command).decode()
output = output.strip('"\n')
logger.debug("Kaniko build pod termination message: %s", output)
return output
def _delete_pod(self, pod_name: str) -> None:
"""Deletes a Pod.
Args:
pod_name: Name of the Pod to delete.
Raises:
subprocess.CalledProcessError: If the kubectl call to delete
the Pod failed.
"""
command = [
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
self.config.kubernetes_namespace,
"delete",
"pod",
pod_name,
]
try:
subprocess.run(command, stdout=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as e:
logger.error(e.output)
raise
logger.info("Deleted Kaniko build Pod %s.", pod_name)
@staticmethod
def _check_prerequisites() -> None:
"""Checks that all prerequisites are installed.
Raises:
RuntimeError: If any of the prerequisites are not installed.
"""
if not shutil.which("kubectl"):
raise RuntimeError(
"`kubectl` is required to run the Kaniko image builder."
)
@staticmethod
def _verify_image_name(
image_name_with_tag: str, image_name_with_sha: str
) -> None:
"""Verifies the name/sha of the pushed image.
Args:
image_name_with_tag: The image name with a tag but without a unique
sha.
image_name_with_sha: The image name with a unique sha value
appended.
Raises:
RuntimeError: If the image names don't point to the same Docker
repository.
"""
image_name_without_tag, _ = image_name_with_tag.rsplit(":", 1)
if not image_name_with_sha.startswith(image_name_without_tag):
raise RuntimeError(
f"The Kaniko Pod output {image_name_with_sha} is not a valid "
f"image name in the repository {image_name_without_tag}."
)
config: KanikoImageBuilderConfig
property
readonly
The stack component configuration.
Returns:
Type | Description |
---|---|
KanikoImageBuilderConfig |
The configuration. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates that the stack contains a container registry.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
Stack validator. |
build(self, image_name, build_context, docker_build_options, container_registry=None)
Builds and pushes a Docker image.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image_name |
str |
Name of the image to build and push. |
required |
build_context |
BuildContext |
The build context to use for the image. |
required |
docker_build_options |
Dict[str, Any] |
Docker build options. |
required |
container_registry |
Optional[BaseContainerRegistry] |
Optional container registry to push to. |
None |
Returns:
Type | Description |
---|---|
str |
The Docker image repo digest. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no container registry is passed. |
RuntimeError |
If the upload to the artifact store has failed. |
Source code in zenml/integrations/kaniko/image_builders/kaniko_image_builder.py
def build(
self,
image_name: str,
build_context: "BuildContext",
docker_build_options: Dict[str, Any],
container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
"""Builds and pushes a Docker image.
Args:
image_name: Name of the image to build and push.
build_context: The build context to use for the image.
docker_build_options: Docker build options.
container_registry: Optional container registry to push to.
Returns:
The Docker image repo digest.
Raises:
RuntimeError: If no container registry is passed.
RuntimeError: If the upload to the artifact store has failed.
"""
self._check_prerequisites()
if not container_registry:
raise RuntimeError(
"Unable to use the Kaniko image builder without a container "
"registry."
)
pod_name = self._generate_pod_name()
logger.info(
"Using Kaniko to build image `%s` in pod `%s`.",
image_name,
pod_name,
)
if self.config.store_context_in_artifact_store:
try:
kaniko_context = self._upload_build_context(build_context)
except Exception:
raise RuntimeError(
"Uploading the Kaniko build context to the artifact store "
"failed. Please make sure you have permissions to write "
"to the artifact store or update the Kaniko image builder "
"to stream the build context using stdin by running:\n"
f" `zenml image-builder update {self.name}` "
"--store_context_in_artifact_store=False`"
)
else:
kaniko_context = "tar://stdin"
spec_overrides = self._generate_spec_overrides(
pod_name=pod_name, image_name=image_name, context=kaniko_context
)
self._run_kaniko_build(
pod_name=pod_name,
spec_overrides=spec_overrides,
build_context=build_context,
)
image_name_with_sha = self._read_pod_output(pod_name=pod_name)
self._verify_image_name(
image_name_with_tag=image_name,
image_name_with_sha=image_name_with_sha,
)
self._delete_pod(pod_name=pod_name)
return image_name_with_sha