Skip to content

Container Engines

zenml.container_engines

Client-side container engines (Docker, Podman).

Attributes

__all__ = ['ContainerEngine', 'DockerContainerEngine', 'PodmanContainerEngine', 'get_container_engine', 'check_container_engine'] module-attribute

Classes

ContainerEngine

Bases: ABC

Client-side abstraction over Docker/Podman (build, tag, push, inspect).

Implementations encapsulate OCI client details; callers use the factory functions in :mod:zenml.container_engines.factory for the active engine.

Attributes
engine_type: ContainerEngineType abstractmethod property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None abstractmethod

Build an image locally from a tarball build context.

Parameters:

Name Type Description Default
image_name str

Target image reference including tag.

required
build_context BuildContext

Archive and Dockerfile content for the build.

required
docker_build_options Optional[DockerBuildOptions]

Optional Docker-compatible build options.

None
container_registry Optional[BaseContainerRegistry]

When set, registry credentials are applied for authenticated base-image pulls during the build.

None
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If the build fails (implementation-specific).

Source code in src/zenml/container_engines/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@abstractmethod
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image locally from a tarball build context.

    Args:
        image_name: Target image reference including tag.
        build_context: Archive and Dockerfile content for the build.
        docker_build_options: Optional Docker-compatible build options.
        container_registry: When set, registry credentials are applied for
            authenticated base-image pulls during the build.
        **kwargs: Additional keyword arguments.

    Returns:
        None

    Raises:
        RuntimeError: If the build fails (implementation-specific).
    """
check_availability() -> Tuple[bool, str | None] abstractmethod

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/base.py
50
51
52
53
54
55
56
57
@abstractmethod
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str] abstractmethod

Resolve the repository digest for an image when the client can.

Parameters:

Name Type Description Default
image_name str

Image reference (may include tag).

required
container_registry Optional[BaseContainerRegistry]

When set and the name matches the registry, credentials from this component are used for registry access.

None

Returns:

Type Description
Optional[str]

Repo digest string (e.g. sha256:...) when resolvable, else

Optional[str]

None.

Source code in src/zenml/container_engines/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@abstractmethod
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Resolve the repository digest for an image when the client can.

    Args:
        image_name: Image reference (may include tag).
        container_registry: When set and the name matches the registry,
            credentials from this component are used for registry access.

    Returns:
        Repo digest string (e.g. ``sha256:...``) when resolvable, else
        ``None``.
    """
is_image_local(image_name: str) -> bool abstractmethod

Return whether the image appears to exist only locally.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image is present locally without a resolved remote

bool

repo digest heuristic; False otherwise.

Source code in src/zenml/container_engines/base.py
153
154
155
156
157
158
159
160
161
162
163
@abstractmethod
def is_image_local(self, image_name: str) -> bool:
    """Return whether the image appears to exist only locally.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image is present locally without a resolved remote
        repo digest heuristic; ``False`` otherwise.
    """
login(username: str, password: str, registry: str) -> None abstractmethod

Authenticate the container engine with the container registry credentials.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required
Source code in src/zenml/container_engines/base.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@abstractmethod
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the container engine with the container registry credentials.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.
    """
login_registry(container_registry: BaseContainerRegistry) -> None

Authenticate the container engine with the container registry credentials.

Parameters:

Name Type Description Default
container_registry BaseContainerRegistry

Container registry to authenticate with.

required
Source code in src/zenml/container_engines/base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def login_registry(
    self, container_registry: "BaseContainerRegistry"
) -> None:
    """Authenticate the container engine with the container registry credentials.

    Args:
        container_registry: Container registry to authenticate with.
    """
    credentials = container_registry.credentials
    if credentials:
        username, password = credentials
        self.login(
            username=username,
            password=password,
            registry=container_registry.config.uri,
        )
parse_dockerignore(dockerignore_path: str) -> List[str] staticmethod

Parses a dockerignore file and returns a list of patterns to ignore.

Parameters:

Name Type Description Default
dockerignore_path str

Path to the dockerignore file.

required

Returns:

Type Description
List[str]

List of patterns to ignore.

Source code in src/zenml/container_engines/base.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
@staticmethod
def parse_dockerignore(dockerignore_path: str) -> List[str]:
    """Parses a dockerignore file and returns a list of patterns to ignore.

    Args:
        dockerignore_path: Path to the dockerignore file.

    Returns:
        List of patterns to ignore.
    """
    try:
        file_content = io_utils.read_file_contents_as_string(
            dockerignore_path
        )
    except FileNotFoundError:
        logger.warning(
            "Unable to find dockerignore file at path '%s'.",
            dockerignore_path,
        )
        return []

    exclude_patterns = []
    for line in file_content.split("\n"):
        line = line.strip()
        if line and not line.startswith("#"):
            exclude_patterns.append(line)

    return exclude_patterns
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str abstractmethod

Push a local image to the configured registry.

Parameters:

Name Type Description Default
image_name str

Full image reference including tag.

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

Repository digest when the engine reports one; otherwise an

str

implementation-defined stable surrogate (e.g. unique tag).

Raises:

Type Description
RuntimeError

If authentication or push fails.

Source code in src/zenml/container_engines/base.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@abstractmethod
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push a local image to the configured registry.

    Args:
        image_name: Full image reference including tag.
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        Repository digest when the engine reports one; otherwise an
        implementation-defined stable surrogate (e.g. unique tag).

    Raises:
        RuntimeError: If authentication or push fails.
    """
sanitize_tag(tag: str) -> str staticmethod

Sanitize a Docker tag.

Parameters:

Name Type Description Default
tag str

The tag to sanitize.

required

Raises:

Type Description
ValueError

If the tag is empty.

Returns:

Type Description
str

The sanitized tag.

Source code in src/zenml/container_engines/base.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@staticmethod
def sanitize_tag(tag: str) -> str:
    """Sanitize a Docker tag.

    Args:
        tag: The tag to sanitize.

    Raises:
        ValueError: If the tag is empty.

    Returns:
        The sanitized tag.
    """
    tag = re.sub(r"[^a-zA-Z0-9_.\-]", "", tag)
    # Tags can not start with a dot or a dash
    tag = re.sub(r"^[-.]", "_", tag)

    if not tag:
        raise ValueError("Docker image tag cannot be empty.")

    return tag[:128]
tag_image(source: str, target: str) -> None abstractmethod

Apply an additional local name/tag to an existing local image.

Parameters:

Name Type Description Default
source str

Existing local image reference.

required
target str

New local reference (alias) for the same image.

required

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If the engine cannot apply the tag.

Source code in src/zenml/container_engines/base.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@abstractmethod
def tag_image(self, source: str, target: str) -> None:
    """Apply an additional local name/tag to an existing local image.

    Args:
        source: Existing local image reference.
        target: New local reference (alias) for the same image.

    Returns:
        None

    Raises:
        RuntimeError: If the engine cannot apply the tag.
    """

