
Services

zenml.services

Initialization of the ZenML services module.

A service is a process or set of processes that outlive a pipeline run.

Attributes

__all__ = ['ServiceConfig', 'ServiceStatus', 'ServiceEndpointProtocol', 'ServiceEndpointConfig', 'ServiceEndpointStatus', 'BaseServiceEndpoint', 'BaseService', 'ServiceEndpointHealthMonitorConfig', 'BaseServiceEndpointHealthMonitor', 'HTTPEndpointHealthMonitorConfig', 'HTTPEndpointHealthMonitor', 'TCPEndpointHealthMonitorConfig', 'TCPEndpointHealthMonitor', 'ContainerService', 'ContainerServiceConfig', 'ContainerServiceStatus', 'ContainerServiceEndpoint', 'ContainerServiceEndpointConfig', 'ContainerServiceEndpointStatus', 'LocalDaemonService', 'LocalDaemonServiceConfig', 'LocalDaemonServiceStatus', 'LocalDaemonServiceEndpointConfig', 'LocalDaemonServiceEndpointStatus', 'LocalDaemonServiceEndpoint'] module-attribute

Classes

BaseService(**attrs: Any)

Bases: BaseTypedModel

Base service class.

This class implements generic functionality concerning the life-cycle management and tracking of an external service (e.g. a process, a container or a Kubernetes deployment).

Attributes:

Name Type Description
SERVICE_TYPE ServiceType

a service type descriptor with information describing the service class. Every concrete service class must define this.

admin_state ServiceState

the administrative state of the service.

uuid UUID

unique UUID identifier for the service instance.

config ServiceConfig

service configuration

status ServiceStatus

service status

endpoint Optional[BaseServiceEndpoint]

optional service endpoint

Initialize the service instance.

Parameters:

Name Type Description Default
**attrs Any

keyword arguments.

{}
Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
is_failed: bool property

Check if the service is currently failed.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is in a failure state, otherwise False.

is_running: bool property

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

is_stopped: bool property

Check if the service is currently stopped.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is stopped, otherwise False.

Functions
check_status() -> Tuple[ServiceState, str] abstractmethod

Check the current operational state of the external service.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).

Source code in src/zenml/services/service.py
@abstractmethod
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
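As an illustration of what a concrete check_status override might look like, here is a minimal sketch for a hypothetical service that tracks a local process by its PID. The ServiceState enum below is a simplified stand-in for ZenML's (its member values are assumed), PidTrackedService is invented for this example, and the os.kill(pid, 0) probe is only safe on POSIX systems.

```python
import os
from enum import Enum
from typing import Optional, Tuple


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


class PidTrackedService:
    """Hypothetical service that tracks a local process by PID."""

    def __init__(self, pid: Optional[int] = None) -> None:
        self.pid = pid

    def check_status(self) -> Tuple[ServiceState, str]:
        # No PID recorded: the process was never started.
        if self.pid is None:
            return ServiceState.INACTIVE, "no process ID recorded"
        try:
            # POSIX only: signal 0 checks that the process exists without
            # delivering a signal to it.
            os.kill(self.pid, 0)
        except ProcessLookupError:
            return ServiceState.INACTIVE, f"process {self.pid} is not running"
        except PermissionError:
            # The process exists but belongs to another user.
            return ServiceState.ERROR, f"no permission to query process {self.pid}"
        return ServiceState.ACTIVE, f"process {self.pid} is running"
```

The second element of the tuple is what ends up as the "last status message" in the service status report, so it pays to make it descriptive.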
deprovision(force: bool = False) -> None

Deprovisions all resources used by the service.

Parameters:

Name Type Description Default
force bool

if True, the service will be deprovisioned even if it is in a failed state.

False

Raises:

Type Description
NotImplementedError

if the service does not implement deprovisioning functionality.

Source code in src/zenml/services/service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: if True, the service will be deprovisioned even if it is
            in a failed state.

    Raises:
        NotImplementedError: if the service does not implement
            deprovisioning functionality.
    """
    raise NotImplementedError(
        f"Deprovisioning resources not implemented for {self}."
    )
from_json(json_str: str) -> BaseTypedModel classmethod

Loads a service from a JSON string.

Parameters:

Name Type Description Default
json_str str

the JSON string to load from.

required

Returns:

Type Description
BaseTypedModel

The loaded service object.

Source code in src/zenml/services/service.py
@classmethod
def from_json(cls, json_str: str) -> "BaseTypedModel":
    """Loads a service from a JSON string.

    Args:
        json_str: the JSON string to load from.

    Returns:
        The loaded service object.
    """
    service_dict = json.loads(json_str)
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=service_dict["type"], expected_class=BaseService
    )
    return class_.from_dict(service_dict)
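from_json relies on the serialized "type" field to find the concrete class before building the object. The sketch below shows the same type-tagged dispatch in plain Python under stated assumptions: a hard-coded dictionary (REGISTRY, hypothetical) replaces source_utils.load_and_validate_class, which imports the class from its source path instead, and BaseModelSketch is a hypothetical stand-in for BaseTypedModel.

```python
import json
from typing import Any, Dict, Type


class BaseModelSketch:
    """Hypothetical stand-in for ZenML's BaseTypedModel."""

    def __init__(self, **fields: Any) -> None:
        self.fields = fields

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BaseModelSketch":
        # The "type" tag only selects the class; the other keys become fields.
        return cls(**{k: v for k, v in data.items() if k != "type"})


class SleepingServiceSketch(BaseModelSketch):
    """Hypothetical concrete service class."""


# Hypothetical registry; ZenML instead resolves the class from its source
# path via source_utils.load_and_validate_class.
REGISTRY: Dict[str, Type[BaseModelSketch]] = {
    "my_pkg.services.SleepingServiceSketch": SleepingServiceSketch,
}


def from_json(json_str: str) -> BaseModelSketch:
    data = json.loads(json_str)
    try:
        class_ = REGISTRY[data["type"]]
    except KeyError:
        raise ValueError(f"Unknown service type: {data.get('type')!r}") from None
    return class_.from_dict(data)
```

Because the class is looked up from the payload itself, the caller does not need to know the concrete service type in advance; it only needs the type to be resolvable.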
from_model(model: ServiceResponse) -> BaseService classmethod

Loads a service from a model.

Parameters:

Name Type Description Default
model ServiceResponse

The ServiceResponse to load from.

required

Returns:

Type Description
BaseService

The loaded service object.

Raises:

Type Description
ValueError

if the service source is not found in the model.

Source code in src/zenml/services/service.py
@classmethod
def from_model(cls, model: "ServiceResponse") -> "BaseService":
    """Loads a service from a model.

    Args:
        model: The ServiceResponse to load from.

    Returns:
        The loaded service object.

    Raises:
        ValueError: if the service source is not found in the model.
    """
    if not model.service_source:
        raise ValueError("Service source not found in the model.")
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=model.service_source, expected_class=BaseService
    )
    return class_(
        uuid=model.id,
        admin_state=model.admin_state,
        config=model.config,
        status=model.status,
        service_type=model.service_type.model_dump(),
        endpoint=model.endpoint,
    )
get_healthcheck_url() -> Optional[str]

Gets the healthcheck URL for the endpoint.

Returns:

Type Description
Optional[str]

the healthcheck URL for the endpoint

Source code in src/zenml/services/service.py
def get_healthcheck_url(self) -> Optional[str]:
    """Gets the healthcheck URL for the endpoint.

    Returns:
        the healthcheck URL for the endpoint
    """
    return (
        self.endpoint.monitor.get_healthcheck_uri(self.endpoint)
        if (self.endpoint and self.endpoint.monitor)
        and isinstance(self.endpoint.monitor, HTTPEndpointHealthMonitor)
        else None
    )
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None] abstractmethod

Retrieve the service logs.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

if set, only retrieve this many lines from the end of the log output.

None

Returns:

Type Description
Generator[str, bool, None]

A generator that can be accessed to get the service logs.

Source code in src/zenml/services/service.py
@abstractmethod
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
get_prediction_url() -> Optional[str]

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

Source code in src/zenml/services/service.py
def get_prediction_url(self) -> Optional[str]:
    """Gets the prediction URL for the endpoint.

    Returns:
        the prediction URL for the endpoint
    """
    prediction_url = None
    if isinstance(self, BaseDeploymentService) and self.prediction_url:
        prediction_url = self.prediction_url
    elif self.endpoint:
        prediction_url = (
            self.endpoint.status.uri if self.endpoint.status else None
        )
    return prediction_url
get_service_status_message() -> str

Get a service status message.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/service.py
def get_service_status_message(self) -> str:
    """Get a service status message.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    return (
        f"  Administrative state: `{self.admin_state.value}`\n"
        f"  Operational state: `{self.status.state.value}`\n"
        f"  Last status message: '{self.status.last_error}'\n"
    )
poll_service_status(timeout: int = 0) -> bool

Polls the external service status.

It does this until the service operational state matches the administrative state, the service enters a failed state, or the timeout is reached.

Parameters:

Name Type Description Default
timeout int

maximum time to wait for the service operational state to match the administrative state, in seconds

0

Returns:

Type Description
bool

True if the service operational state matches the administrative state, False otherwise.

Source code in src/zenml/services/service.py
def poll_service_status(self, timeout: int = 0) -> bool:
    """Polls the external service status.

    It does this until the service operational state matches the
    administrative state, the service enters a failed state, or the timeout
    is reached.

    Args:
        timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds

    Returns:
        True if the service operational state matches the administrative
        state, False otherwise.
    """
    time_remaining = timeout
    while True:
        if self.admin_state == ServiceState.ACTIVE and self.is_running:
            return True
        if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
            return True
        if self.is_failed:
            return False
        if time_remaining <= 0:
            break
        time.sleep(1)
        time_remaining -= 1

    if timeout > 0:
        logger.error(
            f"Timed out waiting for service {self} to become "
            f"{self.admin_state.value}:\n"
            + self.get_service_status_message()
        )

    return False
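The polling loop above can be exercised in isolation with a minimal sketch. It simplifies one detail: the real method calls is_running, is_stopped and is_failed separately (each of which polls the service), whereas this sketch observes the state once per iteration. FakeService and its scripted states are hypothetical, and sleep is injectable so the loop can be tested without actually waiting.

```python
import time
from enum import Enum
from typing import Callable, List


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


class FakeService:
    """Hypothetical service whose observed state follows a scripted sequence."""

    def __init__(self, admin_state: ServiceState, states: List[ServiceState]) -> None:
        self.admin_state = admin_state
        self._states = list(states)
        self.polls = 0

    def observe(self) -> ServiceState:
        # Each call counts as one poll; the last scripted state repeats forever.
        self.polls += 1
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]


def poll_until_settled(
    service: FakeService,
    timeout: int = 0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll once per second until the observed state matches the admin state."""
    time_remaining = timeout
    while True:
        state = service.observe()
        if state == service.admin_state:
            return True  # operational state caught up with the admin state
        if state == ServiceState.ERROR:
            return False  # failed: no point in waiting any longer
        if time_remaining <= 0:
            return False  # timed out (with timeout=0: checked exactly once)
        sleep(1)
        time_remaining -= 1
```

Note that a timeout of N seconds allows up to N + 1 status checks, since the state is checked once more after the final sleep.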
provision() -> None

Provisions resources to run the service.

Raises:

Type Description
NotImplementedError

if the service does not implement provisioning functionality

Source code in src/zenml/services/service.py
def provision(self) -> None:
    """Provisions resources to run the service.

    Raises:
        NotImplementedError: if the service does not implement provisioning functionality
    """
    raise NotImplementedError(
        f"Provisioning resources not implemented for {self}."
    )
start(timeout: int = 0) -> None

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status.

0
Source code in src/zenml/services/service.py
@update_service_status(
    pre_status=ServiceState.PENDING_STARTUP,
    post_status=ServiceState.ACTIVE,
)
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    with console.status(f"Starting service '{self}'.\n"):
        self.admin_state = ServiceState.ACTIVE
        self.provision()
        if timeout > 0 and not self.poll_service_status(timeout):
            logger.error(
                f"Failed to start service {self}\n"
                + self.get_service_status_message()
            )
stop(timeout: int = 0, force: bool = False) -> None

Stop the service and optionally wait for it to shut down.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to shut down. If set to 0, the method will return immediately after checking the service status.

0
force bool

if True, the service will be stopped even if it is not currently running.

False
Source code in src/zenml/services/service.py
@update_service_status(
    pre_status=ServiceState.PENDING_SHUTDOWN,
    post_status=ServiceState.INACTIVE,
)
def stop(self, timeout: int = 0, force: bool = False) -> None:
    """Stop the service and optionally wait for it to shut down.

    Args:
        timeout: amount of time to wait for the service to shut down.
            If set to 0, the method will return immediately after checking
            the service status.
        force: if True, the service will be stopped even if it is not
            currently running.
    """
    with console.status(f"Stopping service '{self}'.\n"):
        self.admin_state = ServiceState.INACTIVE
        self.deprovision(force)
        if timeout > 0:
            self.poll_service_status(timeout)
            if not self.is_stopped:
                logger.error(
                    f"Failed to stop service {self}. Last state: "
                    f"'{self.status.state.value}'. Last error: "
                    f"'{self.status.last_error}'"
                )
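start and stop first record the desired (administrative) state and only then provision or deprovision resources; whether the service actually reached that state is a separate, observed fact that is checked by polling. A minimal sketch of that ordering follows, using a hypothetical InMemoryService that records calls instead of running anything. The update_service_status decorator and the console spinner are deliberately left out, because their behavior is not shown on this page.

```python
from enum import Enum
from typing import List


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"


class InMemoryService:
    """Hypothetical service that records lifecycle calls instead of running anything."""

    def __init__(self) -> None:
        self.admin_state = ServiceState.INACTIVE
        self.calls: List[str] = []

    def provision(self) -> None:
        self.calls.append("provision")

    def deprovision(self, force: bool = False) -> None:
        self.calls.append(f"deprovision(force={force})")

    def start(self) -> None:
        # Record the intent first, then act on it.
        self.admin_state = ServiceState.ACTIVE
        self.provision()

    def stop(self, force: bool = False) -> None:
        # The force flag is passed straight through to deprovisioning.
        self.admin_state = ServiceState.INACTIVE
        self.deprovision(force)
```

Setting the administrative state before provisioning means that, even if provisioning fails, later status polls still know what the service is supposed to be doing.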
update(config: ServiceConfig) -> None

Update the service configuration.

Parameters:

Name Type Description Default
config ServiceConfig

the new service configuration.

required
Source code in src/zenml/services/service.py
def update(self, config: ServiceConfig) -> None:
    """Update the service configuration.

    Args:
        config: the new service configuration.
    """
    self.config = config
update_status() -> None

Update the status of the service.

Check the current operational state of the external service and update the local operational status information to reflect it.

This method should be overridden by subclasses that implement concrete service status tracking functionality.

Source code in src/zenml/services/service.py
def update_status(self) -> None:
    """Update the status of the service.

    Check the current operational state of the external service
    and update the local operational status information to reflect it.

    This method should be overridden by subclasses that implement
    concrete service status tracking functionality.
    """
    logger.debug(
        "Running status check for service '%s' ...",
        self,
    )
    try:
        state, err = self.check_status()
        logger.debug(
            "Status check results for service '%s': %s [%s]",
            self,
            state.name,
            err,
        )
        self.status.update_state(state, err)

        # don't bother checking the endpoint state if the service is not active
        if self.status.state == ServiceState.INACTIVE:
            return

        if self.endpoint:
            self.endpoint.update_status()
    except Exception as e:
        logger.error(
            f"Failed to update status for service '{self}': {e}",
            exc_info=True,
        )
        self.status.update_state(ServiceState.ERROR, str(e))
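update_status never lets an exception from check_status escape: the failure is logged and recorded as an ERROR state. The endpoint is only health-checked when the service is not INACTIVE. The following sketch isolates that control flow; Status and the two callable parameters are hypothetical stand-ins for the service status object, the check_status method and endpoint.update_status.

```python
import logging
from enum import Enum
from typing import Callable, Optional, Tuple

logger = logging.getLogger(__name__)


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


class Status:
    """Hypothetical stand-in for the service status object."""

    def __init__(self) -> None:
        self.state = ServiceState.INACTIVE
        self.last_error = ""

    def update_state(self, state: ServiceState, err: str) -> None:
        self.state, self.last_error = state, err


def update_status(
    status: Status,
    check_status: Callable[[], Tuple[ServiceState, str]],
    update_endpoint: Optional[Callable[[], None]] = None,
) -> None:
    try:
        state, err = check_status()
        status.update_state(state, err)
        # Skip the endpoint health check when the service is not running.
        if status.state == ServiceState.INACTIVE:
            return
        if update_endpoint is not None:
            update_endpoint()
    except Exception as e:
        # Failures are recorded as an ERROR state instead of propagating.
        logger.error("Failed to update status: %s", e)
        status.update_state(ServiceState.ERROR, str(e))
```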

BaseServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseTypedModel

Base service endpoint class.

This class implements generic functionality concerning the life-cycle management and tracking of an external service endpoint (e.g. an HTTP/HTTPS API or generic TCP endpoint exposed by a service).

Attributes:

Name Type Description
admin_state ServiceState

the administrative state of the service endpoint

config ServiceEndpointConfig

service endpoint configuration

status ServiceEndpointStatus

service endpoint status

monitor Optional[BaseServiceEndpointHealthMonitor]

optional service endpoint health monitor

Initialize the service endpoint.

Parameters:

Name Type Description Default
*args Any

positional arguments.

()
**kwargs Any

keyword arguments.

{}
Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the external service endpoint.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service endpoint and a message providing additional information about that state (e.g. a description of the error, if one is encountered while checking the service status).

Source code in src/zenml/services/service_endpoint.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Returns:
        The operational state of the external service endpoint and a
        message providing additional information about that state
        (e.g. a description of the error, if one is encountered while
        checking the service status).
    """
    if not self.monitor:
        # no health monitor configured; assume service operational state
        # always matches the admin state
        return self.admin_state, ""
    return self.monitor.check_endpoint_status(self)
is_active() -> bool

Check if the service endpoint is active.

This means that it is responsive and can receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is active, otherwise False.

Source code in src/zenml/services/service_endpoint.py
def is_active(self) -> bool:
    """Check if the service endpoint is active.

    This means that it is responsive and can receive requests. This method
    will use the configured health monitor to actively check the endpoint
    status and will return the result.

    Returns:
        True if the service endpoint is active, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.ACTIVE
is_inactive() -> bool

Check if the service endpoint is inactive.

This means that it is unresponsive and cannot receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is inactive, otherwise False.

Source code in src/zenml/services/service_endpoint.py
def is_inactive(self) -> bool:
    """Check if the service endpoint is inactive.

    This means that it is unresponsive and cannot receive requests. This
    method will use the configured health monitor to actively check the
    endpoint status and will return the result.

    Returns:
        True if the service endpoint is inactive, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.INACTIVE
update_status() -> None

Check the current operational state of the external service endpoint.

It updates the local operational status information accordingly.

Source code in src/zenml/services/service_endpoint.py
def update_status(self) -> None:
    """Check the current operational state of the external service endpoint.

    It updates the local operational status information accordingly.
    """
    logger.debug(
        "Running health check for service endpoint '%s' ...",
        self.config.name,
    )
    state, err = self.check_status()
    logger.debug(
        "Health check results for service endpoint '%s': %s [%s]",
        self.config.name,
        state.name,
        err,
    )
    self.status.update_state(state, err)
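The endpoint status methods above chain together: is_active calls update_status, which calls check_status, which defers to the health monitor if one is configured and otherwise assumes the endpoint mirrors its administrative state. A minimal sketch of that chain, where Endpoint is a hypothetical stand-in and the monitor is modelled as a plain callable rather than a BaseServiceEndpointHealthMonitor.

```python
from enum import Enum
from typing import Any, Callable, Optional, Tuple


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


# Hypothetical monitor signature: takes the endpoint, returns (state, message).
Monitor = Callable[[Any], Tuple[ServiceState, str]]


class Endpoint:
    """Hypothetical endpoint mirroring BaseServiceEndpoint's status logic."""

    def __init__(self, admin_state: ServiceState, monitor: Optional[Monitor] = None) -> None:
        self.admin_state = admin_state
        self.monitor = monitor
        self.state = ServiceState.INACTIVE
        self.last_error = ""

    def check_status(self) -> Tuple[ServiceState, str]:
        if self.monitor is None:
            # No health monitor: assume operational state == admin state.
            return self.admin_state, ""
        return self.monitor(self)

    def update_status(self) -> None:
        self.state, self.last_error = self.check_status()

    def is_active(self) -> bool:
        # Re-checks the endpoint before answering, like the documented method.
        self.update_status()
        return self.state == ServiceState.ACTIVE
```

One consequence worth noting: without a monitor, an endpoint whose admin state is ACTIVE always reports itself as active, even if nothing is actually listening.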

BaseServiceEndpointHealthMonitor

Bases: BaseTypedModel

Base class used for service endpoint health monitors.

Attributes:

Name Type Description
config ServiceEndpointHealthMonitorConfig

health monitor configuration for endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str] abstractmethod

Check the current operational state of the external service endpoint.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check

required

This method should be overridden by subclasses that implement concrete service endpoint tracking functionality.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service endpoint and an optional error message, if an error is encountered while checking the service endpoint status.

Source code in src/zenml/services/service_monitor.py
@abstractmethod
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Args:
        endpoint: service endpoint to check

    This method should be overridden by subclasses that implement
    concrete service endpoint tracking functionality.

    Returns:
        The operational state of the external service endpoint and an
        optional error message, if an error is encountered while checking
        the service endpoint status.
    """
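To give a sense of what a concrete check_endpoint_status implementation does, here is a minimal TCP reachability probe built on socket.create_connection from the standard library. This is a hypothetical sketch, not the code of TCPEndpointHealthMonitor: the function name, the one-second default timeout and the choice to report failures as ERROR are all assumptions.

```python
import socket
from enum import Enum
from typing import Tuple


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    ERROR = "error"


def check_tcp_endpoint(host: str, port: int, timeout: float = 1.0) -> Tuple[ServiceState, str]:
    """Hypothetical TCP health check: healthy if the endpoint accepts a connection."""
    try:
        # create_connection resolves the address and connects within the timeout;
        # the connection is closed again immediately.
        with socket.create_connection((host, port), timeout=timeout):
            return ServiceState.ACTIVE, ""
    except OSError as e:
        # Refused connections, timeouts and DNS failures are all OSError subclasses.
        return ServiceState.ERROR, f"TCP connection to {host}:{port} failed: {e}"
```

An HTTP monitor would go one step further and check the response of a health-check URL, which is why get_healthcheck_url only returns a value for HTTP monitors.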

ContainerService(**attrs: Any)

Bases: BaseService

A service represented by a containerized process.

This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as docker containers.

To define a containerized service, subclass this class and implement the run method. Upon start, the service will spawn a container that ends up calling the run method.

For example,


from zenml.services import ContainerService, ContainerServiceConfig
from zenml.services.service_type import ServiceType
import time

class SleepingServiceConfig(ContainerServiceConfig):

    wake_up_after: int

class SleepingService(ContainerService):

    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping container",
        type="container",
        flavor="sleeping",
    )
    config: SleepingServiceConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

# ServiceConfig requires either 'name' or 'model_name' to be set
service = SleepingService(config=SleepingServiceConfig(name="sleeper", wake_up_after=10))
service.start()

NOTE: the SleepingService class and its parent module have to be discoverable as part of a ZenML Integration, otherwise the daemon will fail with the following error:

TypeError: Cannot load service with unregistered service type:
name='sleeper' type='container' flavor='sleeping' description='Sleeping container'

Attributes:

Name Type Description
config ContainerServiceConfig

service configuration

status ContainerServiceStatus

service status

endpoint Optional[ContainerServiceEndpoint]

optional service endpoint

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
container: Optional[Container] property

Get the docker container for the service.

Returns:

Type Description
Optional[Container]

The docker container for the service, or None if the container does not exist.

container_id: str property

Get the ID of the docker container for a service.

Returns:

Type Description
str

The ID of the docker container for the service.

docker_client: DockerClient property

Initialize and/or return the docker client.

Returns:

Type Description
DockerClient

The docker client.

Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the docker container.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the docker container and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in src/zenml/services/container/container_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the docker container.

    Returns:
        The operational state of the docker container and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    container: Optional[Container] = None
    try:
        container = self.docker_client.containers.get(self.container_id)
    except docker_errors.NotFound:
        # container doesn't exist yet or was removed
        pass

    if container is None:
        return ServiceState.INACTIVE, "Docker container is not present"
    elif container.status == "running":
        return ServiceState.ACTIVE, "Docker container is running"
    elif container.status == "exited":
        return (
            ServiceState.ERROR,
            "Docker container has exited.",
        )
    else:
        return (
            ServiceState.INACTIVE,
            f"Docker container is {container.status}",
        )
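The mapping from Docker container status to service state can be pulled out into a pure function, which makes it testable without a Docker daemon. This sketch reproduces the branches shown above; container_state is a hypothetical helper, None stands for "no container found" (the NotFound case), and ServiceState is a simplified stand-in enum.

```python
from enum import Enum
from typing import Optional, Tuple


class ServiceState(Enum):
    # Simplified stand-in for ZenML's ServiceState; member values are assumed.
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


def container_state(status: Optional[str]) -> Tuple[ServiceState, str]:
    """Map a Docker container status string (None = no container) to a service state."""
    if status is None:
        return ServiceState.INACTIVE, "Docker container is not present"
    if status == "running":
        return ServiceState.ACTIVE, "Docker container is running"
    if status == "exited":
        # An exited container means the service process stopped on its own.
        return ServiceState.ERROR, "Docker container has exited."
    # Any other status ("created", "paused", "restarting", ...) counts as inactive.
    return ServiceState.INACTIVE, f"Docker container is {status}"
```

Treating "exited" as an error rather than as inactive reflects that a service container is expected to keep running until it is explicitly stopped.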
deprovision(force: bool = False) -> None

Deprovision the service.

Parameters:

Name Type Description Default
force bool

if True, the service container will be forcefully stopped

False
Source code in src/zenml/services/container/container_service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.

    Args:
        force: if True, the service container will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

if set, only retrieve this many lines from the end of the log output.

None

Yields:

Type Description
str

Lines from the service log file, with trailing newlines stripped.

Source code in src/zenml/services/container/container_service.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return

    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
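get_logs is a generator typed Generator[str, bool, None]: it yields log lines and accepts a bool through send(), where True tells a following reader to stop. The sketch below reproduces that tail/follow loop against any open text stream so it can run on an in-memory buffer; read_logs and poll_interval are hypothetical names, not part of ZenML.

```python
import io
import time
from typing import Generator, Optional, TextIO


def read_logs(
    f: TextIO,
    follow: bool = False,
    tail: Optional[int] = None,
    poll_interval: float = 1.0,
) -> Generator[str, bool, None]:
    """Yield log lines from an open stream, mimicking the tail/follow loop."""
    if tail:
        # Like the original, this reads the whole stream to find the last lines.
        for line in f.readlines()[-tail:]:
            yield line.rstrip("\n")
        if not follow:
            return
    line = ""
    while True:
        partial = f.readline()
        if partial:
            line += partial
            if line.endswith("\n"):
                # The caller can send(True) to stop following.
                stop = yield line.rstrip("\n")
                if stop:
                    break
                line = ""
        elif follow:
            time.sleep(poll_interval)  # wait for more output to be written
        else:
            break


# Usage with an in-memory "log file":
log_file = io.StringIO("starting\nready\nserving\n")
last_two = list(read_logs(log_file, tail=2))
```

When following, a plain for loop never ends on its own; the caller either breaks out of the loop or uses send(True) to let the generator finish cleanly.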
get_service_status_message() -> str

Get a message about the current operational state of the service.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/container/container_service.py
def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    msg += f"  Container ID: `{self.container_id}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision() -> None

Provision the service.

Source code in src/zenml/services/container/container_service.py
def provision(self) -> None:
    """Provision the service."""
    self._start_container()
run() -> None abstractmethod

Run the containerized service logic associated with this service.

Subclasses must implement this method to provide the containerized service functionality. This method will be executed in the context of the running container, not in the context of the process that calls the start method.

Source code in src/zenml/services/container/container_service.py
@abstractmethod
def run(self) -> None:
    """Run the containerized service logic associated with this service.

    Subclasses must implement this method to provide the containerized
    service functionality. This method will be executed in the context of
    the running container, not in the context of the process that calls the
    `start` method.
    """

ContainerServiceConfig(**data: Any)

Bases: ServiceConfig

Containerized service configuration.

Attributes:

Name Type Description
root_runtime_path Optional[str]

the root path where the service stores its files.

singleton bool

set to True to store the service files directly in the root_runtime_path directory instead of creating a subdirectory for each service instance. Only has effect if the root_runtime_path is also set.

image str

the container image to use for the service.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")

ContainerServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseServiceEndpoint

A service endpoint exposed by a containerized process.

This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as containerized processes.

Attributes:

Name Type Description
config ContainerServiceEndpointConfig

service endpoint configuration

status ContainerServiceEndpointStatus

service endpoint status

monitor Optional[Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]]

optional service endpoint health monitor

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
prepare_for_start() -> None

Prepare the service endpoint for starting.

This method is called before the service is started.

Source code in src/zenml/services/container/container_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.port = self._lookup_free_port()
    # Container endpoints are always exposed on the local host
    self.status.hostname = DEFAULT_LOCAL_SERVICE_IP_ADDRESS

ContainerServiceEndpointConfig

Bases: ServiceEndpointConfig

Container service endpoint configuration.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the TCP protocol implemented by the service endpoint

port Optional[int]

preferred TCP port value for the service endpoint. If the port is in use when the service is started, setting allocate_port to True will also try to allocate a new port value, otherwise an exception will be raised.

allocate_port bool

set to True to allocate a free TCP port for the service endpoint automatically.
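The interplay between `port` and `allocate_port` described above can be illustrated with a small standalone sketch. It uses only the standard `socket` module; `port_available` and `lookup_free_port` are hypothetical helpers, not ZenML's internal `_lookup_free_port`, whose exact behavior may differ.

```python
import socket
from typing import Optional


def port_available(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently bind to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


def lookup_free_port(
    preferred: Optional[int], allocate_port: bool, host: str = "127.0.0.1"
) -> int:
    """Hypothetical helper mirroring the port/allocate_port semantics.

    Returns the preferred port if it is free; otherwise allocates a new
    port when allocate_port is True, or raises OSError when it is False.
    """
    if preferred and port_available(preferred, host):
        return preferred
    if preferred and not allocate_port:
        raise OSError(f"TCP port {preferred} is not available")
    # Binding to port 0 asks the operating system for any unused port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
```

Because the probe socket is closed before the service binds the port, another process could grab it in between; a check like this narrows the window for conflicts but cannot rule them out.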

ContainerServiceEndpointStatus

Bases: ServiceEndpointStatus

Container service endpoint status.

ContainerServiceStatus

Bases: ServiceStatus

Containerized service status.

Attributes:

Name Type Description
runtime_path Optional[str]

the path where the service files (e.g. the configuration file used to start the service daemon and the logfile) are located

Attributes
config_file: Optional[str] property

Get the path to the service configuration file.

Returns:

Type Description
Optional[str]

The path to the configuration file, or None, if the service has never been started before.

log_file: Optional[str] property

Get the path to the log file where the service output is/has been logged.

Returns:

Type Description
Optional[str]

The path to the log file, or None, if the service has never been started before.

HTTPEndpointHealthMonitor

Bases: BaseServiceEndpointHealthMonitor

HTTP service endpoint health monitor.

Attributes:

Name Type Description
config HTTPEndpointHealthMonitorConfig

health monitor configuration for HTTP endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str]

Run an HTTP endpoint API healthcheck.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check.

required

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external HTTP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the HTTP endpoint status).

Source code in src/zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a HTTP endpoint API healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external HTTP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the HTTP endpoint
        status).
    """
    from zenml.services.service_endpoint import ServiceEndpointProtocol

    if endpoint.status.protocol not in [
        ServiceEndpointProtocol.HTTP,
        ServiceEndpointProtocol.HTTPS,
    ]:
        return (
            ServiceState.ERROR,
            "endpoint protocol is not HTTP nor HTTPS.",
        )

    check_uri = self.get_healthcheck_uri(endpoint)
    if not check_uri:
        return ServiceState.ERROR, "no HTTP healthcheck URI available"

    logger.debug("Running HTTP healthcheck for URI: %s", check_uri)

    try:
        if self.config.use_head_request:
            r = requests.head(
                check_uri,
                timeout=self.config.http_timeout,
            )
        else:
            r = requests.get(
                check_uri,
                timeout=self.config.http_timeout,
            )
        if r.status_code == self.config.http_status_code:
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
    except requests.ConnectionError as e:
        error = f"HTTP endpoint healthcheck connection error: {str(e)}"
    except requests.Timeout as e:
        error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
    except requests.RequestException as e:
        error = (
            f"unexpected error encountered while running HTTP endpoint "
            f"healthcheck: {str(e)}"
        )

    return ServiceState.ERROR, error
get_healthcheck_uri(endpoint: BaseServiceEndpoint) -> Optional[str]

Get the healthcheck URI for the given service endpoint.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to get the healthcheck URI for

required

Returns:

Type Description
Optional[str]

The healthcheck URI for the given service endpoint, or None if the service endpoint doesn't have a healthcheck URI.

Source code in src/zenml/services/service_monitor.py
def get_healthcheck_uri(
    self, endpoint: "BaseServiceEndpoint"
) -> Optional[str]:
    """Get the healthcheck URI for the given service endpoint.

    Args:
        endpoint: service endpoint to get the healthcheck URI for

    Returns:
        The healthcheck URI for the given service endpoint or None, if
        the service endpoint doesn't have a healthcheck URI.
    """
    uri = endpoint.status.uri
    if not uri:
        return None
    if not self.config.healthcheck_uri_path:
        return uri
    return (
        f"{uri.rstrip('/')}/{self.config.healthcheck_uri_path.lstrip('/')}"
    )

HTTPEndpointHealthMonitorConfig

Bases: ServiceEndpointHealthMonitorConfig

HTTP service endpoint health monitor configuration.

Attributes:

Name Type Description
healthcheck_uri_path str

URI subpath to use to perform service endpoint healthchecks. If not set, the service endpoint URI will be used instead.

use_head_request bool

set to True to use a HEAD request instead of a GET when calling the healthcheck URI.

http_status_code int

HTTP status code to expect in the health check response.

http_timeout int

HTTP health check request timeout in seconds.
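To show how the four options above interact, here is a hedged standalone sketch of an HTTP healthcheck. It swaps the `requests` library used by ZenML for the standard-library `urllib` so that it has no third-party dependencies; the function name `http_healthcheck` is hypothetical, and the returned `"active"`/`"error"` strings only stand in for `ServiceState` members.

```python
import urllib.error
import urllib.request
from typing import Tuple


def http_healthcheck(
    uri: str,
    healthcheck_uri_path: str = "",
    use_head_request: bool = False,
    http_status_code: int = 200,
    http_timeout: int = 5,
) -> Tuple[str, str]:
    """Hypothetical healthcheck mirroring HTTPEndpointHealthMonitorConfig."""
    # Append the optional subpath, avoiding duplicate or missing slashes.
    check_uri = uri
    if healthcheck_uri_path:
        check_uri = f"{uri.rstrip('/')}/{healthcheck_uri_path.lstrip('/')}"
    method = "HEAD" if use_head_request else "GET"
    request = urllib.request.Request(check_uri, method=method)
    try:
        with urllib.request.urlopen(request, timeout=http_timeout) as response:
            status = response.status
    except urllib.error.HTTPError as e:
        # urllib raises on 4xx/5xx responses; the status code is still useful.
        status = e.code
    except OSError as e:
        # URLError, refused connections and timeouts are all OSError subclasses.
        return "error", f"HTTP endpoint healthcheck error: {e}"
    if status == http_status_code:
        return "active", ""
    return "error", f"unexpected status code: {status}"
```

A HEAD request transfers only headers, which can make frequent healthchecks cheaper when the healthcheck route returns a large body.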

LocalDaemonService(**attrs: Any)

Bases: BaseService

A service represented by a local daemon process.

This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as local daemon processes.

To define a local daemon service, subclass this class and implement the run method. Upon start, the service will spawn a daemon process that ends up calling the run method.

For example,


from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
import time

class SleepingDaemonConfig(LocalDaemonServiceConfig):

    wake_up_after: int

class SleepingDaemon(LocalDaemonService):

    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping daemon",
        type="daemon",
        flavor="sleeping",
    )
    config: SleepingDaemonConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
daemon.start()

NOTE: the SleepingDaemon class and its parent module have to be discoverable as part of a ZenML Integration, otherwise the daemon will fail with the following error:

TypeError: Cannot load service with unregistered service type:
name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'

Attributes:

Name Type Description
config LocalDaemonServiceConfig

service configuration

status LocalDaemonServiceStatus

service status

endpoint Optional[LocalDaemonServiceEndpoint]

optional service endpoint

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the daemon process.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the daemon process and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in src/zenml/services/local/local_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the daemon process.

    Returns:
        The operational state of the daemon process and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    if not self.status.pid:
        return ServiceState.INACTIVE, "service daemon is not running"

    # the daemon is running
    return ServiceState.ACTIVE, ""
deprovision(force: bool = False) -> None

Deprovision the service.

Parameters:

Name Type Description Default
force bool

if True, the service daemon will be forcefully stopped

False
Source code in src/zenml/services/local/local_service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.

    Args:
        force: if True, the service daemon will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Yields:

Type Description
str

A generator that can be accessed to get the service logs.

Source code in src/zenml/services/local/local_service.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return

    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
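The `Generator[str, bool, None]` return type means a caller can send `True` back into `get_logs` to stop iteration early (the `stop = yield ...` line). The sketch below reproduces that protocol on a plain file, without the `follow` mode; `read_log` and `first_matching` are hypothetical names used only for illustration.

```python
import os
from typing import Generator, Optional


def read_log(path: str, tail: Optional[int] = None) -> Generator[str, bool, None]:
    """Yield log lines; stop early if the caller sends True."""
    if not os.path.exists(path):
        return
    with open(path, "r") as f:
        lines = f.readlines()
    if tail:
        lines = lines[-tail:]
    for line in lines:
        stop = yield line.rstrip("\n")
        if stop:
            break


def first_matching(logs: Generator[str, bool, None], needle: str) -> Optional[str]:
    """Return the first line containing needle, then tell the generator to stop."""
    try:
        line = next(logs)
        while needle not in line:
            line = logs.send(False)  # equivalent to next(logs)
    except StopIteration:
        return None
    try:
        logs.send(True)  # the generator breaks out of its loop and finishes
    except StopIteration:
        pass
    return line
```

Sending `True` matters most in `follow` mode, where the generator would otherwise keep polling the file forever.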
get_service_status_message() -> str

Get a message about the current operational state of the service.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/local/local_service.py
def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    pid = self.status.pid
    if pid:
        msg += f"  Daemon PID: `{self.status.pid}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision() -> None

Provision the service.

Source code in src/zenml/services/local/local_service.py
def provision(self) -> None:
    """Provision the service."""
    self._start_daemon()
run() -> None abstractmethod

Run the service daemon process associated with this service.

Subclasses must implement this method to provide the service daemon functionality. This method will be executed in the context of the running daemon, not in the context of the process that calls the start method.

Source code in src/zenml/services/local/local_service.py
@abstractmethod
def run(self) -> None:
    """Run the service daemon process associated with this service.

    Subclasses must implement this method to provide the service daemon
    functionality. This method will be executed in the context of the
    running daemon, not in the context of the process that calls the
    `start` method.
    """
start(timeout: int = 0) -> None

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status.

0
Source code in src/zenml/services/local/local_service.py
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    if not self.config.blocking:
        super().start(timeout)
    else:
        self.run()

LocalDaemonServiceConfig(**data: Any)

Bases: ServiceConfig

Local daemon service configuration.

Attributes:

Name Type Description
silent_daemon bool

set to True to suppress the output of the daemon (i.e. redirect stdout and stderr to /dev/null). If False, the daemon output will be redirected to a logfile.

root_runtime_path Optional[str]

the root path where the service daemon will store service configuration files

singleton bool

set to True to store the service daemon configuration files directly in the root_runtime_path directory instead of creating a subdirectory for each service instance. Only has effect if the root_runtime_path is also set.

blocking bool

set to True to run the service in the context of the current process and block until the service is stopped, instead of running the service as a daemon process. Useful for operating systems that do not support daemon processes.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")

LocalDaemonServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseServiceEndpoint

A service endpoint exposed by a local daemon process.

This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as local daemon processes.

Attributes:

Name Type Description
config LocalDaemonServiceEndpointConfig

service endpoint configuration

status LocalDaemonServiceEndpointStatus

service endpoint status

monitor Optional[Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]]

optional service endpoint health monitor

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
prepare_for_start() -> None

Prepare the service endpoint for starting.

This method is called before the service is started.

Source code in src/zenml/services/local/local_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.hostname = self.config.ip_address
    self.status.port = self._lookup_free_port()

LocalDaemonServiceEndpointConfig

Bases: ServiceEndpointConfig

Local daemon service endpoint configuration.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the TCP protocol implemented by the service endpoint

port Optional[int]

preferred TCP port value for the service endpoint. If the port is in use when the service is started, setting allocate_port to True will also try to allocate a new port value, otherwise an exception will be raised.

ip_address str

the IP address of the service endpoint. If not set, the default localhost IP address will be used.

allocate_port bool

set to True to allocate a free TCP port for the service endpoint automatically.

LocalDaemonServiceEndpointStatus

Bases: ServiceEndpointStatus

Local daemon service endpoint status.

LocalDaemonServiceStatus

Bases: ServiceStatus

Local daemon service status.

Attributes:

Name Type Description
runtime_path Optional[str]

the path where the service daemon runtime files (the configuration file used to start the service daemon and the logfile) are located

silent_daemon bool

flag indicating whether the output of the daemon is suppressed (redirected to /dev/null).

Attributes
config_file: Optional[str] property

Get the path to the configuration file used to start the service daemon.

Returns:

Type Description
Optional[str]

The path to the configuration file, or None, if the service has never been started before.

log_file: Optional[str] property

Get the path to the log file where the service output is/has been logged.

Returns:

Type Description
Optional[str]

The path to the log file, or None, if the service has never been started before, or if the service daemon output is suppressed.

pid: Optional[int] property

Return the PID of the currently running daemon.

Returns:

Type Description
Optional[int]

The PID of the daemon, or None, if the service has never been started before.

pid_file: Optional[str] property

Get the path to a daemon PID file.

This is where the last known PID of the daemon process is stored.

Returns:

Type Description
Optional[str]

The path to the PID file, or None, if the service has never been started before.
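The reference only states that the PID file holds the last known PID of the daemon. As a hedged illustration, the sketch below assumes the file contains the PID as plain decimal text and adds a POSIX-only liveness probe via `os.kill(pid, 0)`; both helpers are hypothetical, and ZenML's own implementation may read or verify the PID differently.

```python
import os
from typing import Optional


def read_pid(pid_file: Optional[str]) -> Optional[int]:
    """Return the PID stored in pid_file, or None if it is unavailable."""
    if not pid_file or not os.path.isfile(pid_file):
        return None
    with open(pid_file) as f:
        content = f.read().strip()
    # Assumption: the file holds a single decimal PID.
    return int(content) if content.isdigit() else None


def pid_is_alive(pid: int) -> bool:
    """POSIX-only probe: signal 0 checks existence without delivering a signal."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True
```

A stale PID file can point at a PID that the operating system has since reused, so a live PID is a hint, not proof, that the daemon is still running.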

ServiceConfig(**data: Any)

Bases: BaseTypedModel

Generic service configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the service configuration.

Attributes:

Name Type Description
name str

name for the service instance

description str

description of the service

pipeline_name str

name of the pipeline that spun up the service

pipeline_step_name str

name of the pipeline step that spun up the service

run_name str

name of the pipeline run that spun up the service.

Initialize the service configuration.

Parameters:

Name Type Description Default
**data Any

keyword arguments.

{}

Raises:

Type Description
ValueError

if neither 'name' nor 'model_name' is set.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
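The `__init__` above rejects configurations that set neither `name` nor `model_name`, and derives `service_name` from a prefix unless one is passed explicitly. A minimal dataclass sketch of that rule follows; it is not the pydantic-based `ServiceConfig` itself, and the prefix value `"zenml-"` is an assumption standing in for `ZENM_ENDPOINT_PREFIX`.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed prefix value, standing in for ZENM_ENDPOINT_PREFIX.
ENDPOINT_PREFIX = "zenml-"


@dataclass
class ServiceConfigSketch:
    """Hypothetical stand-in for the name/model_name check in ServiceConfig."""

    name: str = ""
    model_name: str = ""
    service_name: Optional[str] = None

    def __post_init__(self) -> None:
        if not (self.name or self.model_name):
            raise ValueError("Either 'name' or 'model_name' must be set.")
        # An explicitly passed service_name wins; otherwise derive one.
        if self.service_name is None:
            self.service_name = f"{ENDPOINT_PREFIX}{self.name or self.model_name}"
```

For example, `ServiceConfigSketch(name="sleeper").service_name` is `"zenml-sleeper"`, while `ServiceConfigSketch()` raises `ValueError`.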
Functions
get_service_labels() -> Dict[str, str]

Get the service labels.

Returns:

Type Description
Dict[str, str]

a dictionary of service labels.

Source code in src/zenml/services/service.py
def get_service_labels(self) -> Dict[str, str]:
    """Get the service labels.

    Returns:
        a dictionary of service labels.
    """
    labels = {}
    for k, v in self.model_dump().items():
        label = f"zenml_{k}".upper()
        labels[label] = str(v)
    return labels
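The label transformation above prefixes each configuration key with `zenml_`, upper-cases the result, and stringifies the value. The sketch below applies the same rule to a plain dict in place of `self.model_dump()`; the function name `service_labels` is hypothetical.

```python
from typing import Any, Dict


def service_labels(config: Dict[str, Any]) -> Dict[str, str]:
    """Prefix keys with zenml_, upper-case them, and stringify the values."""
    return {f"zenml_{key}".upper(): str(value) for key, value in config.items()}
```

Only the keys are upper-cased; values keep their case, and `None` becomes the string `"None"` because every value passes through `str()`.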

ServiceEndpointConfig

Bases: BaseTypedModel

Generic service endpoint configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the endpoint configuration.

Attributes:

Name Type Description
name str

unique name for the service endpoint

description str

description of the service endpoint

ServiceEndpointHealthMonitorConfig

Bases: BaseTypedModel

Generic service health monitor configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the health monitor configuration.

ServiceEndpointProtocol

Bases: StrEnum

Possible endpoint protocol values.

ServiceEndpointStatus

Bases: ServiceStatus

Status information describing the operational state of a service endpoint.

For example, this could be an HTTP/HTTPS API or a generic TCP endpoint exposed by a service. Concrete service classes should extend this class and add additional attributes that make up the operational state of the service endpoint.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the TCP protocol used by the service endpoint

hostname Optional[str]

the hostname where the service endpoint is accessible

port Optional[int]

the current TCP port where the service endpoint is accessible

Attributes
uri: Optional[str] property

Get the URI of the service endpoint.

Returns:

Type Description
Optional[str]

The URI of the service endpoint, or None if the service endpoint operational status doesn't have the required information.
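As a rough illustration of the `uri` property, the sketch below assumes a `protocol://hostname:port` format and returns None whenever one of the three status fields is missing. The function `endpoint_uri` is hypothetical, and the real property may handle additional cases (for example, rewriting wildcard hostnames).

```python
from typing import Optional


def endpoint_uri(
    protocol: Optional[str], hostname: Optional[str], port: Optional[int]
) -> Optional[str]:
    """Hypothetical version of ServiceEndpointStatus.uri."""
    # Without all three pieces there is no usable URI.
    if not protocol or not hostname or not port:
        return None
    return f"{protocol}://{hostname}:{port}"
```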

ServiceState

Bases: StrEnum

Possible states for the service and service endpoint.

ServiceStatus

Bases: BaseTypedModel

Information about the status of a service or process.

This information describes the operational status of an external process or service tracked by ZenML. This could be a process, container, Kubernetes deployment etc.

Concrete service classes should extend this class and add additional attributes that make up the operational state of the service.

Attributes:

Name Type Description
state ServiceState

the current operational state

last_state ServiceState

the operational state prior to the last status update

last_error str

the error encountered during the last status update

Functions
clear_error() -> None

Clear the last error message.

Source code in src/zenml/services/service_status.py
def clear_error(self) -> None:
    """Clear the last error message."""
    self.last_error = ""
update_state(new_state: Optional[ServiceState] = None, error: str = '') -> None

Update the current operational state to reflect a new state value and/or error.

Parameters:

Name Type Description Default
new_state Optional[ServiceState]

new operational state discovered by the last service status update

None
error str

error message describing an operational failure encountered during the last service status update

''
Source code in src/zenml/services/service_status.py
def update_state(
    self,
    new_state: Optional[ServiceState] = None,
    error: str = "",
) -> None:
    """Update the current operational state to reflect a new state value and/or error.

    Args:
        new_state: new operational state discovered by the last service
            status update
        error: error message describing an operational failure encountered
            during the last service status update
    """
    if new_state and self.state != new_state:
        self.last_state = self.state
        self.state = new_state
    if error:
        self.last_error = error
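Two details of `update_state` are easy to miss: `last_state` only moves when the state actually changes, and an error message persists until `clear_error` is called. The sketch below reproduces both rules with a dataclass; `State` is a hypothetical subset of `ServiceState`, and the initial `INACTIVE` values are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(str, Enum):
    """Hypothetical subset of ServiceState values."""

    INACTIVE = "inactive"
    ACTIVE = "active"
    ERROR = "error"


@dataclass
class StatusSketch:
    state: State = State.INACTIVE
    last_state: State = State.INACTIVE
    last_error: str = ""

    def update_state(self, new_state: Optional[State] = None, error: str = "") -> None:
        # last_state only moves when the state actually changes.
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        # An error is recorded but never cleared implicitly.
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        self.last_error = ""
```

Because a recovery to `ACTIVE` leaves `last_error` in place, callers that want a clean status after recovery need to call `clear_error` explicitly.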

ServiceType

Bases: BaseModel

Service type descriptor.

Attributes:

Name Type Description
type str

service type

flavor str

service flavor

name str

name of the service type

description str

description of the service type

logo_url str

logo of the service type

TCPEndpointHealthMonitor

Bases: BaseServiceEndpointHealthMonitor

TCP service endpoint health monitor.

Attributes:

Name Type Description
config TCPEndpointHealthMonitorConfig

health monitor configuration for TCP endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str]

Run a TCP endpoint healthcheck.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check.

required

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external TCP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the TCP endpoint status).

Source code in src/zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a TCP endpoint healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external TCP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the TCP endpoint
        status).
    """
    if not endpoint.status.port or not endpoint.status.hostname:
        return (
            ServiceState.ERROR,
            "TCP port and hostname values are not known",
        )

    logger.debug(
        "Running TCP healthcheck for TCP port: %d", endpoint.status.port
    )

    if port_is_open(endpoint.status.hostname, endpoint.status.port):
        # the endpoint is healthy
        return ServiceState.ACTIVE, ""

    return (
        ServiceState.ERROR,
        "TCP endpoint healthcheck error: TCP port is not "
        "open or not accessible",
    )
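The TCP check above reduces to "can a connection to hostname:port be opened?". A standalone sketch follows, with `port_is_open_sketch` as a hypothetical substitute for ZenML's `port_is_open` (whose implementation is not shown here) built on `socket.create_connection`; the `"active"`/`"error"` strings stand in for `ServiceState` members.

```python
import socket
from typing import Optional, Tuple


def port_is_open_sketch(hostname: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to hostname:port can be established."""
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


def tcp_healthcheck(hostname: Optional[str], port: Optional[int]) -> Tuple[str, str]:
    """Hypothetical standalone version of the TCP check_endpoint_status."""
    if not port or not hostname:
        return "error", "TCP port and hostname values are not known"
    if port_is_open_sketch(hostname, port):
        return "active", ""
    return "error", "TCP endpoint healthcheck error: TCP port is not open or not accessible"
```

A successful connect only proves that something is listening on the port; unlike the HTTP monitor, it says nothing about whether the application behind it is healthy.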

TCPEndpointHealthMonitorConfig

Bases: ServiceEndpointHealthMonitorConfig

TCP service endpoint health monitor configuration.

Modules

container

Initialization of a containerized ZenML service.

Modules
container_service

Implementation of a containerized ZenML service.

Classes
ContainerService(**attrs: Any)

Bases: BaseService

A service represented by a containerized process.

This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as docker containers.

To define a containerized service, subclass this class and implement the run method. Upon start, the service will spawn a container that ends up calling the run method.

For example,


from zenml.services import ServiceType, ContainerService, ContainerServiceConfig
import time

class SleepingServiceConfig(ContainerServiceConfig):

    wake_up_after: int

class SleepingService(ContainerService):

    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping container",
        type="container",
        flavor="sleeping",
    )
    config: SleepingServiceConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

service = SleepingService(config=SleepingServiceConfig(wake_up_after=10))
service.start()

NOTE: the SleepingService class and its parent module have to be discoverable as part of a ZenML Integration, otherwise the service will fail with the following error:

TypeError: Cannot load service with unregistered service type:
name='sleeper' type='container' flavor='sleeping' description='Sleeping container'

Attributes:

Name Type Description
config ContainerServiceConfig

service configuration

status ContainerServiceStatus

service status

endpoint Optional[ContainerServiceEndpoint]

optional service endpoint

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
container: Optional[Container] property

Get the docker container for the service.

Returns:

Type Description
Optional[Container]

The docker container for the service, or None if the container does not exist.

container_id: str property

Get the ID of the docker container for a service.

Returns:

Type Description
str

The ID of the docker container for the service.

docker_client: DockerClient property

Initialize and/or return the docker client.

Returns:

Type Description
DockerClient

The docker client.

Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the docker container.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the docker container and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in src/zenml/services/container/container_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the docker container.

    Returns:
        The operational state of the docker container and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    container: Optional[Container] = None
    try:
        container = self.docker_client.containers.get(self.container_id)
    except docker_errors.NotFound:
        # container doesn't exist yet or was removed
        pass

    if container is None:
        return ServiceState.INACTIVE, "Docker container is not present"
    elif container.status == "running":
        return ServiceState.ACTIVE, "Docker container is running"
    elif container.status == "exited":
        return (
            ServiceState.ERROR,
            "Docker container has exited.",
        )
    else:
        return (
            ServiceState.INACTIVE,
            f"Docker container is {container.status}",
        )
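The branching in check_status depends only on the container's status string, so it can be pulled out into a pure function and unit-tested without a Docker daemon. The sketch below is illustrative only: `container_state` is a hypothetical helper name, and `ServiceState` is a local stand-in enum with just the three members used above, not an import from ZenML. It reproduces the mapping shown in the source, including the fact that statuses such as "paused" or "dead" fall through to INACTIVE.

```python
from enum import Enum
from typing import Optional, Tuple


class ServiceState(Enum):
    """Local stand-in for zenml's ServiceState (only the members used here)."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


def container_state(status: Optional[str]) -> Tuple[ServiceState, str]:
    """Map a Docker container status (None if the container is missing)."""
    if status is None:
        return ServiceState.INACTIVE, "Docker container is not present"
    if status == "running":
        return ServiceState.ACTIVE, "Docker container is running"
    if status == "exited":
        return ServiceState.ERROR, "Docker container has exited."
    # Every other status ("created", "paused", "restarting", ...) is inactive.
    return ServiceState.INACTIVE, f"Docker container is {status}"
```

Keeping the Docker lookup (which may raise NotFound) separate from this mapping lets the error-handling and the state logic be tested independently.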
deprovision(force: bool = False) -> None

Deprovision the service.

Parameters:

Name Type Description Default
force bool

if True, the service container will be forcefully stopped

False
Source code in src/zenml/services/container/container_service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.

    Args:
        force: if True, the service container will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Yields:

Type Description
str

A generator that can be accessed to get the service logs.

Source code in src/zenml/services/container/container_service.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return

    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
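get_logs is declared as Generator[str, bool, None]: besides yielding lines, it accepts a boolean through send(), and in its streaming loop a truthy value makes it stop. The sketch below shows that consumer-side protocol against a simplified, hypothetical `read_log` generator (non-following, stdlib only) rather than a live ZenML service. Note that in the source above, lines produced by the tail branch are yielded without reading the sent value, so only the streaming loop honours the stop signal; the simplified generator here checks it on every line.

```python
from typing import Generator, List, Optional


def read_log(path: str, tail: Optional[int] = None) -> Generator[str, bool, None]:
    """Hypothetical simplified get_logs: yield lines, stop when sent True."""
    with open(path, "r") as f:
        lines = f.readlines()
    if tail:
        lines = lines[-tail:]
    for line in lines:
        stop = yield line.rstrip("\n")
        if stop:
            return


def collect_until(gen: Generator[str, bool, None], marker: str) -> List[str]:
    """Collect lines up to and including the first one containing marker."""
    collected: List[str] = []
    try:
        line = next(gen)  # prime the generator; next() is equivalent to send(None)
        while True:
            collected.append(line)
            # Sending True asks the generator to stop, which raises StopIteration.
            line = gen.send(marker in line)
    except StopIteration:
        pass
    return collected
```

With a log containing the lines a, b, READY and c, collect_until(read_log(path), "READY") returns ['a', 'b', 'READY'] and never pulls c from the generator.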
get_service_status_message() -> str

Get a message about the current operational state of the service.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/container/container_service.py
def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    msg += f"  Container ID: `{self.container_id}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision() -> None

Provision the service.

Source code in src/zenml/services/container/container_service.py
def provision(self) -> None:
    """Provision the service."""
    self._start_container()
run() -> None abstractmethod

Run the containerized service logic associated with this service.

Subclasses must implement this method to provide the containerized service functionality. This method will be executed in the context of the running container, not in the context of the process that calls the start method.

Source code in src/zenml/services/container/container_service.py
@abstractmethod
def run(self) -> None:
    """Run the containerized service logic associated with this service.

    Subclasses must implement this method to provide the containerized
    service functionality. This method will be executed in the context of
    the running container, not in the context of the process that calls the
    `start` method.
    """
ContainerServiceConfig(**data: Any)

Bases: ServiceConfig

Containerized service configuration.

Attributes:

Name Type Description
root_runtime_path Optional[str]

the root path where the service stores its files.

singleton bool

set to True to store the service files directly in the root_runtime_path directory instead of creating a subdirectory for each service instance. Only has effect if the root_runtime_path is also set.

image str

the container image to use for the service.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
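The constructor above enforces that either name or model_name is set and derives service_name from a prefix unless one is passed explicitly. A minimal stdlib sketch of those rules follows. `ServiceConfigSketch` is a hypothetical stand-in for the pydantic model, and the "zenml-" prefix is an assumption standing in for the real ZENM_ENDPOINT_PREFIX constant, whose value is not shown here.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed value; the real prefix is zenml's ZENM_ENDPOINT_PREFIX constant.
ENDPOINT_PREFIX = "zenml-"


@dataclass
class ServiceConfigSketch:
    """Hypothetical stand-in mirroring the validation in ServiceConfig.__init__."""

    name: str = ""
    model_name: str = ""
    service_name: Optional[str] = None  # None means "not passed explicitly"

    def __post_init__(self) -> None:
        if not (self.name or self.model_name):
            raise ValueError("Either 'name' or 'model_name' must be set.")
        if self.service_name is None:
            # `name` takes precedence over `model_name`, as in the source.
            self.service_name = f"{ENDPOINT_PREFIX}{self.name or self.model_name}"
```

Using None as the "not passed" sentinel mirrors the data.get("service_name", ...) lookup, where an explicitly supplied value always wins over the derived one.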
ContainerServiceStatus

Bases: ServiceStatus

Containerized service status.

Attributes:

Name Type Description
runtime_path Optional[str]

the path where the service files (e.g. the configuration file used to start the service and the logfile) are located

Attributes
config_file: Optional[str] property

Get the path to the service configuration file.

Returns:

Type Description
Optional[str]

The path to the configuration file, or None, if the service has never been started before.

log_file: Optional[str] property

Get the path to the log file where the service output is/has been logged.

Returns:

Type Description
Optional[str]

The path to the log file, or None, if the service has never been started before.

Modules
container_service_endpoint

Implementation of a containerized service endpoint.

Classes
ContainerServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseServiceEndpoint

A service endpoint exposed by a containerized process.

This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as containerized processes.

Attributes:

Name Type Description
config ContainerServiceEndpointConfig

service endpoint configuration

status ContainerServiceEndpointStatus

service endpoint status

monitor Optional[Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]]

optional service endpoint health monitor

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
prepare_for_start() -> None

Prepare the service endpoint for starting.

This method is called before the service is started.

Source code in src/zenml/services/container/container_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.port = self._lookup_free_port()
    # Container endpoints are always exposed on the local host
    self.status.hostname = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
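prepare_for_start relies on a private _lookup_free_port helper whose body is not shown here. A common way to find a free TCP port, sketched below with the standard socket module, is to bind to port 0 and let the operating system choose; this is not necessarily how ZenML implements the helper, and `lookup_free_port` is a hypothetical name. The port is released when the socket closes, so another process could claim it before the service binds it.

```python
import socket


def lookup_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port on host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0: the kernel picks a free ephemeral port
        return sock.getsockname()[1]
```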
ContainerServiceEndpointConfig

Bases: ServiceEndpointConfig

Containerized service endpoint configuration.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the TCP protocol implemented by the service endpoint

port Optional[int]

preferred TCP port value for the service endpoint. If the port is in use when the service is started, setting allocate_port to True will also try to allocate a new port value, otherwise an exception will be raised.

allocate_port bool

set to True to allocate a free TCP port for the service endpoint automatically.

ContainerServiceEndpointStatus

Bases: ServiceEndpointStatus

Containerized service endpoint status.

entrypoint

Implementation of a containerized service entrypoint.

This executable file is utilized as an entrypoint for all ZenML services that are implemented as locally running docker containers.

Functions
launch_service(service_config_file: str) -> None

Instantiate and launch a ZenML local service from its configuration file.

Parameters:

Name Type Description Default
service_config_file str

the path to the service configuration file.

required

Raises:

Type Description
TypeError

if the service configuration file is the wrong type.

Source code in src/zenml/services/container/entrypoint.py
def launch_service(service_config_file: str) -> None:
    """Instantiate and launch a ZenML local service from its configuration file.

    Args:
        service_config_file: the path to the service configuration file.

    Raises:
        TypeError: if the service configuration file is the wrong type.
    """
    # doing zenml imports here to avoid polluting the stdout/stderr
    # with messages before daemonization is complete
    from zenml.integrations.registry import integration_registry
    from zenml.logger import get_logger
    from zenml.services import ContainerService

    logger = get_logger(__name__)

    logger.info("Loading service configuration from %s", service_config_file)
    with open(service_config_file, "r") as f:
        config = f.read()

    integration_registry.activate_integrations()

    logger.debug(
        "Running containerized service with configuration:\n %s", config
    )
    service = cast("ContainerService", ContainerService.from_json(config))
    if not isinstance(service, ContainerService):
        raise TypeError(
            f"Expected service type ContainerService but got "
            f"{type(service)} instead"
        )
    service.run()
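launch_service activates the integration registry before calling from_json because deserialization has to map the stored service type back to a registered Python class, which is also why the class-level example warns about unregistered service types. The sketch below illustrates the idea with a hypothetical dict-based registry keyed on (type, flavor) and plain JSON; ZenML's actual registry and serialization format differ, and every name here (`register`, `ServiceSketch`, `load_expected`) is illustrative.

```python
import json
from typing import Any, Dict, Tuple, Type

# Hypothetical registry; ZenML populates its own when integrations activate.
_REGISTRY: Dict[Tuple[str, str], Type["ServiceSketch"]] = {}


def register(cls: Type["ServiceSketch"]) -> Type["ServiceSketch"]:
    """Class decorator that makes a service class loadable from JSON."""
    _REGISTRY[(cls.SERVICE_TYPE["type"], cls.SERVICE_TYPE["flavor"])] = cls
    return cls


class ServiceSketch:
    SERVICE_TYPE: Dict[str, str] = {}

    def __init__(self, **config: Any) -> None:
        self.config = config

    def to_json(self) -> str:
        return json.dumps({"type": self.SERVICE_TYPE, "config": self.config})

    @classmethod
    def from_json(cls, data: str) -> "ServiceSketch":
        payload = json.loads(data)
        key = (payload["type"]["type"], payload["type"]["flavor"])
        if key not in _REGISTRY:
            raise TypeError(
                f"Cannot load service with unregistered service type: {payload['type']}"
            )
        return _REGISTRY[key](**payload["config"])


def load_expected(data: str, expected: Type[ServiceSketch]) -> ServiceSketch:
    """Deserialize, then check the concrete type as launch_service does."""
    service = ServiceSketch.from_json(data)
    if not isinstance(service, expected):
        raise TypeError(
            f"Expected service type {expected.__name__} but got {type(service)} instead"
        )
    return service
```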
run(config_file: str) -> None

Runs a ZenML service as a docker container.

Parameters:

Name Type Description Default
config_file str

path to the configuration file for the service.

required
Source code in src/zenml/services/container/entrypoint.py
@click.command()
@click.option("--config-file", required=True, type=click.Path(exists=True))
def run(
    config_file: str,
) -> None:
    """Runs a ZenML service as a docker container.

    Args:
        config_file: path to the configuration file for the service.
    """
    log_file = os.path.join(SERVICE_CONTAINER_PATH, SERVICE_LOG_FILE_NAME)

    devnull = "/dev/null"
    if hasattr(os, "devnull"):
        devnull = os.devnull

    devnull_fd = os.open(devnull, os.O_RDWR)
    log_fd = os.open(log_file, os.O_CREAT | os.O_RDWR | os.O_APPEND)
    out_fd = log_fd or devnull_fd

    os.dup2(devnull_fd, sys.stdin.fileno())
    os.dup2(out_fd, sys.stdout.fileno())
    os.dup2(out_fd, sys.stderr.fileno())

    launch_service(config_file)
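The run command points stdin at the null device and stdout/stderr at an append-mode log file with os.dup2, so everything the service writes to file descriptors 1 and 2, including output from C extensions that bypass sys.stdout, lands in the log. Because redirecting the current interpreter's descriptors would swallow its own output, the sketch below performs the same steps in a child Python process and then reads the log back. It is a POSIX-oriented, stdlib-only illustration; `run_redirected` and the `CHILD` script are hypothetical names, not part of ZenML.

```python
import subprocess
import sys
import textwrap

# Child program: the same descriptor juggling as the entrypoint's run().
CHILD = textwrap.dedent(
    """
    import os, sys

    log_fd = os.open(sys.argv[1], os.O_CREAT | os.O_RDWR | os.O_APPEND)
    devnull_fd = os.open(os.devnull, os.O_RDWR)
    os.dup2(devnull_fd, sys.stdin.fileno())
    os.dup2(log_fd, sys.stdout.fileno())
    os.dup2(log_fd, sys.stderr.fileno())
    print("to stdout", flush=True)
    print("to stderr", file=sys.stderr, flush=True)
    """
)


def run_redirected(log_file: str) -> str:
    """Run CHILD with its std streams redirected to log_file; return the log."""
    subprocess.run(
        [sys.executable, "-c", CHILD, log_file],
        stdin=subprocess.DEVNULL,  # make sure the child starts with a valid stdin
        check=True,
    )
    with open(log_file, "r") as f:
        return f.read()
```

Because the log is opened with O_APPEND, calling run_redirected twice on the same path accumulates output rather than truncating it.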

local

Initialization of a local ZenML service.

Modules
local_daemon_entrypoint

Implementation of a local daemon entrypoint.

This executable file is utilized as an entrypoint for all ZenML services that are implemented as locally running daemon processes.

Functions
run(config_file: str, log_file: str, pid_file: str) -> None

Runs a ZenML service as a daemon process.

Parameters:

Name Type Description Default
config_file str

path to the configuration file for the service.

required
log_file str

path to the log file for the service.

required
pid_file str

path to the PID file for the service.

required
Source code in src/zenml/services/local/local_daemon_entrypoint.py
@click.command()
@click.option("--config-file", required=True, type=click.Path(exists=True))
@click.option("--log-file", required=False, type=click.Path())
@click.option("--pid-file", required=False, type=click.Path())
def run(
    config_file: str,
    log_file: str,
    pid_file: str,
) -> None:
    """Runs a ZenML service as a daemon process.

    Args:
        config_file: path to the configuration file for the service.
        log_file: path to the log file for the service.
        pid_file: path to the PID file for the service.
    """

    @daemonize(
        log_file=log_file, pid_file=pid_file, working_directory=os.getcwd()
    )
    def launch_service(service_config_file: str) -> None:
        """Instantiate and launch a ZenML local service from its configuration file.

        Args:
            service_config_file: the path to the service configuration file.

        Raises:
            TypeError: if the service configuration file is the wrong type.
        """
        # doing zenml imports here to avoid polluting the stdout/stderr
        # with messages before daemonization is complete
        from zenml.integrations.registry import integration_registry
        from zenml.logger import get_logger
        from zenml.services import LocalDaemonService

        logger = get_logger(__name__)

        logger.info(
            "Loading service daemon configuration from %s", service_config_file
        )
        with open(service_config_file, "r") as f:
            config = f.read()

        integration_registry.activate_integrations()

        logger.debug("Running service daemon with configuration:\n %s", config)
        service = cast(
            "LocalDaemonService", LocalDaemonService.from_json(config)
        )
        if not isinstance(service, LocalDaemonService):
            raise TypeError(
                f"Expected service type LocalDaemonService but got "
                f"{type(service)} instead"
            )
        service.run()

    launch_service(config_file)
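The daemonize decorator applied above is not shown in this section. A common POSIX recipe for detaching a process is the double fork: the first child calls setsid() to leave the caller's session, and a second child, which cannot reacquire a controlling terminal, becomes the daemon, redirects its standard streams and records its PID. The sketch below is that textbook technique with a hypothetical `daemonize_call` function; it is not ZenML's implementation and only runs on POSIX systems.

```python
import os
import sys
import traceback
from typing import Any, Callable


def daemonize_call(
    func: Callable[..., Any], *args: Any, log_file: str, pid_file: str
) -> None:
    """Run func(*args) in a detached daemon process (POSIX double fork)."""
    # Flush buffers so forked children do not re-emit pending output.
    sys.stdout.flush()
    sys.stderr.flush()
    first = os.fork()
    if first > 0:
        os.waitpid(first, 0)  # reap the short-lived first child, then return
        return
    # Everything below runs in forked children and must never return.
    exit_code = 0
    try:
        os.setsid()  # first child: new session, no controlling terminal
        if os.fork() > 0:
            os._exit(0)  # first child exits; the grandchild is re-parented
        # Grandchild: this is the daemon.
        devnull_fd = os.open(os.devnull, os.O_RDWR)
        log_fd = os.open(log_file, os.O_CREAT | os.O_WRONLY | os.O_APPEND, 0o644)
        os.dup2(devnull_fd, 0)
        os.dup2(log_fd, 1)
        os.dup2(log_fd, 2)
        with open(pid_file, "w") as f:
            f.write(str(os.getpid()))
        func(*args)
    except BaseException:
        traceback.print_exc()  # ends up in the log file once fd 2 is redirected
        exit_code = 1
    finally:
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()  # os._exit skips normal interpreter cleanup
            except Exception:
                pass
        os._exit(exit_code)
```

Using os._exit in the children, rather than returning or raising, guarantees that forked copies never fall back into the caller's code.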
local_service

Implementation of a local ZenML service.

Classes
LocalDaemonService(**attrs: Any)

Bases: BaseService

A service represented by a local daemon process.

This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as local daemon processes.

To define a local daemon service, subclass this class and implement the run method. Upon start, the service will spawn a daemon process that ends up calling the run method.

For example,


from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
import time

class SleepingDaemonConfig(LocalDaemonServiceConfig):

    wake_up_after: int

class SleepingDaemon(LocalDaemonService):

    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping daemon",
        type="daemon",
        flavor="sleeping",
    )
    config: SleepingDaemonConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

# ServiceConfig.__init__ raises ValueError unless `name` or `model_name` is set
daemon = SleepingDaemon(
    config=SleepingDaemonConfig(name="sleeper", wake_up_after=10)
)
daemon.start()

NOTE: the SleepingDaemon class and its parent module have to be discoverable as part of a ZenML Integration, otherwise the daemon will fail with the following error:

TypeError: Cannot load service with unregistered service type:
name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'

Attributes:

Name Type Description
config LocalDaemonServiceConfig

service configuration

status LocalDaemonServiceStatus

service status

endpoint Optional[LocalDaemonServiceEndpoint]

optional service endpoint

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the daemon process.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the daemon process and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in src/zenml/services/local/local_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the daemon process.

    Returns:
        The operational state of the daemon process and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    if not self.status.pid:
        return ServiceState.INACTIVE, "service daemon is not running"

    # the daemon is running
    return ServiceState.ACTIVE, ""
deprovision(force: bool = False) -> None

Deprovision the service.

Parameters:

Name Type Description Default
force bool

if True, the service daemon will be forcefully stopped

False
Source code in src/zenml/services/local/local_service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.

    Args:
        force: if True, the service daemon will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Yields:

Type Description
str

A generator that can be accessed to get the service logs.

Source code in src/zenml/services/local/local_service.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return

    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
get_service_status_message() -> str

Get a message about the current operational state of the service.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/local/local_service.py
def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    pid = self.status.pid
    if pid:
        msg += f"  Daemon PID: `{self.status.pid}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision() -> None

Provision the service.

Source code in src/zenml/services/local/local_service.py
def provision(self) -> None:
    """Provision the service."""
    self._start_daemon()
run() -> None abstractmethod

Run the service daemon process associated with this service.

Subclasses must implement this method to provide the service daemon functionality. This method will be executed in the context of the running daemon, not in the context of the process that calls the start method.

Source code in src/zenml/services/local/local_service.py
@abstractmethod
def run(self) -> None:
    """Run the service daemon process associated with this service.

    Subclasses must implement this method to provide the service daemon
    functionality. This method will be executed in the context of the
    running daemon, not in the context of the process that calls the
    `start` method.
    """
start(timeout: int = 0) -> None

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time, in seconds, to wait for the service to become active. If set to 0, the method will return immediately after checking the service status.

0
Source code in src/zenml/services/local/local_service.py
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    if not self.config.blocking:
        super().start(timeout)
    else:
        self.run()
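start(timeout) either runs the service inline (when blocking is set) or defers to the base class, which waits up to timeout for the service to become active. That waiting step follows a generic poll-until-deadline pattern, sketched below. `wait_until` is a hypothetical helper, not the base class's actual code, and it assumes timeout is expressed in seconds.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool], timeout: float, interval: float = 1.0
) -> bool:
    """Poll predicate until it returns True or timeout seconds elapse.

    With timeout=0 the predicate is checked exactly once, matching the
    "return immediately after checking" behaviour documented for start().
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(interval, remaining))
```

time.monotonic is used instead of time.time so that wall-clock adjustments cannot shorten or extend the wait.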
LocalDaemonServiceConfig(**data: Any)

Bases: ServiceConfig

Local daemon service configuration.

Attributes:

Name Type Description
silent_daemon bool

set to True to suppress the output of the daemon (i.e. redirect stdout and stderr to /dev/null). If False, the daemon output will be redirected to a logfile.

root_runtime_path Optional[str]

the root path where the service daemon will store service configuration files

singleton bool

set to True to store the service daemon configuration files directly in the root_runtime_path directory instead of creating a subdirectory for each service instance. Only has effect if the root_runtime_path is also set.

blocking bool

set to True to run the service in the context of the current process and block until the service is stopped instead of running the service as a daemon process. Useful for operating systems that do not support daemon processes.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
LocalDaemonServiceStatus

Bases: ServiceStatus

Local daemon service status.

Attributes:

Name Type Description
runtime_path Optional[str]

the path where the service daemon runtime files (the configuration file used to start the service daemon and the logfile) are located

silent_daemon bool

flag indicating whether the output of the daemon is suppressed (redirected to /dev/null).

Attributes
config_file: Optional[str] property

Get the path to the configuration file used to start the service daemon.

Returns:

Type Description
Optional[str]

The path to the configuration file, or None, if the service has never been started before.

log_file: Optional[str] property

Get the path to the log file where the service output is/has been logged.

Returns:

Type Description
Optional[str]

The path to the log file, or None, if the service has never been started before, or if the service daemon output is suppressed.

pid: Optional[int] property

Return the PID of the currently running daemon.

Returns:

Type Description
Optional[int]

The PID of the daemon, or None, if the service has never been started before.

pid_file: Optional[str] property

Get the path to a daemon PID file.

This is where the last known PID of the daemon process is stored.

Returns:

Type Description
Optional[str]

The path to the PID file, or None, if the service has never been started before.
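The local check_status treats the daemon as active whenever status.pid returns a value; pid is documented as the PID of the currently running daemon, and pid_file stores its last known PID. One common POSIX way to turn a PID file into a liveness check is to send signal 0, which performs only the existence and permission checks. The helper below is a hypothetical sketch of that technique, not ZenML's implementation; on Windows os.kill behaves differently and this approach should not be used there.

```python
import os
from typing import Optional


def read_live_pid(pid_file: str) -> Optional[int]:
    """Return the PID stored in pid_file if that process still exists (POSIX)."""
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return None  # missing, unreadable, or malformed PID file
    if pid <= 0:
        return None  # 0 and negative values address process groups, not a PID
    try:
        os.kill(pid, 0)  # signal 0: check existence/permissions, send nothing
    except ProcessLookupError:
        return None  # stale PID file: the process is gone
    except PermissionError:
        return pid  # the process exists but belongs to another user
    return pid
```

A PID can be recycled by the OS after the daemon exits, so a live PID from a stale file is a strong hint rather than proof that the daemon itself is still running.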

local_service_endpoint

Implementation of a local service endpoint.

Classes
LocalDaemonServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseServiceEndpoint

A service endpoint exposed by a local daemon process.

This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as local daemon processes.

Attributes:

Name Type Description
config LocalDaemonServiceEndpointConfig

service endpoint configuration

status LocalDaemonServiceEndpointStatus

service endpoint status

monitor Optional[Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]]

optional service endpoint health monitor

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
prepare_for_start() -> None

Prepare the service endpoint for starting.

This method is called before the service is started.

Source code in src/zenml/services/local/local_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.hostname = self.config.ip_address
    self.status.port = self._lookup_free_port()
LocalDaemonServiceEndpointConfig

Bases: ServiceEndpointConfig

Local daemon service endpoint configuration.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the TCP protocol implemented by the service endpoint

port Optional[int]

preferred TCP port value for the service endpoint. If the port is in use when the service is started, setting allocate_port to True will also try to allocate a new port value, otherwise an exception will be raised.

ip_address str

the IP address of the service endpoint. If not set, the default localhost IP address will be used.

allocate_port bool

set to True to allocate a free TCP port for the service endpoint automatically.
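The port and allocate_port settings describe a small policy: use the preferred port if it is free, otherwise either pick another free port or fail. A stdlib sketch of that policy follows. `choose_port` and `port_is_free` are hypothetical names, raising OSError in the failure case is an assumption, and when no preferred port is configured the sketch simply allocates one. Like any check-then-bind approach, it can race with other processes.

```python
import socket
from typing import Optional


def port_is_free(host: str, port: int) -> bool:
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True


def choose_port(host: str, preferred: Optional[int], allocate_port: bool) -> int:
    """Pick the endpoint port according to the port/allocate_port settings."""
    if preferred is not None:
        if port_is_free(host, preferred):
            return preferred
        if not allocate_port:
            raise OSError(f"TCP port {preferred} on {host} is not available")
    # No preference, or the preferred port is taken and allocation is allowed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
```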

LocalDaemonServiceEndpointStatus

Bases: ServiceEndpointStatus

Local daemon service endpoint status.


service

Implementation of the ZenML Service class.

Classes
BaseDeploymentService(**attrs: Any)

Bases: BaseService

Base class for deployment services.

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
healthcheck_url: Optional[str] property

Gets the healthcheck URL for the endpoint.

Returns:

Type Description
Optional[str]

the healthcheck URL for the endpoint

prediction_url: Optional[str] property

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

BaseService(**attrs: Any)

Bases: BaseTypedModel

Base service class.

This class implements generic functionality concerning the life-cycle management and tracking of an external service (e.g. process, container, Kubernetes deployment etc.).

Attributes:

Name Type Description
SERVICE_TYPE ServiceType

a service type descriptor with information describing the service class. Every concrete service class must define this.

admin_state ServiceState

the administrative state of the service.

uuid UUID

unique UUID identifier for the service instance.

config ServiceConfig

service configuration

status ServiceStatus

service status

endpoint Optional[BaseServiceEndpoint]

optional service endpoint

Initialize the service instance.

Parameters:

Name Type Description Default
**attrs Any

keyword arguments.

{}
Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
is_failed: bool property

Check if the service is currently failed.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is in a failure state, otherwise False.

is_running: bool property

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

is_stopped: bool property

Check if the service is currently stopped.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is stopped, otherwise False.

Functions
check_status() -> Tuple[ServiceState, str] abstractmethod

Check the current operational state of the external service.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).

Source code in src/zenml/services/service.py
@abstractmethod
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
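To make the `check_status` contract concrete, here is a minimal sketch of what an implementation is expected to return: a state plus a message that is empty when healthy. It does not use ZenML; `ServiceState` below is a simplified stand-in enum with only a subset of values, and `MiniService` and `InMemoryService` are hypothetical classes whose "process" is just a boolean flag.

```python
import enum
from abc import ABC, abstractmethod
from typing import Tuple


class ServiceState(str, enum.Enum):
    """Simplified stand-in for ZenML's ServiceState (subset of values)."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


class MiniService(ABC):
    """Hypothetical, stripped-down analogue of BaseService."""

    @abstractmethod
    def check_status(self) -> Tuple[ServiceState, str]:
        """Return the operational state and an explanatory message."""


class InMemoryService(MiniService):
    """Hypothetical service whose 'process' is represented by a flag."""

    def __init__(self) -> None:
        self.running = False

    def check_status(self) -> Tuple[ServiceState, str]:
        # Healthy: ACTIVE with an empty message. Otherwise return a state
        # together with a human-readable reason.
        if self.running:
            return ServiceState.ACTIVE, ""
        return ServiceState.INACTIVE, "service process is not running"


service = InMemoryService()
print(service.check_status())
service.running = True
print(service.check_status())
```

A real subclass would inspect a process, container, or deployment instead of a flag, but the shape of the returned tuple is the same.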
deprovision(force: bool = False) -> None

Deprovisions all resources used by the service.

Parameters:

Name Type Description Default
force bool

if True, the service will be deprovisioned even if it is in a failed state.

False

Raises:

Type Description
NotImplementedError

if the service does not implement deprovisioning functionality.

Source code in src/zenml/services/service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: if True, the service will be deprovisioned even if it is
            in a failed state.

    Raises:
        NotImplementedError: if the service does not implement
            deprovisioning functionality.
    """
    raise NotImplementedError(
        f"Deprovisioning resources not implemented for {self}."
    )
from_json(json_str: str) -> BaseTypedModel classmethod

Loads a service from a JSON string.

Parameters:

Name Type Description Default
json_str str

the JSON string to load from.

required

Returns:

Type Description
BaseTypedModel

The loaded service object.

Source code in src/zenml/services/service.py
@classmethod
def from_json(cls, json_str: str) -> "BaseTypedModel":
    """Loads a service from a JSON string.

    Args:
        json_str: the JSON string to load from.

    Returns:
        The loaded service object.
    """
    service_dict = json.loads(json_str)
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=service_dict["type"], expected_class=BaseService
    )
    return class_.from_dict(service_dict)
from_model(model: ServiceResponse) -> BaseService classmethod

Loads a service from a model.

Parameters:

Name Type Description Default
model ServiceResponse

The ServiceResponse to load from.

required

Returns:

Type Description
BaseService

The loaded service object.

Raises:

Type Description
ValueError

if the service source is not found in the model.

Source code in src/zenml/services/service.py
@classmethod
def from_model(cls, model: "ServiceResponse") -> "BaseService":
    """Loads a service from a model.

    Args:
        model: The ServiceResponse to load from.

    Returns:
        The loaded service object.

    Raises:
        ValueError: if the service source is not found in the model.
    """
    if not model.service_source:
        raise ValueError("Service source not found in the model.")
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=model.service_source, expected_class=BaseService
    )
    return class_(
        uuid=model.id,
        admin_state=model.admin_state,
        config=model.config,
        status=model.status,
        service_type=model.service_type.model_dump(),
        endpoint=model.endpoint,
    )
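Both `from_json` and `from_model` resolve the concrete service class from a source string before instantiating it. The sketch below shows that idea with the standard library only; `load_and_check_class` is a hypothetical helper, not ZenML's `source_utils.load_and_validate_class`, and it assumes sources of the simple form `package.module.ClassName`.

```python
import importlib
import json
from typing import Type, TypeVar

T = TypeVar("T")


def load_and_check_class(source: str, expected_class: Type[T]) -> Type[T]:
    """Import `package.module.ClassName` and check it subclasses `expected_class`.

    Hypothetical stdlib analogue of the class-loading step; it is not
    ZenML's `source_utils.load_and_validate_class`.
    """
    module_name, _, class_name = source.rpartition(".")
    if not module_name:
        raise ValueError(f"Invalid source {source!r}: expected 'module.ClassName'.")
    class_ = getattr(importlib.import_module(module_name), class_name)
    # Refuse anything that is not a subclass of the expected base class.
    if not (isinstance(class_, type) and issubclass(class_, expected_class)):
        raise TypeError(f"{source} is not a subclass of {expected_class.__name__}.")
    return class_


# Usage: the serialized payload names the class to instantiate via a "type" key.
payload = json.loads('{"type": "collections.OrderedDict", "a": 1}')
cls = load_and_check_class(payload.pop("type"), dict)
restored = cls(payload)
print(type(restored).__name__, dict(restored))  # OrderedDict {'a': 1}
```

The subclass check is what keeps an arbitrary string in stored JSON from instantiating an unrelated class.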
get_healthcheck_url() -> Optional[str]

Gets the healthcheck URL for the endpoint.

Returns:

Type Description
Optional[str]

the healthcheck URL for the endpoint

Source code in src/zenml/services/service.py
def get_healthcheck_url(self) -> Optional[str]:
    """Gets the healthcheck URL for the endpoint.

    Returns:
        the healthcheck URL for the endpoint
    """
    return (
        self.endpoint.monitor.get_healthcheck_uri(self.endpoint)
        if (self.endpoint and self.endpoint.monitor)
        and isinstance(self.endpoint.monitor, HTTPEndpointHealthMonitor)
        else None
    )
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None] abstractmethod

Retrieve the service logs.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, None]

A generator that can be accessed to get the service logs.

Source code in src/zenml/services/service.py
@abstractmethod
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
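One way a subclass might honor the `tail` argument is to keep only the most recent lines in a bounded buffer. The sketch below assumes an in-memory list of lines; `read_log_lines` is a hypothetical helper that does not read real log files or containers and does not implement `follow` streaming.

```python
import collections
from typing import Iterable, Iterator, Optional


def read_log_lines(lines: Iterable[str], tail: Optional[int] = None) -> Iterator[str]:
    """Yield log lines, keeping only the last `tail` lines when it is set.

    Hypothetical helper: it reads from an in-memory iterable instead of a
    log file or container, and does not implement `follow` streaming.
    """
    if tail is not None:
        # A bounded deque discards older entries, leaving the last `tail` lines.
        lines = collections.deque(lines, maxlen=tail)
    yield from lines


log = ["starting", "loading model", "listening on :8000", "ready"]
print(list(read_log_lines(log, tail=2)))  # ['listening on :8000', 'ready']
```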
get_prediction_url() -> Optional[str]

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

Source code in src/zenml/services/service.py
def get_prediction_url(self) -> Optional[str]:
    """Gets the prediction URL for the endpoint.

    Returns:
        the prediction URL for the endpoint
    """
    prediction_url = None
    if isinstance(self, BaseDeploymentService) and self.prediction_url:
        prediction_url = self.prediction_url
    elif self.endpoint:
        prediction_url = (
            self.endpoint.status.uri if self.endpoint.status else None
        )
    return prediction_url
get_service_status_message() -> str

Get a service status message.

Returns:

Type Description
str

A message providing information about the current operational state of the service.

Source code in src/zenml/services/service.py
def get_service_status_message(self) -> str:
    """Get a service status message.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    return (
        f"  Administrative state: `{self.admin_state.value}`\n"
        f"  Operational state: `{self.status.state.value}`\n"
        f"  Last status message: '{self.status.last_error}'\n"
    )
poll_service_status(timeout: int = 0) -> bool

Polls the external service status.

It does this until the service operational state matches the administrative state, the service enters a failed state, or the timeout is reached.

Parameters:

Name Type Description Default
timeout int

maximum time to wait for the service operational state to match the administrative state, in seconds

0

Returns:

Type Description
bool

True if the service operational state matches the administrative state, False otherwise.

Source code in src/zenml/services/service.py
def poll_service_status(self, timeout: int = 0) -> bool:
    """Polls the external service status.

    It does this until the service operational state matches the
    administrative state, the service enters a failed state, or the timeout
    is reached.

    Args:
        timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds

    Returns:
        True if the service operational state matches the administrative
        state, False otherwise.
    """
    time_remaining = timeout
    while True:
        if self.admin_state == ServiceState.ACTIVE and self.is_running:
            return True
        if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
            return True
        if self.is_failed:
            return False
        if time_remaining <= 0:
            break
        time.sleep(1)
        time_remaining -= 1

    if timeout > 0:
        logger.error(
            f"Timed out waiting for service {self} to become "
            f"{self.admin_state.value}:\n"
            + self.get_service_status_message()
        )

    return False
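The loop above always checks the state at least once, then sleeps and retries until the goal is reached, a failure is detected, or the time budget runs out. A minimal, dependency-free sketch of the same control flow, assuming the status checks are passed in as plain callables (`poll_until` and its `interval` parameter are hypothetical; the original always sleeps for one second):

```python
import time
from typing import Callable


def poll_until(
    is_done: Callable[[], bool],
    is_failed: Callable[[], bool],
    timeout: float = 0,
    interval: float = 1.0,
) -> bool:
    """Poll until `is_done()` or `is_failed()` is True, or time runs out.

    The conditions are always evaluated at least once, even with timeout=0.
    """
    time_remaining = float(timeout)
    while True:
        if is_done():
            return True
        if is_failed():
            return False
        if time_remaining <= 0:
            return False
        time.sleep(interval)
        time_remaining -= interval


# Usage: a fake service that only reports "running" on the third poll.
polls = {"count": 0}


def fake_is_running() -> bool:
    polls["count"] += 1
    return polls["count"] >= 3


print(poll_until(fake_is_running, lambda: False, timeout=5, interval=0.01))  # True
```

Checking for failure on every iteration lets the caller bail out early instead of waiting out the full timeout.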
provision() -> None

Provisions resources to run the service.

Raises:

Type Description
NotImplementedError

if the service does not implement provisioning functionality

Source code in src/zenml/services/service.py
def provision(self) -> None:
    """Provisions resources to run the service.

    Raises:
        NotImplementedError: if the service does not implement provisioning functionality
    """
    raise NotImplementedError(
        f"Provisioning resources not implemented for {self}."
    )
start(timeout: int = 0) -> None

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to become active, in seconds. If set to 0, the method returns right after provisioning, without polling the service status.

0
Source code in src/zenml/services/service.py
@update_service_status(
    pre_status=ServiceState.PENDING_STARTUP,
    post_status=ServiceState.ACTIVE,
)
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    with console.status(f"Starting service '{self}'.\n"):
        self.admin_state = ServiceState.ACTIVE
        self.provision()
        if timeout > 0 and not self.poll_service_status(timeout):
            logger.error(
                f"Failed to start service {self}\n"
                + self.get_service_status_message()
            )
stop(timeout: int = 0, force: bool = False) -> None

Stop the service and optionally wait for it to shut down.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to shut down, in seconds. If set to 0, the method returns right after deprovisioning, without polling the service status.

0
force bool

if True, the service will be stopped even if it is not currently running.

False
Source code in src/zenml/services/service.py
@update_service_status(
    pre_status=ServiceState.PENDING_SHUTDOWN,
    post_status=ServiceState.INACTIVE,
)
def stop(self, timeout: int = 0, force: bool = False) -> None:
    """Stop the service and optionally wait for it to shutdown.

    Args:
        timeout: amount of time to wait for the service to shutdown.
            If set to 0, the method will return immediately after checking
            the service status.
        force: if True, the service will be stopped even if it is not
            currently running.
    """
    with console.status(f"Stopping service '{self}'.\n"):
        self.admin_state = ServiceState.INACTIVE
        self.deprovision(force)
        if timeout > 0:
            self.poll_service_status(timeout)
            if not self.is_stopped:
                logger.error(
                    f"Failed to stop service {self}. Last state: "
                    f"'{self.status.state.value}'. Last error: "
                    f"'{self.status.last_error}'"
                )
update(config: ServiceConfig) -> None

Update the service configuration.

Parameters:

Name Type Description Default
config ServiceConfig

the new service configuration.

required
Source code in src/zenml/services/service.py
def update(self, config: ServiceConfig) -> None:
    """Update the service configuration.

    Args:
        config: the new service configuration.
    """
    self.config = config
update_status() -> None

Update the status of the service.

Check the current operational state of the external service and update the local operational status information to reflect it.

This method should be overridden by subclasses that implement concrete service status tracking functionality.

Source code in src/zenml/services/service.py
def update_status(self) -> None:
    """Update the status of the service.

    Check the current operational state of the external service
    and update the local operational status information to reflect it.

    This method should be overridden by subclasses that implement
    concrete service status tracking functionality.
    """
    logger.debug(
        "Running status check for service '%s' ...",
        self,
    )
    try:
        state, err = self.check_status()
        logger.debug(
            "Status check results for service '%s': %s [%s]",
            self,
            state.name,
            err,
        )
        self.status.update_state(state, err)

        # don't bother checking the endpoint state if the service is not active
        if self.status.state == ServiceState.INACTIVE:
            return

        if self.endpoint:
            self.endpoint.update_status()
    except Exception as e:
        logger.error(
            f"Failed to update status for service '{self}': {e}",
            exc_info=True,
        )
        self.status.update_state(ServiceState.ERROR, str(e))
ServiceConfig(**data: Any)

Bases: BaseTypedModel

Generic service configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the service configuration.

Attributes:

Name Type Description
name str

name for the service instance

description str

description of the service

pipeline_name str

name of the pipeline that spun up the service

pipeline_step_name str

name of the pipeline step that spun up the service

run_name str

name of the pipeline run that spun up the service.

Initialize the service configuration.

Parameters:

Name Type Description Default
**data Any

keyword arguments.

{}

Raises:

Type Description
ValueError

if neither 'name' nor 'model_name' is set.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
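The constructor derives `service_name` from a prefix plus `name` (or `model_name` as a fallback), unless a `service_name` is passed explicitly. The sketch below reproduces that rule with a plain dataclass instead of the pydantic-based `ServiceConfig`; `MiniServiceConfig` is hypothetical, and the `"zenml-"` value of `SERVICE_NAME_PREFIX` is an assumption, so check the actual prefix constant in your ZenML version.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed prefix value; check the constant used by your ZenML version.
SERVICE_NAME_PREFIX = "zenml-"


@dataclass
class MiniServiceConfig:
    """Hypothetical dataclass analogue of ServiceConfig's naming rule."""

    name: str = ""
    model_name: str = ""
    service_name: Optional[str] = None

    def __post_init__(self) -> None:
        base = self.name or self.model_name
        if not base:
            raise ValueError("Either 'name' or 'model_name' must be set.")
        # An explicitly supplied service_name wins; otherwise derive one.
        if self.service_name is None:
            self.service_name = f"{SERVICE_NAME_PREFIX}{base}"


print(MiniServiceConfig(name="classifier").service_name)  # zenml-classifier
print(MiniServiceConfig(model_name="bert").service_name)  # zenml-bert
```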
Functions
get_service_labels() -> Dict[str, str]

Get the service labels.

Returns:

Type Description
Dict[str, str]

a dictionary of service labels.

Source code in src/zenml/services/service.py
def get_service_labels(self) -> Dict[str, str]:
    """Get the service labels.

    Returns:
        a dictionary of service labels.
    """
    labels = {}
    for k, v in self.model_dump().items():
        label = f"zenml_{k}".upper()
        labels[label] = str(v)
    return labels
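The label transformation is easy to see on a small dictionary: each key gets a `zenml_` prefix and is upper-cased, and each value is converted with `str()`, so `None` becomes the string `"None"`. The sketch applies the same comprehension to a plain dict; `to_service_labels` and the sample field values are hypothetical.

```python
from typing import Any, Dict


def to_service_labels(fields: Dict[str, Any]) -> Dict[str, str]:
    """Apply the same key/value transformation as get_service_labels."""
    # "name" -> "ZENML_NAME"; every value is stringified, including None.
    return {f"zenml_{k}".upper(): str(v) for k, v in fields.items()}


labels = to_service_labels(
    {"name": "my-svc", "pipeline_name": "train", "description": None}
)
print(labels)
# {'ZENML_NAME': 'my-svc', 'ZENML_PIPELINE_NAME': 'train', 'ZENML_DESCRIPTION': 'None'}
```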
Functions
update_service_status(pre_status: Optional[ServiceState] = None, post_status: Optional[ServiceState] = None, error_status: ServiceState = ServiceState.ERROR) -> Callable[[T], T]

A decorator to update the service status before and after a method call.

This decorator is used to wrap service methods and update the service status before and after the method call. If the method raises an exception, the service status is updated to reflect the error state.

Parameters:

Name Type Description Default
pre_status Optional[ServiceState]

the status to update before the method call.

None
post_status Optional[ServiceState]

the status to update after the method call.

None
error_status ServiceState

the status to update if the method raises an exception.

ERROR

Returns:

Type Description
Callable[[T], T]

A decorator that wraps a service method with status updates and exception handling.

Source code in src/zenml/services/service.py
def update_service_status(
    pre_status: Optional[ServiceState] = None,
    post_status: Optional[ServiceState] = None,
    error_status: ServiceState = ServiceState.ERROR,
) -> Callable[[T], T]:
    """A decorator to update the service status before and after a method call.

    This decorator is used to wrap service methods and update the service status
    before and after the method call. If the method raises an exception, the
    service status is updated to reflect the error state.

    Args:
        pre_status: the status to update before the method call.
        post_status: the status to update after the method call.
        error_status: the status to update if the method raises an exception.

    Returns:
        A decorator that wraps a service method with status updates and
        exception handling.
    """

    def decorator(func: T) -> T:
        @wraps(func)
        def wrapper(self: "BaseService", *args: Any, **kwargs: Any) -> Any:
            if pre_status:
                self.status.update_state(pre_status, "")
            try:
                logger.debug(f"Calling {func.__name__} method...")
                result = func(self, *args, **kwargs)
                logger.debug(f"{func.__name__} method executed successfully.")
                if post_status:
                    self.status.update_state(post_status, "")
                return result
            except Exception as e:
                logger.error(
                    f"Error occurred in {func.__name__} method: {str(e)}"
                )
                self.status.update_state(error_status, str(e))
                raise

        return wrapper  # type: ignore

    return decorator
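The decorator pattern above can be exercised without ZenML. In the sketch below, `RecordingStatus`, `track_status`, and `ToyService` are hypothetical stand-ins that use plain strings instead of `ServiceState`. It shows the order of status updates for a successful call and for a call that raises; in both cases the original exception still propagates.

```python
from functools import wraps
from typing import Any, Callable, List, Optional, Tuple, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class RecordingStatus:
    """Stand-in status object that records every state update."""

    def __init__(self) -> None:
        self.history: List[Tuple[str, str]] = []

    def update_state(self, state: str, message: str) -> None:
        self.history.append((state, message))


def track_status(
    pre_status: Optional[str] = None,
    post_status: Optional[str] = None,
    error_status: str = "error",
) -> Callable[[F], F]:
    """Decorator factory mirroring the structure of update_service_status."""

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            if pre_status:
                self.status.update_state(pre_status, "")
            try:
                result = func(self, *args, **kwargs)
                if post_status:
                    self.status.update_state(post_status, "")
                return result
            except Exception as e:
                # Record the failure, then re-raise so callers still see it.
                self.status.update_state(error_status, str(e))
                raise

        return wrapper  # type: ignore[return-value]

    return decorator


class ToyService:
    """Hypothetical service used only to exercise the decorator."""

    def __init__(self) -> None:
        self.status = RecordingStatus()

    @track_status(pre_status="pending_startup", post_status="active")
    def start(self) -> None:
        pass  # provisioning would happen here

    @track_status(pre_status="pending_shutdown", post_status="inactive")
    def stop(self) -> None:
        raise RuntimeError("deprovisioning failed")


toy = ToyService()
toy.start()
try:
    toy.stop()
except RuntimeError:
    pass
print(toy.status.history)
# [('pending_startup', ''), ('active', ''), ('pending_shutdown', ''), ('error', 'deprovisioning failed')]
```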
Modules

service_endpoint

Implementation of a ZenML service endpoint.

Classes
BaseServiceEndpoint(*args: Any, **kwargs: Any)

Bases: BaseTypedModel

Base service endpoint class.

This class implements generic functionality concerning the life-cycle management and tracking of an external service endpoint (e.g. an HTTP/HTTPS API or generic TCP endpoint exposed by a service).

Attributes:

Name Type Description
admin_state ServiceState

the administrative state of the service endpoint

config ServiceEndpointConfig

service endpoint configuration

status ServiceEndpointStatus

service endpoint status

monitor Optional[BaseServiceEndpointHealthMonitor]

optional service endpoint health monitor

Initialize the service endpoint.

Parameters:

Name Type Description Default
*args Any

positional arguments.

()
**kwargs Any

keyword arguments.

{}
Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the external service endpoint.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service endpoint and a message providing additional information about that state (e.g. a description of the error, if one is encountered while checking the service status).

Source code in src/zenml/services/service_endpoint.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Returns:
        The operational state of the external service endpoint and a
        message providing additional information about that state
        (e.g. a description of the error, if one is encountered while
        checking the service status).
    """
    if not self.monitor:
        # no health monitor configured; assume service operational state
        # always matches the admin state
        return self.admin_state, ""
    return self.monitor.check_endpoint_status(self)
is_active() -> bool

Check if the service endpoint is active.

This means that it is responsive and can receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is active, otherwise False.

Source code in src/zenml/services/service_endpoint.py
def is_active(self) -> bool:
    """Check if the service endpoint is active.

    This means that it is responsive and can receive requests. This method
    will use the configured health monitor to actively check the endpoint
    status and will return the result.

    Returns:
        True if the service endpoint is active, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.ACTIVE
is_inactive() -> bool

Check if the service endpoint is inactive.

This means that it is unresponsive and cannot receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is inactive, otherwise False.

Source code in src/zenml/services/service_endpoint.py
def is_inactive(self) -> bool:
    """Check if the service endpoint is inactive.

    This means that it is unresponsive and cannot receive requests. This
    method will use the configured health monitor to actively check the
    endpoint status and will return the result.

    Returns:
        True if the service endpoint is inactive, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.INACTIVE
update_status() -> None

Check the current operational state of the external service endpoint.

It updates the local operational status information accordingly.

Source code in src/zenml/services/service_endpoint.py
def update_status(self) -> None:
    """Check the current operational state of the external service endpoint.

    It updates the local operational status information accordingly.
    """
    logger.debug(
        "Running health check for service endpoint '%s' ...",
        self.config.name,
    )
    state, err = self.check_status()
    logger.debug(
        "Health check results for service endpoint '%s': %s [%s]",
        self.config.name,
        state.name,
        err,
    )
    self.status.update_state(state, err)
ServiceEndpointConfig

Bases: BaseTypedModel

Generic service endpoint configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the endpoint configuration.

Attributes:

Name Type Description
name str

unique name for the service endpoint

description str

description of the service endpoint

ServiceEndpointProtocol

Bases: StrEnum

Possible endpoint protocol values.

ServiceEndpointStatus

Bases: ServiceStatus

Status information describing the operational state of a service endpoint.

For example, this could be an HTTP/HTTPS API or generic TCP endpoint exposed by a service. Concrete service classes should extend this class and add additional attributes that make up the operational state of the service endpoint.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the protocol used by the service endpoint (e.g. TCP, HTTP, or HTTPS)

hostname Optional[str]

the hostname where the service endpoint is accessible

port Optional[int]

the current TCP port where the service endpoint is accessible

Attributes
uri: Optional[str] property

Get the URI of the service endpoint.

Returns:

Type Description
Optional[str]

The URI of the service endpoint, or None if the service endpoint operational status doesn't have the required information.
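The body of the `uri` property is not shown on this page. As a rough sketch under that caveat, the URI can be thought of as the protocol, hostname, and port joined together, with None returned when any part is missing. `endpoint_uri` is a hypothetical helper, and the real property may apply extra normalization (for example to the hostname).

```python
from typing import Optional


def endpoint_uri(
    protocol: Optional[str], hostname: Optional[str], port: Optional[int]
) -> Optional[str]:
    """Build `protocol://hostname:port`, or return None if any part is missing.

    Assumed behavior for illustration only.
    """
    if not (protocol and hostname and port):
        return None
    return f"{protocol}://{hostname}:{port}"


print(endpoint_uri("http", "localhost", 8000))  # http://localhost:8000
print(endpoint_uri("http", "localhost", None))  # None
```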

Functions

service_monitor

Implementation of the service health monitor.

Classes
BaseServiceEndpointHealthMonitor

Bases: BaseTypedModel

Base class used for service endpoint health monitors.

Attributes:

Name Type Description
config ServiceEndpointHealthMonitorConfig

health monitor configuration for endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str] abstractmethod

Check the current operational state of the external service endpoint.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check

required

This method should be overridden by subclasses that implement concrete service endpoint tracking functionality.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external service endpoint and an optional error message, if an error is encountered while checking the service endpoint status.

Source code in src/zenml/services/service_monitor.py
@abstractmethod
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Args:
        endpoint: service endpoint to check

    This method should be overridden by subclasses that implement
    concrete service endpoint tracking functionality.

    Returns:
        The operational state of the external service endpoint and an
        optional error message, if an error is encountered while checking
        the service endpoint status.
    """
HTTPEndpointHealthMonitor

Bases: BaseServiceEndpointHealthMonitor

HTTP service endpoint health monitor.

Attributes:

Name Type Description
config HTTPEndpointHealthMonitorConfig

health monitor configuration for HTTP endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str]

Run an HTTP endpoint API healthcheck.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check.

required

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external HTTP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the HTTP endpoint status).

Source code in src/zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run an HTTP endpoint API healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external HTTP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the HTTP endpoint
        status).
    """
    from zenml.services.service_endpoint import ServiceEndpointProtocol

    if endpoint.status.protocol not in [
        ServiceEndpointProtocol.HTTP,
        ServiceEndpointProtocol.HTTPS,
    ]:
        return (
            ServiceState.ERROR,
            "endpoint protocol is not HTTP nor HTTPS.",
        )

    check_uri = self.get_healthcheck_uri(endpoint)
    if not check_uri:
        return ServiceState.ERROR, "no HTTP healthcheck URI available"

    logger.debug("Running HTTP healthcheck for URI: %s", check_uri)

    try:
        if self.config.use_head_request:
            r = requests.head(
                check_uri,
                timeout=self.config.http_timeout,
            )
        else:
            r = requests.get(
                check_uri,
                timeout=self.config.http_timeout,
            )
        if r.status_code == self.config.http_status_code:
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
    except requests.ConnectionError as e:
        error = f"HTTP endpoint healthcheck connection error: {str(e)}"
    except requests.Timeout as e:
        error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
    except requests.RequestException as e:
        error = (
            f"unexpected error encountered while running HTTP endpoint "
            f"healthcheck: {str(e)}"
        )

    return ServiceState.ERROR, error
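The same healthcheck logic can be sketched with the standard library instead of `requests`: issue a GET (or HEAD) request, compare the status code with the expected one, and turn connection problems into an error message. `http_healthcheck` and `_HealthHandler` are hypothetical, and plain strings stand in for `ServiceState`. One difference to keep in mind: `urllib` raises `HTTPError` for 4xx/5xx responses, so the sketch catches it and treats its code like any other status. The demo starts a throwaway server on a random local port.

```python
import http.server
import socket
import threading
import urllib.error
import urllib.request
from typing import Tuple


def http_healthcheck(
    uri: str,
    expected_status: int = 200,
    timeout: float = 5.0,
    use_head_request: bool = False,
) -> Tuple[str, str]:
    """Return ("active", "") on the expected status, else ("error", reason)."""
    request = urllib.request.Request(uri, method="HEAD" if use_head_request else "GET")
    # An empty ProxyHandler ignores proxy environment variables (local check).
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(request, timeout=timeout) as response:
            status = response.status
    except urllib.error.HTTPError as e:
        status = e.code  # 4xx/5xx arrive as exceptions in urllib
    except (socket.timeout, TimeoutError) as e:
        return "error", f"HTTP endpoint healthcheck request timed out: {e}"
    except urllib.error.URLError as e:
        return "error", f"HTTP endpoint healthcheck connection error: {e.reason}"
    if status == expected_status:
        return "active", ""
    return "error", f"HTTP endpoint healthcheck returned unexpected status code: {status}"


class _HealthHandler(http.server.BaseHTTPRequestHandler):
    """Demo handler: 200 on /health, 404 everywhere else."""

    def do_GET(self) -> None:
        self.send_response(200 if self.path == "/health" else 404)
        self.end_headers()

    do_HEAD = do_GET

    def log_message(self, format: str, *args: object) -> None:
        pass  # silence per-request logging for the demo


server = http.server.HTTPServer(("127.0.0.1", 0), _HealthHandler)
base = f"http://127.0.0.1:{server.server_address[1]}"
threading.Thread(target=server.serve_forever, daemon=True).start()
healthy = http_healthcheck(f"{base}/health")
missing = http_healthcheck(f"{base}/missing")
server.shutdown()
server.server_close()
print(healthy)  # ('active', '')
print(missing)  # ('error', 'HTTP endpoint healthcheck returned unexpected status code: 404')
```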
get_healthcheck_uri(endpoint: BaseServiceEndpoint) -> Optional[str]

Get the healthcheck URI for the given service endpoint.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to get the healthcheck URI for

required

Returns:

Type Description
Optional[str]

The healthcheck URI for the given service endpoint, or None if the service endpoint doesn't have a healthcheck URI.

Source code in src/zenml/services/service_monitor.py
def get_healthcheck_uri(
    self, endpoint: "BaseServiceEndpoint"
) -> Optional[str]:
    """Get the healthcheck URI for the given service endpoint.

    Args:
        endpoint: service endpoint to get the healthcheck URI for

    Returns:
        The healthcheck URI for the given service endpoint or None, if
        the service endpoint doesn't have a healthcheck URI.
    """
    uri = endpoint.status.uri
    if not uri:
        return None
    if not self.config.healthcheck_uri_path:
        return uri
    return (
        f"{uri.rstrip('/')}/{self.config.healthcheck_uri_path.lstrip('/')}"
    )
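The joining rule in `get_healthcheck_uri` can be checked in isolation. This sketch uses a hypothetical free function, `build_healthcheck_uri`, that takes the endpoint URI and the configured subpath as plain arguments. It does not read `endpoint.status.uri` and `self.config.healthcheck_uri_path`. The body follows the method above.

```python
from typing import Optional


def build_healthcheck_uri(
    endpoint_uri: Optional[str], healthcheck_uri_path: Optional[str]
) -> Optional[str]:
    """Hypothetical mirror of the URI-joining rule in get_healthcheck_uri."""
    if not endpoint_uri:
        # No endpoint URI known yet (None or empty): nothing to check.
        return None
    if not healthcheck_uri_path:
        # No subpath configured: probe the endpoint URI itself.
        return endpoint_uri
    # Drop trailing slashes from the URI and leading slashes from the path
    # so that exactly one "/" joins them.
    return f"{endpoint_uri.rstrip('/')}/{healthcheck_uri_path.lstrip('/')}"


print(build_healthcheck_uri("http://localhost:8000/", "/health"))
# http://localhost:8000/health
```

Because both sides are stripped before joining, it does not matter whether the configured path is written as `health` or `/health`, or whether the endpoint URI ends in a slash.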
HTTPEndpointHealthMonitorConfig

Bases: ServiceEndpointHealthMonitorConfig

HTTP service endpoint health monitor configuration.

Attributes:

Name Type Description
healthcheck_uri_path str

URI subpath to use to perform service endpoint healthchecks. If not set, the service endpoint URI will be used instead.

use_head_request bool

set to True to use a HEAD request instead of a GET when calling the healthcheck URI.

http_status_code int

HTTP status code to expect in the health check response.

http_timeout int

HTTP health check request timeout in seconds.
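To show how the four configuration attributes feed into a single healthcheck, here is a sketch built on a hypothetical plain dataclass, `HTTPHealthCheckSettings`, that mirrors the field names above. The real `HTTPEndpointHealthMonitorConfig` is a ZenML typed model and may define defaults that are not shown here. The values in the usage lines are illustrative only, not ZenML defaults. The helpers `plan_request` and `is_healthy` are also hypothetical.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class HTTPHealthCheckSettings:
    """Hypothetical mirror of the HTTPEndpointHealthMonitorConfig fields."""

    healthcheck_uri_path: str
    use_head_request: bool
    http_status_code: int
    http_timeout: int  # seconds


def plan_request(
    settings: HTTPHealthCheckSettings, endpoint_uri: str
) -> Tuple[str, str, int]:
    """Return (HTTP method, URL, timeout in seconds) for one healthcheck."""
    method = "HEAD" if settings.use_head_request else "GET"
    if settings.healthcheck_uri_path:
        path = settings.healthcheck_uri_path.lstrip("/")
        url = f"{endpoint_uri.rstrip('/')}/{path}"
    else:
        # No subpath configured: the endpoint URI itself is probed.
        url = endpoint_uri
    return method, url, settings.http_timeout


def is_healthy(settings: HTTPHealthCheckSettings, status_code: int) -> bool:
    """Only an exact match with the expected status code counts as healthy."""
    return status_code == settings.http_status_code


settings = HTTPHealthCheckSettings("/health", True, 200, 5)  # illustrative values
print(plan_request(settings, "http://localhost:8000/"))
# ('HEAD', 'http://localhost:8000/health', 5)
```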

ServiceEndpointHealthMonitorConfig

Bases: BaseTypedModel

Generic service health monitor configuration.

Concrete service classes should extend this class and add any additional attributes that they want reflected in and used by the health monitor configuration.

TCPEndpointHealthMonitor

Bases: BaseServiceEndpointHealthMonitor

TCP service endpoint health monitor.

Attributes:

Name Type Description
config TCPEndpointHealthMonitorConfig

health monitor configuration for TCP endpoint

Functions
check_endpoint_status(endpoint: BaseServiceEndpoint) -> Tuple[ServiceState, str]

Run a TCP endpoint healthcheck.

Parameters:

Name Type Description Default
endpoint BaseServiceEndpoint

service endpoint to check.

required

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the external TCP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the TCP endpoint status).

Source code in src/zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a TCP endpoint healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external TCP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the TCP endpoint
        status).
    """
    if not endpoint.status.port or not endpoint.status.hostname:
        return (
            ServiceState.ERROR,
            "TCP port and hostname values are not known",
        )

    logger.debug(
        "Running TCP healthcheck for TCP port: %d", endpoint.status.port
    )

    if port_is_open(endpoint.status.hostname, endpoint.status.port):
        # the endpoint is healthy
        return ServiceState.ACTIVE, ""

    return (
        ServiceState.ERROR,
        "TCP endpoint healthcheck error: TCP port is not "
        "open or not accessible",
    )
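The TCP check delegates to a `port_is_open(hostname, port)` helper whose implementation is not shown on this page. The following is a plausible stdlib equivalent, offered as an assumption rather than ZenML's actual code. It treats the port as open if a TCP connection can be established within a timeout. The name `tcp_port_is_open` and its `timeout` parameter are hypothetical.

```python
import socket


def tcp_port_is_open(hostname: str, port: int, timeout: float = 1.0) -> bool:
    """Hypothetical stand-in for port_is_open: can a TCP connection be opened?"""
    try:
        # create_connection resolves the host and tries each address in turn;
        # the socket is closed again as soon as the with-block exits.
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, host unreachable, name not resolvable, or timed
        # out (socket.gaierror and socket.timeout are both OSError subclasses).
        return False
```

A successful connect only shows that something is listening on the port. It says nothing about whether the service behind it is responding correctly, which is why the HTTP monitor checks a status code instead.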
TCPEndpointHealthMonitorConfig

Bases: ServiceEndpointHealthMonitorConfig

TCP service endpoint health monitor configuration.

Functions

service_status

Implementation of the ServiceStatus class.

Classes
ServiceStatus

Bases: BaseTypedModel

Information about the status of a service or process.

This information describes the operational status of an external process or service tracked by ZenML. This could be a process, container, Kubernetes deployment etc.

Concrete service classes should extend this class and add additional attributes that make up the operational state of the service.

Attributes:

Name Type Description
state ServiceState

the current operational state

last_state ServiceState

the operational state prior to the last status update

last_error str

the error encountered during the last status update

Functions
clear_error() -> None

Clear the last error message.

Source code in src/zenml/services/service_status.py
def clear_error(self) -> None:
    """Clear the last error message."""
    self.last_error = ""
update_state(new_state: Optional[ServiceState] = None, error: str = '') -> None

Update the current operational state to reflect a new state value and/or error.

Parameters:

Name Type Description Default
new_state Optional[ServiceState]

new operational state discovered by the last service status update

None
error str

error message describing an operational failure encountered during the last service status update

''
Source code in src/zenml/services/service_status.py
def update_state(
    self,
    new_state: Optional[ServiceState] = None,
    error: str = "",
) -> None:
    """Update the current operational state to reflect a new state value and/or error.

    Args:
        new_state: new operational state discovered by the last service
            status update
        error: error message describing an operational failure encountered
            during the last service status update
    """
    if new_state and self.state != new_state:
        self.last_state = self.state
        self.state = new_state
    if error:
        self.last_error = error
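The bookkeeping in `update_state` and `clear_error` can be exercised with a small plain-Python mirror. This sketch makes three assumptions: a dataclass replaces ZenML's `BaseTypedModel`, a hypothetical three-member `State` enum stands in for `ServiceState`, and `INACTIVE` is the initial state. Only the two method bodies follow the source above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(Enum):
    """Hypothetical subset of operational states standing in for ServiceState."""

    INACTIVE = "inactive"
    ACTIVE = "active"
    ERROR = "error"


@dataclass
class StatusSketch:
    state: State = State.INACTIVE  # assumed initial state
    last_state: State = State.INACTIVE
    last_error: str = ""

    def update_state(
        self, new_state: Optional[State] = None, error: str = ""
    ) -> None:
        # Record a transition only when the state actually changes.
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        # A non-empty error overwrites the previous one; "" leaves it as is.
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        self.last_error = ""


status = StatusSketch()
status.update_state(State.ACTIVE)
status.update_state(State.ERROR, "TCP port is not open")
status.update_state(State.ACTIVE)
print(status.state, status.last_state, repr(status.last_error))
# State.ACTIVE State.ERROR 'TCP port is not open'
```

As the usage shows, returning to `ACTIVE` does not reset `last_error`. In the sketch, as in the source, the message is removed only by an explicit `clear_error()` call.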
Functions