DockerContainerEngine()

Bases: ContainerEngine

Container engine using the Docker CLI and/or docker-py.

Create a Docker engine (registry auth is per-operation).

Source code in src/zenml/container_engines/docker_engine.py
54
55
56
def __init__(self) -> None:
    """Create a Docker engine (registry auth is per-operation)."""
    self._client: Optional[DockerClient] = None
Attributes
client: DockerClient property

Return a docker-py client, reusing a cached unauthenticated instance.

Returns:

Name Type Description
DockerClient DockerClient

Connection to the local Docker daemon.

Raises:

Type Description
RuntimeError

If docker-py cannot create a client from the environment (for example when the daemon is not running).

engine_type: ContainerEngineType property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None

Build an image using docker-py or docker build -.

Parameters:

Name Type Description Default
image_name str

Target image reference including tag.

required
build_context BuildContext

Tarball context written to the Docker daemon.

required
docker_build_options Optional[DockerBuildOptions]

Optional Docker build options.

None
container_registry Optional[BaseContainerRegistry]

When set, applies registry credentials for pulls.

None
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
None

None

Source code in src/zenml/container_engines/docker_engine.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image using docker-py or ``docker build -``.

    Args:
        image_name: Target image reference including tag.
        build_context: Tarball context written to the Docker daemon.
        docker_build_options: Optional Docker build options.
        container_registry: When set, applies registry credentials for pulls.
        **kwargs: Additional keyword arguments.

    Returns:
        None
    """
    if container_registry:
        self.login_registry(container_registry)

    if kwargs.get("use_subprocess", False):
        self._build_cli(image_name, build_context, docker_build_options)
        return

    build_options = (
        docker_build_options.to_docker_python_sdk_options()
        if docker_build_options
        else {}
    )
    with tempfile.TemporaryFile(mode="w+b") as f:
        build_context.write_archive(f)
        f.seek(0)
        output_stream = self.client.images.client.api.build(
            fileobj=f,
            custom_context=True,
            tag=image_name,
            **build_options,
        )
    self.process_docker_stream(output_stream)
check_availability() -> Tuple[bool, str | None]

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/docker_engine.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
    if not shutil.which("docker"):
        return (
            False,
            "`docker` is not installed or not accessible on your system.",
        )

    try:
        self.client.ping()
        return (True, None)
    except Exception:
        return (
            False,
            "Unable to connect to the Docker daemon. There are three "
            "common causes for this:\n"
            "1) The Docker daemon isn't running.\n"
            "2) The Docker client isn't configured correctly. The client "
            "loads its configuration from the following file: "
            "$HOME/.docker/config.json. If your configuration file is in a "
            "different location, set the `DOCKER_CONFIG` environment "
            "variable to the directory that contains your `config.json` "
            "file.\n"
            "3) If your Docker CLI is working fine but you ran into this "
            "issue, you might be using a non-default Docker context which "
            "is not supported by the Docker python library. To verify "
            "this, run `docker context ls` and check which context has a "
            "`*` next to it. If this is not the `default` context, copy "
            "the `DOCKER ENDPOINT` value of that context and set the "
            "`DOCKER_HOST` environment variable to that value.",
        )
close() -> None

Close the cached docker-py client if one was opened.

Source code in src/zenml/container_engines/docker_engine.py
448
449
450
451
452
def close(self) -> None:
    """Close the cached docker-py client if one was opened."""
    if self._client is not None:
        self._client.close()
        self._client = None
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str]

Resolve a repository digest using docker-py (and registry creds).

Parameters:

Name Type Description Default
image_name str

Image reference to resolve.

required
container_registry Optional[BaseContainerRegistry]

When the name matches this registry, its credentials authenticate registry API access.

None

Returns:

Type Description
Optional[str]

Digest string when resolvable; None if not found or unknown.

Source code in src/zenml/container_engines/docker_engine.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Resolve a repository digest using docker-py (and registry creds).

    Args:
        image_name: Image reference to resolve.
        container_registry: When the name matches this registry, its
            credentials authenticate registry API access.

    Returns:
        Digest string when resolvable; ``None`` if not found or unknown.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)

    try:
        metadata = self.client.images.get_registry_data(image_name)
    except Exception:
        return None

    return cast(str, metadata.id.split(":")[-1])
is_image_local(image_name: str) -> bool

Return whether Docker considers the image local-only.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image is local without a resolved repo digest.

Source code in src/zenml/container_engines/docker_engine.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def is_image_local(self, image_name: str) -> bool:
    """Return whether Docker considers the image local-only.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image is local without a resolved repo digest.
    """
    images = self.client.images.list(name=image_name)
    if not images:
        return False

    image = self.client.images.get(image_name)
    repo_digests = image.attrs["RepoDigests"]

    return len(repo_digests) == 0
login(username: str, password: str, registry: str) -> None

Authenticate the Docker client with the container registry credentials.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required
Source code in src/zenml/container_engines/docker_engine.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the Docker client with the container registry credentials.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.
    """
    self.login_client(
        username=username,
        password=password,
        registry=registry,
    )
    self.login_cli(
        username=username,
        password=password,
        registry=registry,
    )
login_cli(username: str, password: str, registry: str) -> None staticmethod

Run docker login for the given registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required

Raises:

Type Description
AuthorizationException

If docker login fails.

Source code in src/zenml/container_engines/docker_engine.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@staticmethod
def login_cli(
    username: str,
    password: str,
    registry: str,
) -> None:
    """Run ``docker login`` for the given registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.

    Raises:
        AuthorizationException: If ``docker login`` fails.
    """
    docker_login_cmd = [
        "docker",
        "login",
        "-u",
        username,
        "--password-stdin",
    ]
    if registry != DOCKERHUB_REGISTRY_URI:
        docker_login_cmd.append(registry)

    try:
        subprocess.run(
            docker_login_cmd,
            check=True,
            input=password.encode(),
        )
    except subprocess.CalledProcessError as e:
        raise AuthorizationException(
            f"Failed to authenticate to Docker registry '{registry}': {e}"
        ) from e
login_client(username: str, password: str, registry: str) -> None

Login to a Docker registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry URI for login.

required

Raises:

Type Description
AuthorizationException

If registry login fails.

Source code in src/zenml/container_engines/docker_engine.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def login_client(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Login to a Docker registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry URI for login.

    Raises:
        AuthorizationException: If registry login fails.
    """
    try:
        self.client.login(
            username=username,
            password=password,
            registry=registry
            if registry != DOCKERHUB_REGISTRY_URI
            else None,
            reauth=True,
        )
    except DockerException as e:
        raise AuthorizationException(
            f"failed to authenticate with Docker registry {registry}: {e}"
        )
process_docker_stream(stream: Iterable[bytes]) -> List[Dict[str, Any]] staticmethod

Decode JSON lines from a docker-py build/push stream.

Parameters:

Name Type Description Default
stream Iterable[bytes]

Raw byte chunks from docker API build/push.

required

Raises:

Type Description
RuntimeError

If the stream contains an error payload.

Returns:

Type Description
List[Dict[str, Any]]

Auxiliary aux objects collected from the stream.

Source code in src/zenml/container_engines/docker_engine.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@staticmethod
def process_docker_stream(stream: Iterable[bytes]) -> List[Dict[str, Any]]:
    """Decode JSON lines from a docker-py build/push stream.

    Args:
        stream: Raw byte chunks from ``docker`` API ``build``/``push``.

    Raises:
        RuntimeError: If the stream contains an ``error`` payload.

    Returns:
        Auxiliary ``aux`` objects collected from the stream.
    """
    auxiliary_info = []

    for element in stream:
        lines = element.decode("utf-8").strip().split("\n")

        for line in lines:
            try:
                line_json = json.loads(line)
                if "error" in line_json:
                    raise RuntimeError(
                        f"Docker error: {line_json['error']}."
                    )
                elif "stream" in line_json:
                    text = line_json["stream"].strip()
                    if "ERROR" in text:
                        logger.error(text)
                    elif re.match(r"^Step [0-9]+/[0-9]+", text):
                        logger.info(text)
                    else:
                        logger.debug(text)
                elif "aux" in line_json:
                    auxiliary_info.append(line_json["aux"])
            except json.JSONDecodeError as error:
                logger.warning(
                    "Failed to decode json for line '%s': %s", line, error
                )

    return auxiliary_info
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str

Push an image and return a digest or image reference string.

Parameters:

Name Type Description Default
image_name str

Local image to push (must match registry URI).

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

Repository digest from docker-py when available; otherwise a

str

string identity for the pushed image.

Raises:

Type Description
RuntimeError

If push fails or the image digest is not found.

Source code in src/zenml/container_engines/docker_engine.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push an image and return a digest or image reference string.

    Args:
        image_name: Local image to push (must match registry URI).
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        Repository digest from docker-py when available; otherwise a
        string identity for the pushed image.

    Raises:
        RuntimeError: If push fails or the image digest is not found.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)
        container_registry.prepare_image_push(image_name)

    logger.info("Pushing Docker image `%s`.", image_name)
    output_stream = self.client.images.push(image_name, stream=True)
    aux_info = self.process_docker_stream(output_stream)
    logger.info("Finished pushing Docker image.")

    image_name_without_tag, _ = image_name.rsplit(":", maxsplit=1)
    prefix_candidates = [f"{image_name_without_tag}@"]

    if image_name_without_tag.startswith(
        ("index.docker.io/", "docker.io/")
    ):
        image_name_without_index = image_name_without_tag.split(
            "/", maxsplit=1
        )[1]
        prefix_candidates.append(f"{image_name_without_index}@")

    image = self.client.images.get(image_name)
    repo_digests: List[str] = image.attrs["RepoDigests"]

    for digest in repo_digests:
        if digest.startswith(tuple(prefix_candidates)):
            return digest

    for info in reversed(aux_info):
        try:
            repo_digest = info["Digest"]
            return f"{image_name_without_tag}@{repo_digest}"
        except KeyError:
            pass
    raise RuntimeError(
        f"Unable to find repo digest after pushing image {image_name}."
    )
tag_image(source: str, target: str) -> None

Apply a local tag using the Docker engine.

Parameters:

Name Type Description Default
source str

Existing image reference.

required
target str

Additional name:tag for the same image.

required
Source code in src/zenml/container_engines/docker_engine.py
333
334
335
336
337
338
339
340
341
def tag_image(self, source: str, target: str) -> None:
    """Apply a local tag using the Docker engine.

    Args:
        source: Existing image reference.
        target: Additional name:tag for the same image.
    """
    image = self.client.images.get(source)
    image.tag(target)

PodmanContainerEngine

Bases: ContainerEngine

Container engine using the Podman CLI.

Attributes
engine_type: ContainerEngineType property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None

Build an image using podman build with a tar context on stdin.

Parameters:

Name Type Description Default
image_name str

-t tag for the built image.

required
build_context BuildContext

Tarball written to build stdin.

required
docker_build_options Optional[DockerBuildOptions]

Optional flags (Docker CLI compatible).

None
container_registry Optional[BaseContainerRegistry]

When set, runs podman login if creds exist.

None
**kwargs Any

Additional keyword arguments.

{}

Raises:

Type Description
RuntimeError

If podman build exits non-zero.

Source code in src/zenml/container_engines/podman_engine.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image using ``podman build`` with a tar context on stdin.

    Args:
        image_name: ``-t`` tag for the built image.
        build_context: Tarball written to build stdin.
        docker_build_options: Optional flags (Docker CLI compatible).
        container_registry: When set, runs ``podman login`` if creds exist.
        **kwargs: Additional keyword arguments.

    Raises:
        RuntimeError: If ``podman build`` exits non-zero.
    """
    if container_registry:
        self.login_registry(container_registry)

    with tempfile.TemporaryFile(mode="w+b") as f:
        build_context.write_archive(f)
        f.seek(0)
        command = ["podman", "build", "-", "-t", image_name]
        if docker_build_options:
            command.extend(docker_build_options.to_docker_cli_options())
        process = subprocess.Popen(command, stdin=f)
        result = process.wait()
        if result != 0:
            raise RuntimeError(
                f"Failed to build image {image_name} with `podman`. "
                "Please check the logs above for more information."
            )
check_availability() -> Tuple[bool, str | None]

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/podman_engine.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
    if not shutil.which("podman"):
        return (
            False,
            "`podman` is not installed or not accessible on your system.",
        )

    result = subprocess.run(
        ["podman", "info"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        return (
            False,
            "Podman is installed but `podman info` failed: "
            f"{result.stderr or result.stdout}",
        )

    return (True, None)
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str]

Always return None; Podman cannot reliably report repo digests.

Parameters:

Name Type Description Default
image_name str

Unused; accepted for API symmetry.

required
container_registry Optional[BaseContainerRegistry]

Unused; accepted for API symmetry.

None

Returns:

Type Description
Optional[str]

None because remote manifest digests are not reliable for this

Optional[str]

engine (https://github.com/containers/podman/issues/14779).

Source code in src/zenml/container_engines/podman_engine.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Always return ``None``; Podman cannot reliably report repo digests.

    Args:
        image_name: Unused; accepted for API symmetry.
        container_registry: Unused; accepted for API symmetry.

    Returns:
        ``None`` because remote manifest digests are not reliable for this
        engine (https://github.com/containers/podman/issues/14779).
    """
    return None
is_image_local(image_name: str) -> bool

Return whether Podman considers the image local-only.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image matches the local-only heuristic.

Source code in src/zenml/container_engines/podman_engine.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def is_image_local(self, image_name: str) -> bool:
    """Return whether Podman considers the image local-only.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image matches the local-only heuristic.
    """
    result = subprocess.run(
        [
            "podman",
            "image",
            "inspect",
            image_name,
            "--format",
            "{{json .RepoDigests}}",
        ],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        logger.error("Podman image inspect failed for '%s'.", image_name)
        return False

    try:
        repo_digests: List[str] = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to parse podman image inspect JSON for '%s': %s",
            image_name,
            e,
        )
        return False

    repo_digests = [
        digest
        for digest in repo_digests
        if not digest.startswith("localhost/")
    ]

    return len(repo_digests) == 0
login(username: str, password: str, registry: str) -> None

Run podman login for the given registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required

Raises:

Type Description
AuthorizationException

If podman login fails.

Source code in src/zenml/container_engines/podman_engine.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Run ``podman login`` for the given registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.

    Raises:
        AuthorizationException: If ``podman login`` fails.
    """
    try:
        subprocess.run(
            [
                "podman",
                "login",
                registry,
                "--username",
                username,
                "--password-stdin",
            ],
            input=password.encode(),
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise AuthorizationException(
            f"Podman login failed for {registry}: {e}"
        ) from e
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str

Push an image and return a surrogate reference when digest is absent.

Podman may not report a correct remote manifest digest; this method pushes a uniquely tagged image and returns that reference as a stable surrogate (see https://github.com/containers/podman/issues/14779).

Parameters:

Name Type Description Default
image_name str

Local image to push.

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

A unique image reference string used as the push identity.

Source code in src/zenml/container_engines/podman_engine.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push an image and return a surrogate reference when digest is absent.

    Podman may not report a correct remote manifest digest; this method
    pushes a uniquely tagged image and returns that reference as a stable
    surrogate (see https://github.com/containers/podman/issues/14779).

    Args:
        image_name: Local image to push.
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        A unique image reference string used as the push identity.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)
        container_registry.prepare_image_push(image_name)

    self._push_image_cli(image_name)

    unique_tag = uuid.uuid4()
    image_name_without_tag, _ = image_name.rsplit(":", maxsplit=1)
    unique_image_name = f"{image_name_without_tag}:{unique_tag}"
    self.tag_image(image_name, unique_image_name)
    self._push_image_cli(unique_image_name)
    return unique_image_name
tag_image(source: str, target: str) -> None

Apply a local tag using Podman.

Parameters:

Name Type Description Default
source str

Existing image reference.

required
target str

Additional name:tag for the same image.

required

Raises:

Type Description
RuntimeError

If podman tag fails.

Source code in src/zenml/container_engines/podman_engine.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def tag_image(self, source: str, target: str) -> None:
    """Apply a local tag using Podman.

    Args:
        source: Existing image reference.
        target: Additional name:tag for the same image.

    Raises:
        RuntimeError: If ``podman tag`` fails.
    """
    try:
        subprocess.run(
            ["podman", "tag", source, target],
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Podman tag failed for {source}: {e}") from e

Functions

check_container_engine(engine_type: ContainerEngineType) -> Tuple[bool, str | None]

Check whether the container engine is available.

Parameters:

Name Type Description Default
engine_type ContainerEngineType

Docker or Podman engine to probe.

required

Returns:

Type Description
bool

Availability flag and, when unavailable, an error message from the

str | None

engine implementation (otherwise None).

Source code in src/zenml/container_engines/factory.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def check_container_engine(
    engine_type: ContainerEngineType,
) -> Tuple[bool, str | None]:
    """Check whether the container engine is available.

    Args:
        engine_type: Docker or Podman engine to probe.

    Returns:
        Availability flag and, when unavailable, an error message from the
        engine implementation (otherwise ``None``).
    """
    engine = CONTAINER_ENGINE_CLASSES[engine_type]()
    return engine.check_availability()

get_container_engine(engine_type: Optional[ContainerEngineType] = None) -> ContainerEngine

Return the configured container engine, or auto-detect.

Parameters:

Name Type Description Default
engine_type Optional[ContainerEngineType]

When set, use this engine instead of global configuration or auto-detection.

None

Returns:

Name Type Description
ContainerEngine ContainerEngine

Ready-to-use engine instance.

Raises:

Type Description
RuntimeError

If the requested engine is unavailable or neither engine works in auto mode.

Source code in src/zenml/container_engines/factory.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def get_container_engine(
    engine_type: Optional[ContainerEngineType] = None,
) -> ContainerEngine:
    """Return the configured container engine, or auto-detect.

    Args:
        engine_type: When set, use this engine instead of global configuration
            or auto-detection.

    Returns:
        ContainerEngine: Ready-to-use engine instance.

    Raises:
        RuntimeError: If the requested engine is unavailable or neither
            engine works in auto mode.
    """
    requested = engine_type or GlobalConfiguration().container_engine
    if not requested:
        choices = [ContainerEngineType.DOCKER, ContainerEngineType.PODMAN]
    else:
        choices = [requested]

    error_message: str | None = None
    for candidate in choices:
        engine = CONTAINER_ENGINE_CLASSES[candidate]()
        is_available, error_message = engine.check_availability()
        if is_available:
            return engine

    if requested:
        raise RuntimeError(
            f"The requested container engine {requested} is not available. "
            f"Make sure it is installed and accessible on your system: "
            f"{error_message}"
        )

    raise RuntimeError(
        "No container engine is available. Install and start Docker, or "
        "install Podman."
    )

Modules

base

Abstract container engine for local OCI-compatible tooling.

Classes
ContainerEngine

Bases: ABC

Client-side abstraction over Docker/Podman (build, tag, push, inspect).

Implementations encapsulate OCI client details; callers use the factory functions in :mod:zenml.container_engines.factory for the active engine.

Attributes
engine_type: ContainerEngineType abstractmethod property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None abstractmethod

Build an image locally from a tarball build context.

Parameters:

Name Type Description Default
image_name str

Target image reference including tag.

required
build_context BuildContext

Archive and Dockerfile content for the build.

required
docker_build_options Optional[DockerBuildOptions]

Optional Docker-compatible build options.

None
container_registry Optional[BaseContainerRegistry]

When set, registry credentials are applied for authenticated base-image pulls during the build.

None
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If the build fails (implementation-specific).

Source code in src/zenml/container_engines/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@abstractmethod
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image locally from a tarball build context.

    Args:
        image_name: Target image reference including tag.
        build_context: Archive and Dockerfile content for the build.
        docker_build_options: Optional Docker-compatible build options.
        container_registry: When set, registry credentials are applied for
            authenticated base-image pulls during the build.
        **kwargs: Additional keyword arguments.

    Returns:
        None

    Raises:
        RuntimeError: If the build fails (implementation-specific).
    """
check_availability() -> Tuple[bool, str | None] abstractmethod

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/base.py
50
51
52
53
54
55
56
57
@abstractmethod
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str] abstractmethod

Resolve the repository digest for an image when the client can.

Parameters:

Name Type Description Default
image_name str

Image reference (may include tag).

required
container_registry Optional[BaseContainerRegistry]

When set and the name matches the registry, credentials from this component are used for registry access.

None

Returns:

Type Description
Optional[str]

Repo digest string (e.g. sha256:...) when resolvable, else

Optional[str]

None.

Source code in src/zenml/container_engines/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@abstractmethod
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Resolve the repository digest for an image when the client can.

    Args:
        image_name: Image reference (may include tag).
        container_registry: When set and the name matches the registry,
            credentials from this component are used for registry access.

    Returns:
        Repo digest string (e.g. ``sha256:...``) when resolvable, else
        ``None``.
    """
is_image_local(image_name: str) -> bool abstractmethod

Return whether the image appears to exist only locally.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image is present locally without a resolved remote

bool

repo digest heuristic; False otherwise.

Source code in src/zenml/container_engines/base.py
153
154
155
156
157
158
159
160
161
162
163
@abstractmethod
def is_image_local(self, image_name: str) -> bool:
    """Return whether the image appears to exist only locally.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image is present locally without a resolved remote
        repo digest heuristic; ``False`` otherwise.
    """
login(username: str, password: str, registry: str) -> None abstractmethod

Authenticate the container engine with the container registry credentials.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required
Source code in src/zenml/container_engines/base.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@abstractmethod
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the container engine with the container registry credentials.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.
    """
login_registry(container_registry: BaseContainerRegistry) -> None

Authenticate the container engine with the container registry credentials.

Parameters:

Name Type Description Default
container_registry BaseContainerRegistry

Container registry to authenticate with.

required
Source code in src/zenml/container_engines/base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def login_registry(
    self, container_registry: "BaseContainerRegistry"
) -> None:
    """Authenticate the container engine with the container registry credentials.

    Args:
        container_registry: Container registry to authenticate with.
    """
    credentials = container_registry.credentials
    if credentials:
        username, password = credentials
        self.login(
            username=username,
            password=password,
            registry=container_registry.config.uri,
        )
parse_dockerignore(dockerignore_path: str) -> List[str] staticmethod

Parses a dockerignore file and returns a list of patterns to ignore.

Parameters:

Name Type Description Default
dockerignore_path str

Path to the dockerignore file.

required

Returns:

Type Description
List[str]

List of patterns to ignore.

Source code in src/zenml/container_engines/base.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
@staticmethod
def parse_dockerignore(dockerignore_path: str) -> List[str]:
    """Parses a dockerignore file and returns a list of patterns to ignore.

    Args:
        dockerignore_path: Path to the dockerignore file.

    Returns:
        List of patterns to ignore.
    """
    try:
        file_content = io_utils.read_file_contents_as_string(
            dockerignore_path
        )
    except FileNotFoundError:
        logger.warning(
            "Unable to find dockerignore file at path '%s'.",
            dockerignore_path,
        )
        return []

    exclude_patterns = []
    for line in file_content.split("\n"):
        line = line.strip()
        if line and not line.startswith("#"):
            exclude_patterns.append(line)

    return exclude_patterns
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str abstractmethod

Push a local image to the configured registry.

Parameters:

Name Type Description Default
image_name str

Full image reference including tag.

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

Repository digest when the engine reports one; otherwise an

str

implementation-defined stable surrogate (e.g. unique tag).

Raises:

Type Description
RuntimeError

If authentication or push fails.

Source code in src/zenml/container_engines/base.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@abstractmethod
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push a local image to the configured registry.

    Args:
        image_name: Full image reference including tag.
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        Repository digest when the engine reports one; otherwise an
        implementation-defined stable surrogate (e.g. unique tag).

    Raises:
        RuntimeError: If authentication or push fails.
    """
sanitize_tag(tag: str) -> str staticmethod

Sanitize a Docker tag.

Parameters:

Name Type Description Default
tag str

The tag to sanitize.

required

Raises:

Type Description
ValueError

If the tag is empty.

Returns:

Type Description
str

The sanitized tag.

Source code in src/zenml/container_engines/base.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@staticmethod
def sanitize_tag(tag: str) -> str:
    """Sanitize a Docker tag.

    Args:
        tag: The tag to sanitize.

    Raises:
        ValueError: If the tag is empty.

    Returns:
        The sanitized tag.
    """
    tag = re.sub(r"[^a-zA-Z0-9_.\-]", "", tag)
    # Tags can not start with a dot or a dash
    tag = re.sub(r"^[-.]", "_", tag)

    if not tag:
        raise ValueError("Docker image tag cannot be empty.")

    return tag[:128]
tag_image(source: str, target: str) -> None abstractmethod

Apply an additional local name/tag to an existing local image.

Parameters:

Name Type Description Default
source str

Existing local image reference.

required
target str

New local reference (alias) for the same image.

required

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If the engine cannot apply the tag.

Source code in src/zenml/container_engines/base.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@abstractmethod
def tag_image(self, source: str, target: str) -> None:
    """Apply an additional local name/tag to an existing local image.

    Args:
        source: Existing local image reference.
        target: New local reference (alias) for the same image.

    Returns:
        None

    Raises:
        RuntimeError: If the engine cannot apply the tag.
    """
Functions
Modules

docker_engine

Docker-backed :class:ContainerEngine implementation.

Classes
DockerContainerEngine()

Bases: ContainerEngine

Container engine using the Docker CLI and/or docker-py.

Create a Docker engine (registry auth is per-operation).

Source code in src/zenml/container_engines/docker_engine.py
54
55
56
def __init__(self) -> None:
    """Create a Docker engine (registry auth is per-operation)."""
    self._client: Optional[DockerClient] = None
Attributes
client: DockerClient property

Return a docker-py client, reusing a cached unauthenticated instance.

Returns:

Name Type Description
DockerClient DockerClient

Connection to the local Docker daemon.

Raises:

Type Description
RuntimeError

If docker-py cannot create a client from the environment (for example when the daemon is not running).

engine_type: ContainerEngineType property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None

Build an image using docker-py or docker build -.

Parameters:

Name Type Description Default
image_name str

Target image reference including tag.

required
build_context BuildContext

Tarball context written to the Docker daemon.

required
docker_build_options Optional[DockerBuildOptions]

Optional Docker build options.

None
container_registry Optional[BaseContainerRegistry]

When set, applies registry credentials for pulls.

None
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
None

None

Source code in src/zenml/container_engines/docker_engine.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image using docker-py or ``docker build -``.

    Args:
        image_name: Target image reference including tag.
        build_context: Tarball context written to the Docker daemon.
        docker_build_options: Optional Docker build options.
        container_registry: When set, applies registry credentials for pulls.
        **kwargs: Additional keyword arguments.

    Returns:
        None
    """
    if container_registry:
        self.login_registry(container_registry)

    if kwargs.get("use_subprocess", False):
        self._build_cli(image_name, build_context, docker_build_options)
        return

    build_options = (
        docker_build_options.to_docker_python_sdk_options()
        if docker_build_options
        else {}
    )
    with tempfile.TemporaryFile(mode="w+b") as f:
        build_context.write_archive(f)
        f.seek(0)
        output_stream = self.client.images.client.api.build(
            fileobj=f,
            custom_context=True,
            tag=image_name,
            **build_options,
        )
    self.process_docker_stream(output_stream)
check_availability() -> Tuple[bool, str | None]

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/docker_engine.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
    if not shutil.which("docker"):
        return (
            False,
            "`docker` is not installed or not accessible on your system.",
        )

    try:
        self.client.ping()
        return (True, None)
    except Exception:
        return (
            False,
            "Unable to connect to the Docker daemon. There are three "
            "common causes for this:\n"
            "1) The Docker daemon isn't running.\n"
            "2) The Docker client isn't configured correctly. The client "
            "loads its configuration from the following file: "
            "$HOME/.docker/config.json. If your configuration file is in a "
            "different location, set the `DOCKER_CONFIG` environment "
            "variable to the directory that contains your `config.json` "
            "file.\n"
            "3) If your Docker CLI is working fine but you ran into this "
            "issue, you might be using a non-default Docker context which "
            "is not supported by the Docker python library. To verify "
            "this, run `docker context ls` and check which context has a "
            "`*` next to it. If this is not the `default` context, copy "
            "the `DOCKER ENDPOINT` value of that context and set the "
            "`DOCKER_HOST` environment variable to that value.",
        )
close() -> None

Close the cached docker-py client if one was opened.

Source code in src/zenml/container_engines/docker_engine.py
448
449
450
451
452
def close(self) -> None:
    """Close the cached docker-py client if one was opened."""
    if self._client is not None:
        self._client.close()
        self._client = None
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str]

Resolve a repository digest using docker-py (and registry creds).

Parameters:

Name Type Description Default
image_name str

Image reference to resolve.

required
container_registry Optional[BaseContainerRegistry]

When the name matches this registry, its credentials authenticate registry API access.

None

Returns:

Type Description
Optional[str]

Digest string when resolvable; None if not found or unknown.

Source code in src/zenml/container_engines/docker_engine.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Resolve a repository digest using docker-py (and registry creds).

    Args:
        image_name: Image reference to resolve.
        container_registry: When the name matches this registry, its
            credentials authenticate registry API access.

    Returns:
        Digest string when resolvable; ``None`` if not found or unknown.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)

    try:
        metadata = self.client.images.get_registry_data(image_name)
    except Exception:
        return None

    return cast(str, metadata.id.split(":")[-1])
is_image_local(image_name: str) -> bool

Return whether Docker considers the image local-only.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image is local without a resolved repo digest.

Source code in src/zenml/container_engines/docker_engine.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def is_image_local(self, image_name: str) -> bool:
    """Return whether Docker considers the image local-only.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image is local without a resolved repo digest.
    """
    images = self.client.images.list(name=image_name)
    if not images:
        return False

    image = self.client.images.get(image_name)
    repo_digests = image.attrs["RepoDigests"]

    return len(repo_digests) == 0
login(username: str, password: str, registry: str) -> None

Authenticate the Docker client with the container registry credentials.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required
Source code in src/zenml/container_engines/docker_engine.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the Docker client with the container registry credentials.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.
    """
    self.login_client(
        username=username,
        password=password,
        registry=registry,
    )
    self.login_cli(
        username=username,
        password=password,
        registry=registry,
    )
login_cli(username: str, password: str, registry: str) -> None staticmethod

Run docker login for the given registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required

Raises:

Type Description
AuthorizationException

If docker login fails.

Source code in src/zenml/container_engines/docker_engine.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@staticmethod
def login_cli(
    username: str,
    password: str,
    registry: str,
) -> None:
    """Run ``docker login`` for the given registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.

    Raises:
        AuthorizationException: If ``docker login`` fails.
    """
    docker_login_cmd = [
        "docker",
        "login",
        "-u",
        username,
        "--password-stdin",
    ]
    if registry != DOCKERHUB_REGISTRY_URI:
        docker_login_cmd.append(registry)

    try:
        subprocess.run(
            docker_login_cmd,
            check=True,
            input=password.encode(),
        )
    except subprocess.CalledProcessError as e:
        raise AuthorizationException(
            f"Failed to authenticate to Docker registry '{registry}': {e}"
        ) from e
login_client(username: str, password: str, registry: str) -> None

Login to a Docker registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry URI for login.

required

Raises:

Type Description
AuthorizationException

If registry login fails.

Source code in src/zenml/container_engines/docker_engine.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def login_client(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Login to a Docker registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry URI for login.

    Raises:
        AuthorizationException: If registry login fails.
    """
    try:
        self.client.login(
            username=username,
            password=password,
            registry=registry
            if registry != DOCKERHUB_REGISTRY_URI
            else None,
            reauth=True,
        )
    except DockerException as e:
        raise AuthorizationException(
            f"failed to authenticate with Docker registry {registry}: {e}"
        )
process_docker_stream(stream: Iterable[bytes]) -> List[Dict[str, Any]] staticmethod

Decode JSON lines from a docker-py build/push stream.

Parameters:

Name Type Description Default
stream Iterable[bytes]

Raw byte chunks from docker API build/push.

required

Raises:

Type Description
RuntimeError

If the stream contains an error payload.

Returns:

Type Description
List[Dict[str, Any]]

Auxiliary aux objects collected from the stream.

Source code in src/zenml/container_engines/docker_engine.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@staticmethod
def process_docker_stream(stream: Iterable[bytes]) -> List[Dict[str, Any]]:
    """Decode JSON lines from a docker-py build/push stream.

    Args:
        stream: Raw byte chunks from ``docker`` API ``build``/``push``.

    Raises:
        RuntimeError: If the stream contains an ``error`` payload.

    Returns:
        Auxiliary ``aux`` objects collected from the stream.
    """
    auxiliary_info = []

    for element in stream:
        lines = element.decode("utf-8").strip().split("\n")

        for line in lines:
            try:
                line_json = json.loads(line)
                if "error" in line_json:
                    raise RuntimeError(
                        f"Docker error: {line_json['error']}."
                    )
                elif "stream" in line_json:
                    text = line_json["stream"].strip()
                    if "ERROR" in text:
                        logger.error(text)
                    elif re.match(r"^Step [0-9]+/[0-9]+", text):
                        logger.info(text)
                    else:
                        logger.debug(text)
                elif "aux" in line_json:
                    auxiliary_info.append(line_json["aux"])
            except json.JSONDecodeError as error:
                logger.warning(
                    "Failed to decode json for line '%s': %s", line, error
                )

    return auxiliary_info
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str

Push an image and return a digest or image reference string.

Parameters:

Name Type Description Default
image_name str

Local image to push (must match registry URI).

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

Repository digest from docker-py when available; otherwise a

str

string identity for the pushed image.

Raises:

Type Description
RuntimeError

If push fails or the image digest is not found.

Source code in src/zenml/container_engines/docker_engine.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push an image and return a digest or image reference string.

    Args:
        image_name: Local image to push (must match registry URI).
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        Repository digest from docker-py when available; otherwise a
        string identity for the pushed image.

    Raises:
        RuntimeError: If push fails or the image digest is not found.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)
        container_registry.prepare_image_push(image_name)

    logger.info("Pushing Docker image `%s`.", image_name)
    output_stream = self.client.images.push(image_name, stream=True)
    aux_info = self.process_docker_stream(output_stream)
    logger.info("Finished pushing Docker image.")

    image_name_without_tag, _ = image_name.rsplit(":", maxsplit=1)
    prefix_candidates = [f"{image_name_without_tag}@"]

    if image_name_without_tag.startswith(
        ("index.docker.io/", "docker.io/")
    ):
        image_name_without_index = image_name_without_tag.split(
            "/", maxsplit=1
        )[1]
        prefix_candidates.append(f"{image_name_without_index}@")

    image = self.client.images.get(image_name)
    repo_digests: List[str] = image.attrs["RepoDigests"]

    for digest in repo_digests:
        if digest.startswith(tuple(prefix_candidates)):
            return digest

    for info in reversed(aux_info):
        try:
            repo_digest = info["Digest"]
            return f"{image_name_without_tag}@{repo_digest}"
        except KeyError:
            pass
    raise RuntimeError(
        f"Unable to find repo digest after pushing image {image_name}."
    )
tag_image(source: str, target: str) -> None

Apply a local tag using the Docker engine.

Parameters:

Name Type Description Default
source str

Existing image reference.

required
target str

Additional name:tag for the same image.

required
Source code in src/zenml/container_engines/docker_engine.py
333
334
335
336
337
338
339
340
341
def tag_image(self, source: str, target: str) -> None:
    """Apply a local tag using the Docker engine.

    Args:
        source: Existing image reference.
        target: Additional name:tag for the same image.
    """
    image = self.client.images.get(source)
    image.tag(target)
Functions

factory

Resolve the active :class:ContainerEngine from global settings.

Classes
Functions
check_container_engine(engine_type: ContainerEngineType) -> Tuple[bool, str | None]

Check whether the container engine is available.

Parameters:

Name Type Description Default
engine_type ContainerEngineType

Docker or Podman engine to probe.

required

Returns:

Type Description
bool

Availability flag and, when unavailable, an error message from the

str | None

engine implementation (otherwise None).

Source code in src/zenml/container_engines/factory.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def check_container_engine(
    engine_type: ContainerEngineType,
) -> Tuple[bool, str | None]:
    """Check whether the container engine is available.

    Args:
        engine_type: Docker or Podman engine to probe.

    Returns:
        Availability flag and, when unavailable, an error message from the
        engine implementation (otherwise ``None``).
    """
    engine = CONTAINER_ENGINE_CLASSES[engine_type]()
    return engine.check_availability()
get_container_engine(engine_type: Optional[ContainerEngineType] = None) -> ContainerEngine

Return the configured container engine, or auto-detect.

Parameters:

Name Type Description Default
engine_type Optional[ContainerEngineType]

When set, use this engine instead of global configuration or auto-detection.

None

Returns:

Name Type Description
ContainerEngine ContainerEngine

Ready-to-use engine instance.

Raises:

Type Description
RuntimeError

If the requested engine is unavailable or neither engine works in auto mode.

Source code in src/zenml/container_engines/factory.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def get_container_engine(
    engine_type: Optional[ContainerEngineType] = None,
) -> ContainerEngine:
    """Return the configured container engine, or auto-detect.

    Args:
        engine_type: When set, use this engine instead of global configuration
            or auto-detection.

    Returns:
        ContainerEngine: Ready-to-use engine instance.

    Raises:
        RuntimeError: If the requested engine is unavailable or neither
            engine works in auto mode.
    """
    requested = engine_type or GlobalConfiguration().container_engine
    if not requested:
        choices = [ContainerEngineType.DOCKER, ContainerEngineType.PODMAN]
    else:
        choices = [requested]

    error_message: str | None = None
    for candidate in choices:
        engine = CONTAINER_ENGINE_CLASSES[candidate]()
        is_available, error_message = engine.check_availability()
        if is_available:
            return engine

    if requested:
        raise RuntimeError(
            f"The requested container engine {requested} is not available. "
            f"Make sure it is installed and accessible on your system: "
            f"{error_message}"
        )

    raise RuntimeError(
        "No container engine is available. Install and start Docker, or "
        "install Podman."
    )

podman_engine

Podman-backed :class:ContainerEngine implementation.

Classes
PodmanContainerEngine

Bases: ContainerEngine

Container engine using the Podman CLI.

Attributes
engine_type: ContainerEngineType property

The type of the container engine.

Returns:

Type Description
ContainerEngineType

The type of the container engine.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Optional[DockerBuildOptions] = None, container_registry: Optional[BaseContainerRegistry] = None, **kwargs: Any) -> None

Build an image using podman build with a tar context on stdin.

Parameters:

Name Type Description Default
image_name str

-t tag for the built image.

required
build_context BuildContext

Tarball written to build stdin.

required
docker_build_options Optional[DockerBuildOptions]

Optional flags (Docker CLI compatible).

None
container_registry Optional[BaseContainerRegistry]

When set, runs podman login if creds exist.

None
**kwargs Any

Additional keyword arguments.

{}

Raises:

Type Description
RuntimeError

If podman build exits non-zero.

Source code in src/zenml/container_engines/podman_engine.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Optional["DockerBuildOptions"] = None,
    container_registry: Optional["BaseContainerRegistry"] = None,
    **kwargs: Any,
) -> None:
    """Build an image using ``podman build`` with a tar context on stdin.

    Args:
        image_name: ``-t`` tag for the built image.
        build_context: Tarball written to build stdin.
        docker_build_options: Optional flags (Docker CLI compatible).
        container_registry: When set, runs ``podman login`` if creds exist.
        **kwargs: Additional keyword arguments.

    Raises:
        RuntimeError: If ``podman build`` exits non-zero.
    """
    if container_registry:
        self.login_registry(container_registry)

    with tempfile.TemporaryFile(mode="w+b") as f:
        build_context.write_archive(f)
        f.seek(0)
        command = ["podman", "build", "-", "-t", image_name]
        if docker_build_options:
            command.extend(docker_build_options.to_docker_cli_options())
        process = subprocess.Popen(command, stdin=f)
        result = process.wait()
        if result != 0:
            raise RuntimeError(
                f"Failed to build image {image_name} with `podman`. "
                "Please check the logs above for more information."
            )
check_availability() -> Tuple[bool, str | None]

Check whether this engine is installed and accessible and return the error message if not.

Returns:

Type Description
bool

True if the engine binary and runtime are likely available, else

str | None

False and an error message if not.

Source code in src/zenml/container_engines/podman_engine.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def check_availability(self) -> Tuple[bool, str | None]:
    """Check whether this engine is installed and accessible and return the error message if not.

    Returns:
        ``True`` if the engine binary and runtime are likely available, else
        ``False`` and an error message if not.
    """
    if not shutil.which("podman"):
        return (
            False,
            "`podman` is not installed or not accessible on your system.",
        )

    result = subprocess.run(
        ["podman", "info"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        return (
            False,
            "Podman is installed but `podman info` failed: "
            f"{result.stderr or result.stdout}",
        )

    return (True, None)
get_image_repo_digest(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> Optional[str]

Always return None; Podman cannot reliably report repo digests.

Parameters:

Name Type Description Default
image_name str

Unused; accepted for API symmetry.

required
container_registry Optional[BaseContainerRegistry]

Unused; accepted for API symmetry.

None

Returns:

Type Description
Optional[str]

None because remote manifest digests are not reliable for this

Optional[str]

engine (https://github.com/containers/podman/issues/14779).

Source code in src/zenml/container_engines/podman_engine.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def get_image_repo_digest(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> Optional[str]:
    """Always return ``None``; Podman cannot reliably report repo digests.

    Args:
        image_name: Unused; accepted for API symmetry.
        container_registry: Unused; accepted for API symmetry.

    Returns:
        ``None`` because remote manifest digests are not reliable for this
        engine (https://github.com/containers/podman/issues/14779).
    """
    return None
is_image_local(image_name: str) -> bool

Return whether Podman considers the image local-only.

Parameters:

Name Type Description Default
image_name str

Image reference to inspect.

required

Returns:

Type Description
bool

True if the image matches the local-only heuristic.

Source code in src/zenml/container_engines/podman_engine.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def is_image_local(self, image_name: str) -> bool:
    """Return whether Podman considers the image local-only.

    Args:
        image_name: Image reference to inspect.

    Returns:
        ``True`` if the image matches the local-only heuristic.
    """
    result = subprocess.run(
        [
            "podman",
            "image",
            "inspect",
            image_name,
            "--format",
            "{{json .RepoDigests}}",
        ],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        logger.error("Podman image inspect failed for '%s'.", image_name)
        return False

    try:
        repo_digests: List[str] = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to parse podman image inspect JSON for '%s': %s",
            image_name,
            e,
        )
        return False

    repo_digests = [
        digest
        for digest in repo_digests
        if not digest.startswith("localhost/")
    ]

    return len(repo_digests) == 0
login(username: str, password: str, registry: str) -> None

Run podman login for the given registry.

Parameters:

Name Type Description Default
username str

Registry username.

required
password str

Registry password or token.

required
registry str

Registry host or URI.

required

Raises:

Type Description
AuthorizationException

If podman login fails.

Source code in src/zenml/container_engines/podman_engine.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def login(
    self,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Run ``podman login`` for the given registry.

    Args:
        username: Registry username.
        password: Registry password or token.
        registry: Registry host or URI.

    Raises:
        AuthorizationException: If ``podman login`` fails.
    """
    try:
        subprocess.run(
            [
                "podman",
                "login",
                registry,
                "--username",
                username,
                "--password-stdin",
            ],
            input=password.encode(),
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise AuthorizationException(
            f"Podman login failed for {registry}: {e}"
        ) from e
push_image(image_name: str, container_registry: Optional[BaseContainerRegistry] = None) -> str

Push an image and return a surrogate reference when digest is absent.

Podman may not report a correct remote manifest digest; this method pushes a uniquely tagged image and returns that reference as a stable surrogate (see https://github.com/containers/podman/issues/14779).

Parameters:

Name Type Description Default
image_name str

Local image to push.

required
container_registry Optional[BaseContainerRegistry]

Container registry to push to. Used for validation and credentials, if provided.

None

Returns:

Type Description
str

A unique image reference string used as the push identity.

Source code in src/zenml/container_engines/podman_engine.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def push_image(
    self,
    image_name: str,
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Push an image and return a surrogate reference when digest is absent.

    Podman may not report a correct remote manifest digest; this method
    pushes a uniquely tagged image and returns that reference as a stable
    surrogate (see https://github.com/containers/podman/issues/14779).

    Args:
        image_name: Local image to push.
        container_registry: Container registry to push to. Used for
            validation and credentials, if provided.

    Returns:
        A unique image reference string used as the push identity.
    """
    if (
        container_registry
        and container_registry.is_valid_image_name_for_registry(image_name)
    ):
        self.login_registry(container_registry)
        container_registry.prepare_image_push(image_name)

    self._push_image_cli(image_name)

    unique_tag = uuid.uuid4()
    image_name_without_tag, _ = image_name.rsplit(":", maxsplit=1)
    unique_image_name = f"{image_name_without_tag}:{unique_tag}"
    self.tag_image(image_name, unique_image_name)
    self._push_image_cli(unique_image_name)
    return unique_image_name
tag_image(source: str, target: str) -> None

Apply a local tag using Podman.

Parameters:

Name Type Description Default
source str

Existing image reference.

required
target str

Additional name:tag for the same image.

required

Raises:

Type Description
RuntimeError

If podman tag fails.

Source code in src/zenml/container_engines/podman_engine.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def tag_image(self, source: str, target: str) -> None:
    """Apply a local tag using Podman.

    Args:
        source: Existing image reference.
        target: Additional name:tag for the same image.

    Raises:
        RuntimeError: If ``podman tag`` fails.
    """
    try:
        subprocess.run(
            ["podman", "tag", source, target],
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Podman tag failed for {source}: {e}") from e
Functions