Services
        zenml.services
  
      special
  
    Initialization of the ZenML services module.
A service is a process or set of processes that outlive a pipeline run.
        container
  
      special
  
    Initialization of a containerized ZenML service.
        container_service
    Implementation of a containerized ZenML service.
        
ContainerService            (BaseService)
        
    A service represented by a containerized process.
This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as docker containers.
To define a containerized service, subclass this class and implement the
run method. Upon start, the service will spawn a container that
ends up calling the run method.
For example,
from zenml.services import ServiceType, ContainerService, ContainerServiceConfig
import time
class SleepingServiceConfig(ContainerServiceConfig):
    wake_up_after: int
class SleepingService(ContainerService):
    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping container",
        type="container",
        flavor="sleeping",
    )
    config: SleepingServiceConfig
    def run(self) -> None:
        time.sleep(self.config.wake_up_after)
service = SleepingService(config=SleepingServiceConfig(wake_up_after=10))
service.start()
NOTE: the SleepingService class and its parent module have to be
discoverable as part of a ZenML Integration, otherwise the daemon will
fail with the following error:
TypeError: Cannot load service with unregistered service type:
name='sleeper' type='container' flavor='sleeping' description='Sleeping container'
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | ContainerServiceConfig | service configuration | 
| status | ContainerServiceStatus | service status | 
| endpoint | Optional[zenml.services.container.container_service_endpoint.ContainerServiceEndpoint] | optional service endpoint | 
Source code in zenml/services/container/container_service.py
          class ContainerService(BaseService):
    """A service represented by a containerized process.
    This class extends the base service class with functionality concerning
    the life-cycle management and tracking of external services implemented as
    docker containers.
    To define a containerized service, subclass this class and implement the
    `run` method. Upon `start`, the service will spawn a container that
    ends up calling the `run` method.
    For example,
    ```python
    from zenml.services import ServiceType, ContainerService, ContainerServiceConfig
    import time
    class SleepingServiceConfig(ContainerServiceConfig):
        wake_up_after: int
    class SleepingService(ContainerService):
        SERVICE_TYPE = ServiceType(
            name="sleeper",
            description="Sleeping container",
            type="container",
            flavor="sleeping",
        )
        config: SleepingServiceConfig
        def run(self) -> None:
            time.sleep(self.config.wake_up_after)
    service = SleepingService(config=SleepingServiceConfig(wake_up_after=10))
    service.start()
    ```
    NOTE: the `SleepingService` class and its parent module have to be
    discoverable as part of a ZenML `Integration`, otherwise the daemon will
    fail with the following error:
    ```
    TypeError: Cannot load service with unregistered service type:
    name='sleeper' type='container' flavor='sleeping' description='Sleeping container'
    ```
    Attributes:
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """
    config: ContainerServiceConfig = Field(
        default_factory=ContainerServiceConfig
    )
    status: ContainerServiceStatus = Field(
        default_factory=ContainerServiceStatus
    )
    # TODO [ENG-705]: allow multiple endpoints per service
    endpoint: Optional[ContainerServiceEndpoint] = None
    _docker_client: Optional[DockerClient] = None
    @property
    def docker_client(self) -> DockerClient:
        """Initialize and/or return the docker client.
        Returns:
            The docker client.
        """
        if self._docker_client is None:
            self._docker_client = (
                docker_utils._try_get_docker_client_from_env()
            )
        return self._docker_client
    @property
    def container_id(self) -> str:
        """Get the ID of the docker container for a service.
        Returns:
            The ID of the docker container for the service.
        """
        return f"zenml-{str(self.uuid)}"
    def get_service_status_message(self) -> str:
        """Get a message about the current operational state of the service.
        Returns:
            A message providing information about the current operational
            state of the service.
        """
        msg = super().get_service_status_message()
        msg += f"  Container ID: `{self.container_id}`\n"
        if self.status.log_file:
            msg += (
                f"For more information on the service status, please see the "
                f"following log file: {self.status.log_file}\n"
            )
        return msg
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the docker container.
        Returns:
            The operational state of the docker container and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        container: Optional[Container] = None
        try:
            container = self.docker_client.containers.get(self.container_id)
        except docker_errors.NotFound:
            # container doesn't exist yet or was removed
            pass
        if container is None:
            return ServiceState.INACTIVE, "Docker container is not present"
        elif container.status == "running":
            return ServiceState.ACTIVE, "Docker container is running"
        elif container.status == "exited":
            return (
                ServiceState.ERROR,
                "Docker container has exited.",
            )
        else:
            return (
                ServiceState.INACTIVE,
                f"Docker container is {container.status}",
            )
    def _setup_runtime_path(self) -> None:
        """Set up the runtime path for the service.
        This method sets up the runtime path for the service.
        """
        # reuse the config file and logfile location from a previous run,
        # if available
        if not self.status.runtime_path or not os.path.exists(
            self.status.runtime_path
        ):
            if self.config.root_runtime_path:
                if self.config.singleton:
                    self.status.runtime_path = self.config.root_runtime_path
                else:
                    self.status.runtime_path = os.path.join(
                        self.config.root_runtime_path,
                        str(self.uuid),
                    )
                create_dir_recursive_if_not_exists(self.status.runtime_path)
            else:
                self.status.runtime_path = tempfile.mkdtemp(
                    prefix="zenml-service-"
                )
    def _get_container_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to run the service container.
        The default implementation provided by this class is the following:
          * this ContainerService instance and its configuration
          are serialized as JSON and saved to a file
          * the entrypoint.py script is launched as a docker container
          and pointed to the serialized service file
          * the entrypoint script re-creates the ContainerService instance
          from the serialized configuration, then calls the `run`
          method that must be implemented by the subclass
        Subclasses that need a different command to launch the container
        should override this method.
        Returns:
            Command needed to launch the docker container and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        # to avoid circular imports, import here
        from zenml.services.container import entrypoint
        assert self.status.config_file is not None
        assert self.status.log_file is not None
        with open(self.status.config_file, "w") as f:
            f.write(self.model_dump_json(indent=4))
        pathlib.Path(self.status.log_file).touch()
        command = [
            "python",
            "-m",
            entrypoint.__name__,
            "--config-file",
            os.path.join(SERVICE_CONTAINER_PATH, SERVICE_CONFIG_FILE_NAME),
        ]
        command_env = {
            ENV_ZENML_SERVICE_CONTAINER: "true",
        }
        for k, v in os.environ.items():
            if k.startswith("ZENML_"):
                command_env[k] = v
        # the global configuration is mounted into the container at a
        # different location
        command_env[ENV_ZENML_CONFIG_PATH] = (
            SERVICE_CONTAINER_GLOBAL_CONFIG_PATH
        )
        return command, command_env
    def _get_container_volumes(self) -> Dict[str, Dict[str, str]]:
        """Get the volumes to mount into the service container.
        The default implementation provided by this class mounts the
        following directories into the container:
          * the service runtime path
          * the global configuration directory
        Subclasses that need to mount additional volumes should override
        this method.
        Returns:
            A dictionary mapping host paths to dictionaries containing
            the mount options for each volume.
        """
        volumes: Dict[str, Dict[str, str]] = {}
        assert self.status.runtime_path is not None
        volumes[self.status.runtime_path] = {
            "bind": SERVICE_CONTAINER_PATH,
            "mode": "rw",
        }
        volumes[get_global_config_directory()] = {
            "bind": SERVICE_CONTAINER_GLOBAL_CONFIG_PATH,
            "mode": "rw",
        }
        return volumes
    @property
    def container(self) -> Optional[Container]:
        """Get the docker container for the service.
        Returns:
            The docker container for the service, or None if the container
            does not exist.
        """
        try:
            return self.docker_client.containers.get(self.container_id)
        except docker_errors.NotFound:
            # container doesn't exist yet or was removed
            return None
    def _start_container(self) -> None:
        """Start the service docker container associated with this service."""
        container = self.container
        if container:
            # the container exists, check if it is running
            if container.status == "running":
                logger.debug(
                    "Container for service '%s' is already running",
                    self,
                )
                return
            # the container is stopped or in an error state, remove it
            logger.debug(
                "Removing previous container for service '%s'",
                self,
            )
            container.remove(force=True)
        logger.debug("Starting container for service '%s'...", self)
        try:
            self.docker_client.images.get(self.config.image)
        except docker_errors.ImageNotFound:
            logger.debug(
                "Pulling container image '%s' for service '%s'...",
                self.config.image,
                self,
            )
            self.docker_client.images.pull(self.config.image)
        self._setup_runtime_path()
        ports: Dict[int, Optional[int]] = {}
        if self.endpoint:
            self.endpoint.prepare_for_start()
            if self.endpoint.status.port:
                ports[self.endpoint.status.port] = self.endpoint.status.port
        command, env = self._get_container_cmd()
        volumes = self._get_container_volumes()
        try:
            uid_args: Dict[str, Any] = {}
            if sys.platform == "win32":
                # File permissions are not checked on Windows. This if clause
                # prevents mypy from complaining about unused 'type: ignore'
                # statements
                pass
            else:
                # Run the container in the context of the local UID/GID
                # to ensure that the local database can be shared
                # with the container.
                logger.debug(
                    "Setting UID and GID to local user/group " "in container."
                )
                uid_args = dict(
                    user=os.getuid(),
                    group_add=[os.getgid()],
                )
            container = self.docker_client.containers.run(
                name=self.container_id,
                image=self.config.image,
                entrypoint=command,
                detach=True,
                volumes=volumes,
                environment=env,
                remove=False,
                auto_remove=False,
                ports=ports,
                labels={
                    "zenml-service-uuid": str(self.uuid),
                },
                working_dir=SERVICE_CONTAINER_PATH,
                extra_hosts={"host.docker.internal": "host-gateway"},
                **uid_args,
            )
            logger.debug(
                "Docker container for service '%s' started with ID: %s",
                self,
                self.container_id,
            )
        except docker_errors.DockerException as e:
            logger.error(
                "Docker container for service '%s' failed to start: %s",
                self,
                e,
            )
    def _stop_daemon(self, force: bool = False) -> None:
        """Stop the service docker container associated with this service.
        Args:
            force: if True, the service container will be forcefully stopped
        """
        container = self.container
        if not container:
            # service container is not running
            logger.debug(
                "Docker container for service '%s' no longer running",
                self,
            )
            return
        logger.debug("Stopping container for service '%s' ...", self)
        if force:
            container.kill()
            container.remove(force=True)
        else:
            container.stop()
            container.remove()
    def provision(self) -> None:
        """Provision the service."""
        self._start_container()
    def deprovision(self, force: bool = False) -> None:
        """Deprovision the service.
        Args:
            force: if True, the service container will be forcefully stopped
        """
        self._stop_daemon(force)
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.
        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.
        Yields:
            A generator that can be accessed to get the service logs.
        """
        if not self.status.log_file or not os.path.exists(
            self.status.log_file
        ):
            return
        with open(self.status.log_file, "r") as f:
            if tail:
                # TODO[ENG-864]: implement a more efficient tailing mechanism that
                #   doesn't read the entire file
                lines = f.readlines()[-tail:]
                for line in lines:
                    yield line.rstrip("\n")
                if not follow:
                    return
            line = ""
            while True:
                partial_line = f.readline()
                if partial_line:
                    line += partial_line
                    if line.endswith("\n"):
                        stop = yield line.rstrip("\n")
                        if stop:
                            break
                        line = ""
                elif follow:
                    time.sleep(1)
                else:
                    break
    @abstractmethod
    def run(self) -> None:
        """Run the containerized service logic associated with this service.
        Subclasses must implement this method to provide the containerized
        service functionality. This method will be executed in the context of
        the running container, not in the context of the process that calls the
        `start` method.
        """
container: Optional[docker.models.containers.Container]
  
      property
      readonly
  
    Get the docker container for the service.
Returns:
| Type | Description | 
|---|---|
| Optional[docker.models.containers.Container] | The docker container for the service, or None if the container does not exist. | 
container_id: str
  
      property
      readonly
  
    Get the ID of the docker container for a service.
Returns:
| Type | Description | 
|---|---|
| str | The ID of the docker container for the service. | 
docker_client: DockerClient
  
      property
      readonly
  
    Initialize and/or return the docker client.
Returns:
| Type | Description | 
|---|---|
| DockerClient | The docker client. | 
check_status(self)
    Check the the current operational state of the docker container.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the docker container and a message providing additional information about that state (e.g. a description of the error, if one is encountered). | 
Source code in zenml/services/container/container_service.py
          def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the docker container.
    Returns:
        The operational state of the docker container and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    container: Optional[Container] = None
    try:
        container = self.docker_client.containers.get(self.container_id)
    except docker_errors.NotFound:
        # container doesn't exist yet or was removed
        pass
    if container is None:
        return ServiceState.INACTIVE, "Docker container is not present"
    elif container.status == "running":
        return ServiceState.ACTIVE, "Docker container is running"
    elif container.status == "exited":
        return (
            ServiceState.ERROR,
            "Docker container has exited.",
        )
    else:
        return (
            ServiceState.INACTIVE,
            f"Docker container is {container.status}",
        )
deprovision(self, force=False)
    Deprovision the service.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| force | bool | if True, the service container will be forcefully stopped | False | 
Source code in zenml/services/container/container_service.py
          def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.
    Args:
        force: if True, the service container will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(self, follow=False, tail=None)
    Retrieve the service logs.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| follow | bool | if True, the logs will be streamed as they are written | False | 
| tail | Optional[int] | only retrieve the last NUM lines of log output. | None | 
Yields:
| Type | Description | 
|---|---|
| Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs. | 
Source code in zenml/services/container/container_service.py
          def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.
    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.
    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return
    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
get_service_status_message(self)
    Get a message about the current operational state of the service.
Returns:
| Type | Description | 
|---|---|
| str | A message providing information about the current operational state of the service. | 
Source code in zenml/services/container/container_service.py
          def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.
    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    msg += f"  Container ID: `{self.container_id}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
model_post_init(/, self, context)
    This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| self | BaseModel | The BaseModel instance. | required | 
| context | Any | The context. | required | 
Source code in zenml/services/container/container_service.py
          def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.
    It takes context as an argument since that's what pydantic-core passes when calling it.
    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
provision(self)
    Provision the service.
Source code in zenml/services/container/container_service.py
          def provision(self) -> None:
    """Provision the service."""
    self._start_container()
run(self)
    Run the containerized service logic associated with this service.
Subclasses must implement this method to provide the containerized
service functionality. This method will be executed in the context of
the running container, not in the context of the process that calls the
start method.
Source code in zenml/services/container/container_service.py
          @abstractmethod
def run(self) -> None:
    """Run the containerized service logic associated with this service.
    Subclasses must implement this method to provide the containerized
    service functionality. This method will be executed in the context of
    the running container, not in the context of the process that calls the
    `start` method.
    """
        
ContainerServiceConfig            (ServiceConfig)
        
    containerized service configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| root_runtime_path | Optional[str] | the root path where the service stores its files. | 
| singleton | bool | set to True to store the service files directly in the
 | 
| image | str | the container image to use for the service. | 
Source code in zenml/services/container/container_service.py
          class ContainerServiceConfig(ServiceConfig):
    """containerized service configuration.
    Attributes:
        root_runtime_path: the root path where the service stores its files.
        singleton: set to True to store the service files directly in the
            `root_runtime_path` directory instead of creating a subdirectory for
            each service instance. Only has effect if the `root_runtime_path` is
            also set.
        image: the container image to use for the service.
    """
    root_runtime_path: Optional[str] = None
    singleton: bool = False
    image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE
        
ContainerServiceStatus            (ServiceStatus)
        
    containerized service status.
Attributes:
| Name | Type | Description | 
|---|---|---|
| runtime_path | Optional[str] | the path where the service files (e.g. the configuration file used to start the service daemon and the logfile) are located | 
Source code in zenml/services/container/container_service.py
          class ContainerServiceStatus(ServiceStatus):
    """containerized service status.
    Attributes:
        runtime_path: the path where the service files (e.g. the configuration
            file used to start the service daemon and the logfile) are located
    """
    runtime_path: Optional[str] = None
    @property
    def config_file(self) -> Optional[str]:
        """Get the path to the service configuration file.
        Returns:
            The path to the configuration file, or None, if the
            service has never been started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_CONFIG_FILE_NAME)
    @property
    def log_file(self) -> Optional[str]:
        """Get the path to the log file where the service output is/has been logged.
        Returns:
            The path to the log file, or None, if the service has never been
            started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_LOG_FILE_NAME)
config_file: Optional[str]
  
      property
      readonly
  
    Get the path to the service configuration file.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the configuration file, or None, if the service has never been started before. | 
log_file: Optional[str]
  
      property
      readonly
  
    Get the path to the log file where the service output is/has been logged.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the log file, or None, if the service has never been started before. | 
        container_service_endpoint
    Implementation of a containerized service endpoint.
        
ContainerServiceEndpoint            (BaseServiceEndpoint)
        
    A service endpoint exposed by a containerized process.
This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as containerized processes.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | ContainerServiceEndpointConfig | service endpoint configuration | 
| status | ContainerServiceEndpointStatus | service endpoint status | 
| monitor | Union[zenml.services.service_monitor.HTTPEndpointHealthMonitor, zenml.services.service_monitor.TCPEndpointHealthMonitor] | optional service endpoint health monitor | 
Source code in zenml/services/container/container_service_endpoint.py
          class ContainerServiceEndpoint(BaseServiceEndpoint):
    """A service endpoint exposed by a containerized process.
    This class extends the base service endpoint class with functionality
    concerning the life-cycle management and tracking of endpoints exposed
    by external services implemented as containerized processes.
    Attributes:
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """
    config: ContainerServiceEndpointConfig = Field(
        default_factory=ContainerServiceEndpointConfig
    )
    status: ContainerServiceEndpointStatus = Field(
        default_factory=ContainerServiceEndpointStatus
    )
    monitor: Optional[
        Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]
    ] = Field(..., discriminator="type", union_mode="left_to_right")
    def _lookup_free_port(self) -> int:
        """Search for a free TCP port for the service endpoint.
        If a preferred TCP port value is explicitly requested through the
        endpoint configuration, it will be checked first. If a port was
        previously used the last time the service was running (i.e. as
        indicated in the service endpoint status), it will be checked next for
        availability.
        As a last resort, this call will search for a free TCP port, if
        `allocate_port` is set to True in the endpoint configuration.
        Returns:
            An available TCP port number
        Raises:
            IOError: if the preferred TCP port is busy and `allocate_port` is
                disabled in the endpoint configuration, or if no free TCP port
                could be otherwise allocated.
        """
        # If a port value is explicitly configured, attempt to use it first
        if self.config.port:
            if port_available(self.config.port):
                return self.config.port
            if not self.config.allocate_port:
                raise IOError(f"TCP port {self.config.port} is not available.")
        # Attempt to reuse the port used when the services was last running
        if self.status.port and port_available(self.status.port):
            return self.status.port
        port = scan_for_available_port()
        if port:
            return port
        raise IOError("No free TCP ports found")
    def prepare_for_start(self) -> None:
        """Prepare the service endpoint for starting.
        This method is called before the service is started.
        """
        self.status.protocol = self.config.protocol
        self.status.port = self._lookup_free_port()
        # Container endpoints are always exposed on the local host
        self.status.hostname = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
prepare_for_start(self)
    Prepare the service endpoint for starting.
This method is called before the service is started.
Source code in zenml/services/container/container_service_endpoint.py
          def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.
    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.port = self._lookup_free_port()
    # Container endpoints are always exposed on the local host
    self.status.hostname = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
        
ContainerServiceEndpointConfig            (ServiceEndpointConfig)
        
    Local daemon service endpoint configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| protocol | ServiceEndpointProtocol | the TCP protocol implemented by the service endpoint | 
| port | Optional[int] | preferred TCP port value for the service endpoint. If the port
is in use when the service is started, setting  | 
| allocate_port | bool | set to True to allocate a free TCP port for the service endpoint automatically. | 
Source code in zenml/services/container/container_service_endpoint.py
          class ContainerServiceEndpointConfig(ServiceEndpointConfig):
    """Local daemon service endpoint configuration.
    Attributes:
        protocol: the TCP protocol implemented by the service endpoint
        port: preferred TCP port value for the service endpoint. If the port
            is in use when the service is started, setting `allocate_port` to
            True will also try to allocate a new port value, otherwise an
            exception will be raised.
        allocate_port: set to True to allocate a free TCP port for the
            service endpoint automatically.
    """
    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    port: Optional[int] = None
    allocate_port: bool = True
        
ContainerServiceEndpointStatus            (ServiceEndpointStatus)
        
    Local daemon service endpoint status.
Source code in zenml/services/container/container_service_endpoint.py
          class ContainerServiceEndpointStatus(ServiceEndpointStatus):
    """Local daemon service endpoint status."""
        entrypoint
    Implementation of a containerized service entrypoint.
This executable file is utilized as an entrypoint for all ZenML services that are implemented as locally running docker containers.
launch_service(service_config_file)
    Instantiate and launch a ZenML local service from its configuration file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| service_config_file | str | the path to the service configuration file. | required | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | if the service configuration file is the wrong type. | 
Source code in zenml/services/container/entrypoint.py
          def launch_service(service_config_file: str) -> None:
    """Instantiate and launch a ZenML local service from its configuration file.
    Args:
        service_config_file: the path to the service configuration file.
    Raises:
        TypeError: if the service configuration file is the wrong type.
    """
    # doing zenml imports here to avoid polluting the stdout/stderr
    # with messages before daemonization is complete
    from zenml.integrations.registry import integration_registry
    from zenml.logger import get_logger
    from zenml.services import ContainerService
    logger = get_logger(__name__)
    logger.info("Loading service configuration from %s", service_config_file)
    with open(service_config_file, "r") as f:
        config = f.read()
    integration_registry.activate_integrations()
    logger.debug(
        "Running containerized service with configuration:\n %s", config
    )
    service = cast("ContainerService", ContainerService.from_json(config))
    if not isinstance(service, ContainerService):
        raise TypeError(
            f"Expected service type ContainerService but got "
            f"{type(service)} instead"
        )
    service.run()
        local
  
      special
  
    Initialization of a local ZenML service.
        local_daemon_entrypoint
    Implementation of a local daemon entrypoint.
This executable file is utilized as an entrypoint for all ZenML services that are implemented as locally running daemon processes.
        local_service
    Implementation of a local ZenML service.
        
LocalDaemonService            (BaseService)
        
    A service represented by a local daemon process.
This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as local daemon processes.
To define a local daemon service, subclass this class and implement the
run method. Upon start, the service will spawn a daemon process that
ends up calling the run method.
For example,
from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
import time
class SleepingDaemonConfig(LocalDaemonServiceConfig):
    wake_up_after: int
class SleepingDaemon(LocalDaemonService):
    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping daemon",
        type="daemon",
        flavor="sleeping",
    )
    config: SleepingDaemonConfig
    def run(self) -> None:
        time.sleep(self.config.wake_up_after)
daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
daemon.start()
NOTE: the SleepingDaemon class and its parent module have to be
discoverable as part of a ZenML Integration, otherwise the daemon will
fail with the following error:
TypeError: Cannot load service with unregistered service type:
name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | LocalDaemonServiceConfig | service configuration | 
| status | LocalDaemonServiceStatus | service status | 
| endpoint | Optional[zenml.services.local.local_service_endpoint.LocalDaemonServiceEndpoint] | optional service endpoint | 
Source code in zenml/services/local/local_service.py
          class LocalDaemonService(BaseService):
    """A service represented by a local daemon process.
    This class extends the base service class with functionality concerning
    the life-cycle management and tracking of external services implemented as
    local daemon processes.
    To define a local daemon service, subclass this class and implement the
    `run` method. Upon `start`, the service will spawn a daemon process that
    ends up calling the `run` method.
    For example,
    ```python
    from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
    import time
    class SleepingDaemonConfig(LocalDaemonServiceConfig):
        wake_up_after: int
    class SleepingDaemon(LocalDaemonService):
        SERVICE_TYPE = ServiceType(
            name="sleeper",
            description="Sleeping daemon",
            type="daemon",
            flavor="sleeping",
        )
        config: SleepingDaemonConfig
        def run(self) -> None:
            time.sleep(self.config.wake_up_after)
    daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
    daemon.start()
    ```
    NOTE: the `SleepingDaemon` class and its parent module have to be
    discoverable as part of a ZenML `Integration`, otherwise the daemon will
    fail with the following error:
    ```
    TypeError: Cannot load service with unregistered service type:
    name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'
    ```
    Attributes:
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """
    config: LocalDaemonServiceConfig = Field(
        default_factory=LocalDaemonServiceConfig
    )
    status: LocalDaemonServiceStatus = Field(
        default_factory=LocalDaemonServiceStatus
    )
    # TODO [ENG-705]: allow multiple endpoints per service
    endpoint: Optional[LocalDaemonServiceEndpoint] = None
    def get_service_status_message(self) -> str:
        """Get a message about the current operational state of the service.
        Returns:
            A message providing information about the current operational
            state of the service.
        """
        msg = super().get_service_status_message()
        pid = self.status.pid
        if pid:
            msg += f"  Daemon PID: `{self.status.pid}`\n"
        if self.status.log_file:
            msg += (
                f"For more information on the service status, please see the "
                f"following log file: {self.status.log_file}\n"
            )
        return msg
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the daemon process.
        Returns:
            The operational state of the daemon process and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        if not self.status.pid:
            return ServiceState.INACTIVE, "service daemon is not running"
        # the daemon is running
        return ServiceState.ACTIVE, ""
    def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to run the service daemon.
        The default implementation provided by this class is the following:
          * this LocalDaemonService instance and its configuration
          are serialized as JSON and saved to a temporary file
          * the local_daemon_entrypoint.py script is launched as a subprocess
          and pointed to the serialized service file
          * the entrypoint script re-creates the LocalDaemonService instance
          from the serialized configuration, reconfigures itself as a daemon
          and detaches itself from the parent process, then calls the `run`
          method that must be implemented by the subclass
        Subclasses that need a different command to launch the service daemon
        should override this method.
        Returns:
            Command needed to launch the daemon process and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        # to avoid circular imports, import here
        import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint
        self.status.silent_daemon = self.config.silent_daemon
        # reuse the config file and logfile location from a previous run,
        # if available
        if not self.status.runtime_path or not os.path.exists(
            self.status.runtime_path
        ):
            if self.config.root_runtime_path:
                if self.config.singleton:
                    self.status.runtime_path = self.config.root_runtime_path
                else:
                    self.status.runtime_path = os.path.join(
                        self.config.root_runtime_path,
                        str(self.uuid),
                    )
                create_dir_recursive_if_not_exists(self.status.runtime_path)
            else:
                self.status.runtime_path = tempfile.mkdtemp(
                    prefix="zenml-service-"
                )
        assert self.status.config_file is not None
        assert self.status.pid_file is not None
        with open(self.status.config_file, "w") as f:
            f.write(self.model_dump_json(indent=4))
        # delete the previous PID file, in case a previous daemon process
        # crashed and left a stale PID file
        if os.path.exists(self.status.pid_file):
            os.remove(self.status.pid_file)
        command = [
            sys.executable,
            "-m",
            daemon_entrypoint.__name__,
            "--config-file",
            self.status.config_file,
            "--pid-file",
            self.status.pid_file,
        ]
        if self.status.log_file:
            pathlib.Path(self.status.log_file).touch()
            command += ["--log-file", self.status.log_file]
        command_env = os.environ.copy()
        return command, command_env
    def _start_daemon(self) -> None:
        """Start the service daemon process associated with this service."""
        pid = self.status.pid
        if pid:
            # service daemon is already running
            logger.debug(
                "Daemon process for service '%s' is already running with PID %d",
                self,
                pid,
            )
            return
        logger.debug("Starting daemon for service '%s'...", self)
        if self.endpoint:
            self.endpoint.prepare_for_start()
        command, command_env = self._get_daemon_cmd()
        logger.debug(
            "Running command to start daemon for service '%s': %s",
            self,
            " ".join(command),
        )
        p = subprocess.Popen(command, env=command_env)
        p.wait()
        pid = self.status.pid
        if pid:
            logger.debug(
                "Daemon process for service '%s' started with PID: %d",
                self,
                pid,
            )
        else:
            logger.error(
                "Daemon process for service '%s' failed to start.",
                self,
            )
    def _stop_daemon(self, force: bool = False) -> None:
        """Stop the service daemon process associated with this service.
        Args:
            force: if True, the service daemon will be forcefully stopped
        """
        pid = self.status.pid
        if not pid:
            # service daemon is not running
            logger.debug(
                "Daemon process for service '%s' no longer running",
                self,
            )
            return
        logger.debug("Stopping daemon for service '%s' ...", self)
        try:
            p = psutil.Process(pid)
        except psutil.Error:
            logger.error(
                "Could not find process for for service '%s' ...", self
            )
            return
        if force:
            p.kill()
        else:
            p.terminate()
    def provision(self) -> None:
        """Provision the service."""
        self._start_daemon()
    def deprovision(self, force: bool = False) -> None:
        """Deprovision the service.
        Args:
            force: if True, the service daemon will be forcefully stopped
        """
        self._stop_daemon(force)
    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.
        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.
        """
        if not self.config.blocking:
            super().start(timeout)
        else:
            self.run()
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.
        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.
        Yields:
            A generator that can be accessed to get the service logs.
        """
        if not self.status.log_file or not os.path.exists(
            self.status.log_file
        ):
            return
        with open(self.status.log_file, "r") as f:
            if tail:
                # TODO[ENG-864]: implement a more efficient tailing mechanism that
                #   doesn't read the entire file
                lines = f.readlines()[-tail:]
                for line in lines:
                    yield line.rstrip("\n")
                if not follow:
                    return
            line = ""
            while True:
                partial_line = f.readline()
                if partial_line:
                    line += partial_line
                    if line.endswith("\n"):
                        stop = yield line.rstrip("\n")
                        if stop:
                            break
                        line = ""
                elif follow:
                    time.sleep(1)
                else:
                    break
    @abstractmethod
    def run(self) -> None:
        """Run the service daemon process associated with this service.
        Subclasses must implement this method to provide the service daemon
        functionality. This method will be executed in the context of the
        running daemon, not in the context of the process that calls the
        `start` method.
        """
check_status(self)
    Check the the current operational state of the daemon process.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the daemon process and a message providing additional information about that state (e.g. a description of the error, if one is encountered). | 
Source code in zenml/services/local/local_service.py
          def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the daemon process.
    Returns:
        The operational state of the daemon process and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    if not self.status.pid:
        return ServiceState.INACTIVE, "service daemon is not running"
    # the daemon is running
    return ServiceState.ACTIVE, ""
deprovision(self, force=False)
    Deprovision the service.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| force | bool | if True, the service daemon will be forcefully stopped | False | 
Source code in zenml/services/local/local_service.py
          def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.
    Args:
        force: if True, the service daemon will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(self, follow=False, tail=None)
    Retrieve the service logs.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| follow | bool | if True, the logs will be streamed as they are written | False | 
| tail | Optional[int] | only retrieve the last NUM lines of log output. | None | 
Yields:
| Type | Description | 
|---|---|
| Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs. | 
Source code in zenml/services/local/local_service.py
          def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.
    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.
    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(
        self.status.log_file
    ):
        return
    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            #   doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
get_service_status_message(self)
    Get a message about the current operational state of the service.
Returns:
| Type | Description | 
|---|---|
| str | A message providing information about the current operational state of the service. | 
Source code in zenml/services/local/local_service.py
          def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.
    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    pid = self.status.pid
    if pid:
        msg += f"  Daemon PID: `{self.status.pid}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision(self)
    Provision the service.
Source code in zenml/services/local/local_service.py
          def provision(self) -> None:
    """Provision the service."""
    self._start_daemon()
run(self)
    Run the service daemon process associated with this service.
Subclasses must implement this method to provide the service daemon
functionality. This method will be executed in the context of the
running daemon, not in the context of the process that calls the
start method.
Source code in zenml/services/local/local_service.py
          @abstractmethod
def run(self) -> None:
    """Run the service daemon process associated with this service.
    Subclasses must implement this method to provide the service daemon
    functionality. This method will be executed in the context of the
    running daemon, not in the context of the process that calls the
    `start` method.
    """
start(self, timeout=0)
    Start the service and optionally wait for it to become active.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status. | 0 | 
Source code in zenml/services/local/local_service.py
          def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.
    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    if not self.config.blocking:
        super().start(timeout)
    else:
        self.run()
        
LocalDaemonServiceConfig            (ServiceConfig)
        
    Local daemon service configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| silent_daemon | bool | set to True to suppress the output of the daemon (i.e. redirect stdout and stderr to /dev/null). If False, the daemon output will be redirected to a logfile. | 
| root_runtime_path | Optional[str] | the root path where the service daemon will store service configuration files | 
| singleton | bool | set to True to store the service daemon configuration files
directly in the  | 
| blocking | bool | set to True to run the service the context of the current process and block until the service is stopped instead of running the service as a daemon process. Useful for operating systems that do not support daemon processes. | 
Source code in zenml/services/local/local_service.py
          class LocalDaemonServiceConfig(ServiceConfig):
    """Local daemon service configuration.
    Attributes:
        silent_daemon: set to True to suppress the output of the daemon
            (i.e. redirect stdout and stderr to /dev/null). If False, the
            daemon output will be redirected to a logfile.
        root_runtime_path: the root path where the service daemon will store
            service configuration files
        singleton: set to True to store the service daemon configuration files
            directly in the `root_runtime_path` directory instead of creating
            a subdirectory for each service instance. Only has effect if the
            `root_runtime_path` is also set.
        blocking: set to True to run the service the context of the current
            process and block until the service is stopped instead of running
            the service as a daemon process. Useful for operating systems
            that do not support daemon processes.
    """
    silent_daemon: bool = False
    root_runtime_path: Optional[str] = None
    singleton: bool = False
    blocking: bool = False
        
LocalDaemonServiceStatus            (ServiceStatus)
        
    Local daemon service status.
Attributes:
| Name | Type | Description | 
|---|---|---|
| runtime_path | Optional[str] | the path where the service daemon runtime files (the configuration file used to start the service daemon and the logfile) are located | 
| silent_daemon | bool | flag indicating whether the output of the daemon is suppressed (redirected to /dev/null). | 
Source code in zenml/services/local/local_service.py
          class LocalDaemonServiceStatus(ServiceStatus):
    """Local daemon service status.
    Attributes:
        runtime_path: the path where the service daemon runtime files (the
            configuration file used to start the service daemon and the
            logfile) are located
        silent_daemon: flag indicating whether the output of the daemon
            is suppressed (redirected to /dev/null).
    """
    runtime_path: Optional[str] = None
    # TODO [ENG-704]: remove field duplication between XServiceStatus and
    #   XServiceConfig (e.g. keep a private reference to the config in the
    #   status)
    silent_daemon: bool = False
    @property
    def config_file(self) -> Optional[str]:
        """Get the path to the configuration file used to start the service daemon.
        Returns:
            The path to the configuration file, or None, if the
            service has never been started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_CONFIG_FILE_NAME)
    @property
    def log_file(self) -> Optional[str]:
        """Get the path to the log file where the service output is/has been logged.
        Returns:
            The path to the log file, or None, if the service has never been
            started before, or if the service daemon output is suppressed.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_LOG_FILE_NAME)
    @property
    def pid_file(self) -> Optional[str]:
        """Get the path to a daemon PID file.
        This is where the last known PID of the daemon process is stored.
        Returns:
            The path to the PID file, or None, if the service has never been
            started before.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_PID_FILE_NAME)
    @property
    def pid(self) -> Optional[int]:
        """Return the PID of the currently running daemon.
        Returns:
            The PID of the daemon, or None, if the service has never been
            started before.
        """
        pid_file = self.pid_file
        if not pid_file:
            return None
        if sys.platform == "win32":
            logger.warning(
                "Daemon functionality is currently not supported on Windows."
            )
            return None
        else:
            import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint
            from zenml.utils.daemon import get_daemon_pid_if_running
            logger.debug(f"Checking PID file {pid_file}.")
            pid = get_daemon_pid_if_running(pid_file)
            if not pid:
                logger.debug(
                    f"Process with PID file {pid_file} is no longer running."
                )
                return None
            # let's be extra careful here and check that the PID really
            # belongs to a process that is a local ZenML daemon.
            # this avoids the situation where a PID file is left over from
            # a previous daemon run, but another process is using the same
            # PID.
            try:
                p = psutil.Process(pid)
                cmd_line = p.cmdline()
                # Empty cmd_line implies no process
                if not cmd_line:
                    logger.debug(f"Process with PID {pid} not found!")
                    return None
                config_file = self.config_file
                if (
                    config_file is not None
                    and (
                        daemon_entrypoint.__name__ not in cmd_line
                        or config_file not in cmd_line
                    )
                    and (
                        daemon_entrypoint.__name__ not in cmd_line[0]
                        or config_file not in cmd_line[0]
                    )
                ):
                    logger.warning(
                        f"Process with PID {pid} is not a ZenML local daemon "
                        f"service."
                    )
                return pid
            except NoSuchProcess:
                return None
config_file: Optional[str]
  
      property
      readonly
  
    Get the path to the configuration file used to start the service daemon.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the configuration file, or None, if the service has never been started before. | 
log_file: Optional[str]
  
      property
      readonly
  
    Get the path to the log file where the service output is/has been logged.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the log file, or None, if the service has never been started before, or if the service daemon output is suppressed. | 
pid: Optional[int]
  
      property
      readonly
  
    Return the PID of the currently running daemon.
Returns:
| Type | Description | 
|---|---|
| Optional[int] | The PID of the daemon, or None, if the service has never been started before. | 
pid_file: Optional[str]
  
      property
      readonly
  
    Get the path to a daemon PID file.
This is where the last known PID of the daemon process is stored.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the PID file, or None, if the service has never been started before. | 
        local_service_endpoint
    Implementation of a local service endpoint.
        
LocalDaemonServiceEndpoint            (BaseServiceEndpoint)
        
    A service endpoint exposed by a local daemon process.
This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as local daemon processes.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | LocalDaemonServiceEndpointConfig | service endpoint configuration | 
| status | LocalDaemonServiceEndpointStatus | service endpoint status | 
| monitor | Union[zenml.services.service_monitor.HTTPEndpointHealthMonitor, zenml.services.service_monitor.TCPEndpointHealthMonitor] | optional service endpoint health monitor | 
Source code in zenml/services/local/local_service_endpoint.py
          class LocalDaemonServiceEndpoint(BaseServiceEndpoint):
    """A service endpoint exposed by a local daemon process.
    This class extends the base service endpoint class with functionality
    concerning the life-cycle management and tracking of endpoints exposed
    by external services implemented as local daemon processes.
    Attributes:
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """
    config: LocalDaemonServiceEndpointConfig = Field(
        default_factory=LocalDaemonServiceEndpointConfig
    )
    status: LocalDaemonServiceEndpointStatus = Field(
        default_factory=LocalDaemonServiceEndpointStatus
    )
    monitor: Optional[
        Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]
    ] = Field(..., discriminator="type", union_mode="left_to_right")
    def _lookup_free_port(self) -> int:
        """Search for a free TCP port for the service endpoint.
        If a preferred TCP port value is explicitly requested through the
        endpoint configuration, it will be checked first. If a port was
        previously used the last time the service was running (i.e. as
        indicated in the service endpoint status), it will be checked next for
        availability.
        As a last resort, this call will search for a free TCP port, if
        `allocate_port` is set to True in the endpoint configuration.
        Returns:
            An available TCP port number
        Raises:
            IOError: if the preferred TCP port is busy and `allocate_port` is
                disabled in the endpoint configuration, or if no free TCP port
                could be otherwise allocated.
        """
        # If a port value is explicitly configured, attempt to use it first
        if self.config.port:
            if port_available(self.config.port, self.config.ip_address):
                return self.config.port
            if not self.config.allocate_port:
                raise IOError(f"TCP port {self.config.port} is not available.")
        # Attempt to reuse the port used when the services was last running
        if self.status.port and port_available(self.status.port):
            return self.status.port
        port = scan_for_available_port()
        if port:
            return port
        raise IOError("No free TCP ports found")
    def prepare_for_start(self) -> None:
        """Prepare the service endpoint for starting.
        This method is called before the service is started.
        """
        self.status.protocol = self.config.protocol
        self.status.hostname = self.config.ip_address
        self.status.port = self._lookup_free_port()
prepare_for_start(self)
    Prepare the service endpoint for starting.
This method is called before the service is started.
Source code in zenml/services/local/local_service_endpoint.py
          def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.
    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.hostname = self.config.ip_address
    self.status.port = self._lookup_free_port()
        
LocalDaemonServiceEndpointConfig            (ServiceEndpointConfig)
        
    Local daemon service endpoint configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| protocol | ServiceEndpointProtocol | the TCP protocol implemented by the service endpoint | 
| port | Optional[int] | preferred TCP port value for the service endpoint. If the port
is in use when the service is started, setting  | 
| ip_address | str | the IP address of the service endpoint. If not set, the default localhost IP address will be used. | 
| allocate_port | bool | set to True to allocate a free TCP port for the service endpoint automatically. | 
Source code in zenml/services/local/local_service_endpoint.py
          class LocalDaemonServiceEndpointConfig(ServiceEndpointConfig):
    """Local daemon service endpoint configuration.
    Attributes:
        protocol: the TCP protocol implemented by the service endpoint
        port: preferred TCP port value for the service endpoint. If the port
            is in use when the service is started, setting `allocate_port` to
            True will also try to allocate a new port value, otherwise an
            exception will be raised.
        ip_address: the IP address of the service endpoint. If not set, the
            default localhost IP address will be used.
        allocate_port: set to True to allocate a free TCP port for the
            service endpoint automatically.
    """
    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    port: Optional[int] = None
    ip_address: str = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
    allocate_port: bool = True
        
LocalDaemonServiceEndpointStatus            (ServiceEndpointStatus)
        
    Local daemon service endpoint status.
Source code in zenml/services/local/local_service_endpoint.py
          class LocalDaemonServiceEndpointStatus(ServiceEndpointStatus):
    """Local daemon service endpoint status."""
        service
    Implementation of the ZenML Service class.
        
BaseDeploymentService            (BaseService)
        
    Base class for deployment services.
Source code in zenml/services/service.py
          class BaseDeploymentService(BaseService):
    """Base class for deployment services."""
    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.
        Returns:
            the prediction URL for the endpoint
        """
        return None
    @property
    def healthcheck_url(self) -> Optional[str]:
        """Gets the healthcheck URL for the endpoint.
        Returns:
            the healthcheck URL for the endpoint
        """
        return None
healthcheck_url: Optional[str]
  
      property
      readonly
  
    Gets the healthcheck URL for the endpoint.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the healthcheck URL for the endpoint | 
prediction_url: Optional[str]
  
      property
      readonly
  
    Gets the prediction URL for the endpoint.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the prediction URL for the endpoint | 
        
BaseService            (BaseTypedModel)
        
    Base service class.
This class implements generic functionality concerning the life-cycle management and tracking of an external service (e.g. process, container, Kubernetes deployment etc.).
Attributes:
| Name | Type | Description | 
|---|---|---|
| SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the service class. Every concrete service class must define this. | 
| admin_state | ServiceState | the administrative state of the service. | 
| uuid | UUID | unique UUID identifier for the service instance. | 
| config | ServiceConfig | service configuration | 
| status | ServiceStatus | service status | 
| endpoint | Optional[zenml.services.service_endpoint.BaseServiceEndpoint] | optional service endpoint | 
Source code in zenml/services/service.py
          class BaseService(BaseTypedModel):
    """Base service class.
    This class implements generic functionality concerning the life-cycle
    management and tracking of an external service (e.g. process, container,
    Kubernetes deployment etc.).
    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the service class. Every concrete service class must define this.
        admin_state: the administrative state of the service.
        uuid: unique UUID identifier for the service instance.
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """
    SERVICE_TYPE: ClassVar[ServiceType]
    uuid: UUID
    admin_state: ServiceState = ServiceState.INACTIVE
    config: ServiceConfig
    status: ServiceStatus
    # TODO [ENG-703]: allow multiple endpoints per service
    endpoint: Optional[BaseServiceEndpoint] = None
    def __init__(
        self,
        **attrs: Any,
    ) -> None:
        """Initialize the service instance.
        Args:
            **attrs: keyword arguments.
        """
        super().__init__(**attrs)
        self.config.name = self.config.name or self.__class__.__name__
    @classmethod
    def from_model(cls, model: "ServiceResponse") -> "BaseService":
        """Loads a service from a model.
        Args:
            model: The ServiceResponse to load from.
        Returns:
            The loaded service object.
        Raises:
            ValueError: if the service source is not found in the model.
        """
        if not model.service_source:
            raise ValueError("Service source not found in the model.")
        class_: Type[BaseService] = source_utils.load_and_validate_class(
            source=model.service_source, expected_class=BaseService
        )
        return class_(
            uuid=model.id,
            admin_state=model.admin_state,
            config=model.config,
            status=model.status,
            service_type=model.service_type.model_dump(),
            endpoint=model.endpoint,
        )
    @classmethod
    def from_json(cls, json_str: str) -> "BaseTypedModel":
        """Loads a service from a JSON string.
        Args:
            json_str: the JSON string to load from.
        Returns:
            The loaded service object.
        """
        service_dict = json.loads(json_str)
        class_: Type[BaseService] = source_utils.load_and_validate_class(
            source=service_dict["type"], expected_class=BaseService
        )
        return class_.from_dict(service_dict)
    @abstractmethod
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the external service.
        This method should be overridden by subclasses that implement
        concrete service tracking functionality.
        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """
    @abstractmethod
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.
        This method should be overridden by subclasses that implement
        concrete service tracking functionality.
        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.
        Returns:
            A generator that can be accessed to get the service logs.
        """
    def update_status(self) -> None:
        """Update the status of the service.
        Check the current operational state of the external service
        and update the local operational status information to reflect it.
        This method should be overridden by subclasses that implement
        concrete service status tracking functionality.
        """
        logger.debug(
            "Running status check for service '%s' ...",
            self,
        )
        try:
            state, err = self.check_status()
            logger.debug(
                "Status check results for service '%s': %s [%s]",
                self,
                state.name,
                err,
            )
            self.status.update_state(state, err)
            # don't bother checking the endpoint state if the service is not active
            if self.status.state == ServiceState.INACTIVE:
                return
            if self.endpoint:
                self.endpoint.update_status()
        except Exception as e:
            logger.error(
                f"Failed to update status for service '{self}': {e}",
                exc_info=True,
            )
            self.status.update_state(ServiceState.ERROR, str(e))
    def get_service_status_message(self) -> str:
        """Get a service status message.
        Returns:
            A message providing information about the current operational
            state of the service.
        """
        return (
            f"  Administrative state: `{self.admin_state.value}`\n"
            f"  Operational state: `{self.status.state.value}`\n"
            f"  Last status message: '{self.status.last_error}'\n"
        )
    def poll_service_status(self, timeout: int = 0) -> bool:
        """Polls the external service status.
        It does this until the service operational state matches the
        administrative state, the service enters a failed state, or the timeout
        is reached.
        Args:
            timeout: maximum time to wait for the service operational state
                to match the administrative state, in seconds
        Returns:
            True if the service operational state matches the administrative
            state, False otherwise.
        """
        time_remaining = timeout
        while True:
            if self.admin_state == ServiceState.ACTIVE and self.is_running:
                return True
            if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
                return True
            if self.is_failed:
                return False
            if time_remaining <= 0:
                break
            time.sleep(1)
            time_remaining -= 1
        if timeout > 0:
            logger.error(
                f"Timed out waiting for service {self} to become "
                f"{self.admin_state.value}:\n"
                + self.get_service_status_message()
            )
        return False
    @property
    def is_running(self) -> bool:
        """Check if the service is currently running.
        This method will actively poll the external service to get its status
        and will return the result.
        Returns:
            True if the service is running and active (i.e. the endpoints are
            responsive, if any are configured), otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE and (
            not self.endpoint or self.endpoint.is_active()
        )
    @property
    def is_stopped(self) -> bool:
        """Check if the service is currently stopped.
        This method will actively poll the external service to get its status
        and will return the result.
        Returns:
            True if the service is stopped, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.INACTIVE
    @property
    def is_failed(self) -> bool:
        """Check if the service is currently failed.
        This method will actively poll the external service to get its status
        and will return the result.
        Returns:
            True if the service is in a failure state, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ERROR
    def provision(self) -> None:
        """Provisions resources to run the service.
        Raises:
            NotImplementedError: if the service does not implement provisioning functionality
        """
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )
    def deprovision(self, force: bool = False) -> None:
        """Deprovisions all resources used by the service.
        Args:
            force: if True, the service will be deprovisioned even if it is
                in a failed state.
        Raises:
            NotImplementedError: if the service does not implement
                deprovisioning functionality.
        """
        raise NotImplementedError(
            f"Deprovisioning resources not implemented for {self}."
        )
    def update(self, config: ServiceConfig) -> None:
        """Update the service configuration.
        Args:
            config: the new service configuration.
        """
        self.config = config
    @update_service_status(
        pre_status=ServiceState.PENDING_STARTUP,
        post_status=ServiceState.ACTIVE,
    )
    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.
        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.
        """
        with console.status(f"Starting service '{self}'.\n"):
            self.admin_state = ServiceState.ACTIVE
            self.provision()
            if timeout > 0 and not self.poll_service_status(timeout):
                logger.error(
                    f"Failed to start service {self}\n"
                    + self.get_service_status_message()
                )
    @update_service_status(
        pre_status=ServiceState.PENDING_SHUTDOWN,
        post_status=ServiceState.INACTIVE,
    )
    def stop(self, timeout: int = 0, force: bool = False) -> None:
        """Stop the service and optionally wait for it to shutdown.
        Args:
            timeout: amount of time to wait for the service to shutdown.
                If set to 0, the method will return immediately after checking
                the service status.
            force: if True, the service will be stopped even if it is not
                currently running.
        """
        with console.status(f"Stopping service '{self}'.\n"):
            self.admin_state = ServiceState.INACTIVE
            self.deprovision(force)
            if timeout > 0:
                self.poll_service_status(timeout)
                if not self.is_stopped:
                    logger.error(
                        f"Failed to stop service {self}. Last state: "
                        f"'{self.status.state.value}'. Last error: "
                        f"'{self.status.last_error}'"
                    )
    def get_prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.
        Returns:
            the prediction URL for the endpoint
        """
        prediction_url = None
        if isinstance(self, BaseDeploymentService) and self.prediction_url:
            prediction_url = self.prediction_url
        elif self.endpoint:
            prediction_url = (
                self.endpoint.status.uri if self.endpoint.status else None
            )
        return prediction_url
    def get_healthcheck_url(self) -> Optional[str]:
        """Gets the healthcheck URL for the endpoint.
        Returns:
            the healthcheck URL for the endpoint
        """
        return (
            self.endpoint.monitor.get_healthcheck_uri(self.endpoint)
            if (self.endpoint and self.endpoint.monitor)
            and isinstance(self.endpoint.monitor, HTTPEndpointHealthMonitor)
            else None
        )
    def __repr__(self) -> str:
        """String representation of the service.
        Returns:
            A string representation of the service.
        """
        return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"
    def __str__(self) -> str:
        """String representation of the service.
        Returns:
            A string representation of the service.
        """
        return self.__repr__()
    model_config = ConfigDict(
        # validate attribute assignments
        validate_assignment=True,
    )
is_failed: bool
  
      property
      readonly
  
    Check if the service is currently failed.
This method will actively poll the external service to get its status and will return the result.
Returns:
| Type | Description | 
|---|---|
| bool | True if the service is in a failure state, otherwise False. | 
is_running: bool
  
      property
      readonly
  
    Check if the service is currently running.
This method will actively poll the external service to get its status and will return the result.
Returns:
| Type | Description | 
|---|---|
| bool | True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False. | 
is_stopped: bool
  
      property
      readonly
  
    Check if the service is currently stopped.
This method will actively poll the external service to get its status and will return the result.
Returns:
| Type | Description | 
|---|---|
| bool | True if the service is stopped, otherwise False. | 
__init__(self, **attrs)
  
      special
  
    Initialize the service instance.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| **attrs | Any | keyword arguments. | {} | 
Source code in zenml/services/service.py
          def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.
    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
__repr__(self)
  
      special
  
    String representation of the service.
Returns:
| Type | Description | 
|---|---|
| str | A string representation of the service. | 
Source code in zenml/services/service.py
          def __repr__(self) -> str:
    """String representation of the service.
    Returns:
        A string representation of the service.
    """
    return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"
__str__(self)
  
      special
  
    String representation of the service.
Returns:
| Type | Description | 
|---|---|
| str | A string representation of the service. | 
Source code in zenml/services/service.py
          def __str__(self) -> str:
    """String representation of the service.
    Returns:
        A string representation of the service.
    """
    return self.__repr__()
check_status(self)
    Check the the current operational state of the external service.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status). | 
Source code in zenml/services/service.py
          @abstractmethod
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the external service.
    This method should be overridden by subclasses that implement
    concrete service tracking functionality.
    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
deprovision(self, force=False)
    Deprovisions all resources used by the service.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| force | bool | if True, the service will be deprovisioned even if it is in a failed state. | False | 
Exceptions:
| Type | Description | 
|---|---|
| NotImplementedError | if the service does not implement deprovisioning functionality. | 
Source code in zenml/services/service.py
          def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.
    Args:
        force: if True, the service will be deprovisioned even if it is
            in a failed state.
    Raises:
        NotImplementedError: if the service does not implement
            deprovisioning functionality.
    """
    raise NotImplementedError(
        f"Deprovisioning resources not implemented for {self}."
    )
from_json(json_str)
  
      classmethod
  
    Loads a service from a JSON string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| json_str | str | the JSON string to load from. | required | 
Returns:
| Type | Description | 
|---|---|
| BaseTypedModel | The loaded service object. | 
Source code in zenml/services/service.py
          @classmethod
def from_json(cls, json_str: str) -> "BaseTypedModel":
    """Loads a service from a JSON string.
    Args:
        json_str: the JSON string to load from.
    Returns:
        The loaded service object.
    """
    service_dict = json.loads(json_str)
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=service_dict["type"], expected_class=BaseService
    )
    return class_.from_dict(service_dict)
from_model(model)
  
      classmethod
  
    Loads a service from a model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model | ServiceResponse | The ServiceResponse to load from. | required | 
Returns:
| Type | Description | 
|---|---|
| BaseService | The loaded service object. | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | if the service source is not found in the model. | 
Source code in zenml/services/service.py
          @classmethod
def from_model(cls, model: "ServiceResponse") -> "BaseService":
    """Loads a service from a model.
    Args:
        model: The ServiceResponse to load from.
    Returns:
        The loaded service object.
    Raises:
        ValueError: if the service source is not found in the model.
    """
    if not model.service_source:
        raise ValueError("Service source not found in the model.")
    class_: Type[BaseService] = source_utils.load_and_validate_class(
        source=model.service_source, expected_class=BaseService
    )
    return class_(
        uuid=model.id,
        admin_state=model.admin_state,
        config=model.config,
        status=model.status,
        service_type=model.service_type.model_dump(),
        endpoint=model.endpoint,
    )
get_healthcheck_url(self)
    Gets the healthcheck URL for the endpoint.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the healthcheck URL for the endpoint | 
Source code in zenml/services/service.py
          def get_healthcheck_url(self) -> Optional[str]:
    """Gets the healthcheck URL for the endpoint.
    Returns:
        the healthcheck URL for the endpoint
    """
    return (
        self.endpoint.monitor.get_healthcheck_uri(self.endpoint)
        if (self.endpoint and self.endpoint.monitor)
        and isinstance(self.endpoint.monitor, HTTPEndpointHealthMonitor)
        else None
    )
get_logs(self, follow=False, tail=None)
    Retrieve the service logs.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| follow | bool | if True, the logs will be streamed as they are written | False | 
| tail | Optional[int] | only retrieve the last NUM lines of log output. | None | 
Returns:
| Type | Description | 
|---|---|
| Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs. | 
Source code in zenml/services/service.py
          @abstractmethod
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.
    This method should be overridden by subclasses that implement
    concrete service tracking functionality.
    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.
    Returns:
        A generator that can be accessed to get the service logs.
    """
get_prediction_url(self)
    Gets the prediction URL for the endpoint.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the prediction URL for the endpoint | 
Source code in zenml/services/service.py
          def get_prediction_url(self) -> Optional[str]:
    """Gets the prediction URL for the endpoint.
    Returns:
        the prediction URL for the endpoint
    """
    prediction_url = None
    if isinstance(self, BaseDeploymentService) and self.prediction_url:
        prediction_url = self.prediction_url
    elif self.endpoint:
        prediction_url = (
            self.endpoint.status.uri if self.endpoint.status else None
        )
    return prediction_url
get_service_status_message(self)
    Get a service status message.
Returns:
| Type | Description | 
|---|---|
| str | A message providing information about the current operational state of the service. | 
Source code in zenml/services/service.py
          def get_service_status_message(self) -> str:
    """Get a service status message.
    Returns:
        A message providing information about the current operational
        state of the service.
    """
    return (
        f"  Administrative state: `{self.admin_state.value}`\n"
        f"  Operational state: `{self.status.state.value}`\n"
        f"  Last status message: '{self.status.last_error}'\n"
    )
poll_service_status(self, timeout=0)
    Polls the external service status.
It does this until the service operational state matches the administrative state, the service enters a failed state, or the timeout is reached.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | maximum time to wait for the service operational state to match the administrative state, in seconds | 0 | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the service operational state matches the administrative state, False otherwise. | 
Source code in zenml/services/service.py
          def poll_service_status(self, timeout: int = 0) -> bool:
    """Polls the external service status.
    It does this until the service operational state matches the
    administrative state, the service enters a failed state, or the timeout
    is reached.
    Args:
        timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds
    Returns:
        True if the service operational state matches the administrative
        state, False otherwise.
    """
    time_remaining = timeout
    while True:
        if self.admin_state == ServiceState.ACTIVE and self.is_running:
            return True
        if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
            return True
        if self.is_failed:
            return False
        if time_remaining <= 0:
            break
        time.sleep(1)
        time_remaining -= 1
    if timeout > 0:
        logger.error(
            f"Timed out waiting for service {self} to become "
            f"{self.admin_state.value}:\n"
            + self.get_service_status_message()
        )
    return False
provision(self)
    Provisions resources to run the service.
Exceptions:
| Type | Description | 
|---|---|
| NotImplementedError | if the service does not implement provisioning functionality | 
Source code in zenml/services/service.py
          def provision(self) -> None:
    """Provisions resources to run the service.
    Raises:
        NotImplementedError: if the service does not implement provisioning functionality
    """
    raise NotImplementedError(
        f"Provisioning resources not implemented for {self}."
    )
start(self, timeout=0)
    Start the service and optionally wait for it to become active.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status. | 0 | 
Source code in zenml/services/service.py
          @update_service_status(
    pre_status=ServiceState.PENDING_STARTUP,
    post_status=ServiceState.ACTIVE,
)
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.
    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    with console.status(f"Starting service '{self}'.\n"):
        self.admin_state = ServiceState.ACTIVE
        self.provision()
        if timeout > 0 and not self.poll_service_status(timeout):
            logger.error(
                f"Failed to start service {self}\n"
                + self.get_service_status_message()
            )
stop(self, timeout=0, force=False)
    Stop the service and optionally wait for it to shutdown.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | amount of time to wait for the service to shutdown. If set to 0, the method will return immediately after checking the service status. | 0 | 
| force | bool | if True, the service will be stopped even if it is not currently running. | False | 
Source code in zenml/services/service.py
          @update_service_status(
    pre_status=ServiceState.PENDING_SHUTDOWN,
    post_status=ServiceState.INACTIVE,
)
def stop(self, timeout: int = 0, force: bool = False) -> None:
    """Stop the service and optionally wait for it to shutdown.
    Args:
        timeout: amount of time to wait for the service to shutdown.
            If set to 0, the method will return immediately after checking
            the service status.
        force: if True, the service will be stopped even if it is not
            currently running.
    """
    with console.status(f"Stopping service '{self}'.\n"):
        self.admin_state = ServiceState.INACTIVE
        self.deprovision(force)
        if timeout > 0:
            self.poll_service_status(timeout)
            if not self.is_stopped:
                logger.error(
                    f"Failed to stop service {self}. Last state: "
                    f"'{self.status.state.value}'. Last error: "
                    f"'{self.status.last_error}'"
                )
update(self, config)
    Update the service configuration.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| config | ServiceConfig | the new service configuration. | required | 
Source code in zenml/services/service.py
          def update(self, config: ServiceConfig) -> None:
    """Update the service configuration.
    Args:
        config: the new service configuration.
    """
    self.config = config
update_status(self)
    Update the status of the service.
Check the current operational state of the external service and update the local operational status information to reflect it.
This method should be overridden by subclasses that implement concrete service status tracking functionality.
Source code in zenml/services/service.py
          def update_status(self) -> None:
    """Update the status of the service.
    Check the current operational state of the external service
    and update the local operational status information to reflect it.
    This method should be overridden by subclasses that implement
    concrete service status tracking functionality.
    """
    logger.debug(
        "Running status check for service '%s' ...",
        self,
    )
    try:
        state, err = self.check_status()
        logger.debug(
            "Status check results for service '%s': %s [%s]",
            self,
            state.name,
            err,
        )
        self.status.update_state(state, err)
        # don't bother checking the endpoint state if the service is not active
        if self.status.state == ServiceState.INACTIVE:
            return
        if self.endpoint:
            self.endpoint.update_status()
    except Exception as e:
        logger.error(
            f"Failed to update status for service '{self}': {e}",
            exc_info=True,
        )
        self.status.update_state(ServiceState.ERROR, str(e))
        
ServiceConfig            (BaseTypedModel)
        
    Generic service configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the service configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| name | str | name for the service instance | 
| description | str | description of the service | 
| pipeline_name | str | name of the pipeline that spun up the service | 
| pipeline_step_name | str | name of the pipeline step that spun up the service | 
| run_name | name of the pipeline run that spun up the service. | 
Source code in zenml/services/service.py
          class ServiceConfig(BaseTypedModel):
    """Generic service configuration.
    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and used in the service
    configuration.
    Attributes:
        name: name for the service instance
        description: description of the service
        pipeline_name: name of the pipeline that spun up the service
        pipeline_step_name: name of the pipeline step that spun up the service
        run_name: name of the pipeline run that spun up the service.
    """
    name: str = ""
    description: str = ""
    pipeline_name: str = ""
    pipeline_step_name: str = ""
    model_name: str = ""
    model_version: str = ""
    service_name: str = ""
    # TODO: In Pydantic v2, the `model_` is a protected namespaces for all
    #  fields defined under base models. If not handled, this raises a warning.
    #  It is possible to suppress this warning message with the following
    #  configuration, however the ultimate solution is to rename these fields.
    #  Even though they do not cause any problems right now, if we are not
    #  careful we might overwrite some fields protected by pydantic.
    model_config = ConfigDict(protected_namespaces=())
    def __init__(self, **data: Any):
        """Initialize the service configuration.
        Args:
            **data: keyword arguments.
        Raises:
            ValueError: if neither 'name' nor 'model_name' is set.
        """
        super().__init__(**data)
        if self.name or self.model_name:
            self.service_name = data.get(
                "service_name",
                f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
            )
        else:
            raise ValueError("Either 'name' or 'model_name' must be set.")
    def get_service_labels(self) -> Dict[str, str]:
        """Get the service labels.
        Returns:
            a dictionary of service labels.
        """
        labels = {}
        for k, v in self.model_dump().items():
            label = f"zenml_{k}".upper()
            labels[label] = str(v)
        return labels
__init__(self, **data)
  
      special
  
    Initialize the service configuration.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| **data | Any | keyword arguments. | {} | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | if neither 'name' nor 'model_name' is set. | 
Source code in zenml/services/service.py
          def __init__(self, **data: Any):
    """Initialize the service configuration.
    Args:
        **data: keyword arguments.
    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
get_service_labels(self)
    Get the service labels.
Returns:
| Type | Description | 
|---|---|
| Dict[str, str] | a dictionary of service labels. | 
Source code in zenml/services/service.py
          def get_service_labels(self) -> Dict[str, str]:
    """Get the service labels.
    Returns:
        a dictionary of service labels.
    """
    labels = {}
    for k, v in self.model_dump().items():
        label = f"zenml_{k}".upper()
        labels[label] = str(v)
    return labels
update_service_status(pre_status=None, post_status=None, error_status=<ServiceState.ERROR: 'error'>)
    A decorator to update the service status before and after a method call.
This decorator is used to wrap service methods and update the service status before and after the method call. If the method raises an exception, the service status is updated to reflect the error state.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pre_status | Optional[zenml.services.service_status.ServiceState] | the status to update before the method call. | None | 
| post_status | Optional[zenml.services.service_status.ServiceState] | the status to update after the method call. | None | 
| error_status | ServiceState | the status to update if the method raises an exception. | <ServiceState.ERROR: 'error'> | 
Returns:
| Type | Description | 
|---|---|
| Callable[..., Any] | The wrapped method with exception handling. | 
Source code in zenml/services/service.py
          def update_service_status(
    pre_status: Optional[ServiceState] = None,
    post_status: Optional[ServiceState] = None,
    error_status: ServiceState = ServiceState.ERROR,
) -> Callable[[T], T]:
    """A decorator to update the service status before and after a method call.
    This decorator is used to wrap service methods and update the service status
    before and after the method call. If the method raises an exception, the
    service status is updated to reflect the error state.
    Args:
        pre_status: the status to update before the method call.
        post_status: the status to update after the method call.
        error_status: the status to update if the method raises an exception.
    Returns:
        Callable[..., Any]: The wrapped method with exception handling.
    """
    def decorator(func: T) -> T:
        @wraps(func)
        def wrapper(self: "BaseService", *args: Any, **kwargs: Any) -> Any:
            if pre_status:
                self.status.update_state(pre_status, "")
            try:
                logger.info(f"Calling {func.__name__} method...")
                result = func(self, *args, **kwargs)
                logger.info(f"{func.__name__} method executed successfully.")
                if post_status:
                    self.status.update_state(post_status, "")
                return result
            except Exception as e:
                logger.error(
                    f"Error occurred in {func.__name__} method: {str(e)}"
                )
                self.status.update_state(error_status, str(e))
                raise
        return wrapper  # type: ignore
    return decorator
        service_endpoint
    Implementation of a ZenML service endpoint.
        
BaseServiceEndpoint            (BaseTypedModel)
        
    Base service class.
This class implements generic functionality concerning the life-cycle management and tracking of an external service endpoint (e.g. a HTTP/HTTPS API or generic TCP endpoint exposed by a service).
Attributes:
| Name | Type | Description | 
|---|---|---|
| admin_state | ServiceState | the administrative state of the service endpoint | 
| config | LocalDaemonServiceEndpointConfig | service endpoint configuration | 
| status | LocalDaemonServiceEndpointStatus | service endpoint status | 
| monitor | Union[zenml.services.service_monitor.HTTPEndpointHealthMonitor, zenml.services.service_monitor.TCPEndpointHealthMonitor] | optional service endpoint health monitor | 
Source code in zenml/services/service_endpoint.py
          class BaseServiceEndpoint(BaseTypedModel):
    """Base service class.
    This class implements generic functionality concerning the life-cycle
    management and tracking of an external service endpoint (e.g. a HTTP/HTTPS
    API or generic TCP endpoint exposed by a service).
    Attributes:
        admin_state: the administrative state of the service endpoint
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """
    admin_state: ServiceState = ServiceState.INACTIVE
    config: ServiceEndpointConfig
    status: ServiceEndpointStatus
    # TODO [ENG-701]: allow multiple monitors per endpoint
    monitor: Optional[BaseServiceEndpointHealthMonitor] = None
    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize the service endpoint.
        Args:
            *args: positional arguments.
            **kwargs: keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self.config.name = self.config.name or self.__class__.__name__
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the external service endpoint.
        Returns:
            The operational state of the external service endpoint and a
            message providing additional information about that state
            (e.g. a description of the error, if one is encountered while
            checking the service status).
        """
        if not self.monitor:
            # no health monitor configured; assume service operational state
            # always matches the admin state
            return self.admin_state, ""
        return self.monitor.check_endpoint_status(self)
    def update_status(self) -> None:
        """Check the the current operational state of the external service endpoint.
        It updates the local operational status information accordingly.
        """
        logger.debug(
            "Running health check for service endpoint '%s' ...",
            self.config.name,
        )
        state, err = self.check_status()
        logger.debug(
            "Health check results for service endpoint '%s': %s [%s]",
            self.config.name,
            state.name,
            err,
        )
        self.status.update_state(state, err)
    def is_active(self) -> bool:
        """Check if the service endpoint is active.
        This means that it is responsive and can receive requests). This method
        will use the configured health monitor to actively check the endpoint
        status and will return the result.
        Returns:
            True if the service endpoint is active, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE
    def is_inactive(self) -> bool:
        """Check if the service endpoint is inactive.
        This means that it is unresponsive and cannot receive requests. This
        method will use the configured health monitor to actively check the
        endpoint status and will return the result.
        Returns:
            True if the service endpoint is inactive, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.INACTIVE
__init__(self, *args, **kwargs)
  
      special
  
    Initialize the service endpoint.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | positional arguments. | () | 
| **kwargs | Any | keyword arguments. | {} | 
Source code in zenml/services/service_endpoint.py
          def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.
    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
check_status(self)
    Check the the current operational state of the external service endpoint.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service endpoint and a message providing additional information about that state (e.g. a description of the error, if one is encountered while checking the service status). | 
Source code in zenml/services/service_endpoint.py
          def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the external service endpoint.
    Returns:
        The operational state of the external service endpoint and a
        message providing additional information about that state
        (e.g. a description of the error, if one is encountered while
        checking the service status).
    """
    if not self.monitor:
        # no health monitor configured; assume service operational state
        # always matches the admin state
        return self.admin_state, ""
    return self.monitor.check_endpoint_status(self)
is_active(self)
    Check if the service endpoint is active.
This means that it is responsive and can receive requests). This method will use the configured health monitor to actively check the endpoint status and will return the result.
Returns:
| Type | Description | 
|---|---|
| bool | True if the service endpoint is active, otherwise False. | 
Source code in zenml/services/service_endpoint.py
          def is_active(self) -> bool:
    """Check if the service endpoint is active.
    This means that it is responsive and can receive requests). This method
    will use the configured health monitor to actively check the endpoint
    status and will return the result.
    Returns:
        True if the service endpoint is active, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.ACTIVE
is_inactive(self)
    Check if the service endpoint is inactive.
This means that it is unresponsive and cannot receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.
Returns:
| Type | Description | 
|---|---|
| bool | True if the service endpoint is inactive, otherwise False. | 
Source code in zenml/services/service_endpoint.py
          def is_inactive(self) -> bool:
    """Check if the service endpoint is inactive.
    This means that it is unresponsive and cannot receive requests. This
    method will use the configured health monitor to actively check the
    endpoint status and will return the result.
    Returns:
        True if the service endpoint is inactive, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.INACTIVE
update_status(self)
    Check the the current operational state of the external service endpoint.
It updates the local operational status information accordingly.
Source code in zenml/services/service_endpoint.py
          def update_status(self) -> None:
    """Check the the current operational state of the external service endpoint.
    It updates the local operational status information accordingly.
    """
    logger.debug(
        "Running health check for service endpoint '%s' ...",
        self.config.name,
    )
    state, err = self.check_status()
    logger.debug(
        "Health check results for service endpoint '%s': %s [%s]",
        self.config.name,
        state.name,
        err,
    )
    self.status.update_state(state, err)
        
ServiceEndpointConfig            (BaseTypedModel)
        
    Generic service endpoint configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and use in the endpoint configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| name | str | unique name for the service endpoint | 
| description | str | description of the service endpoint | 
Source code in zenml/services/service_endpoint.py
          class ServiceEndpointConfig(BaseTypedModel):
    """Generic service endpoint configuration.
    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and use in the endpoint
    configuration.
    Attributes:
        name: unique name for the service endpoint
        description: description of the service endpoint
    """
    name: str = ""
    description: str = ""
        
ServiceEndpointProtocol            (StrEnum)
        
    Possible endpoint protocol values.
Source code in zenml/services/service_endpoint.py
          class ServiceEndpointProtocol(StrEnum):
    """Possible endpoint protocol values."""
    TCP = "tcp"
    HTTP = "http"
    HTTPS = "https"
        
ServiceEndpointStatus            (ServiceStatus)
        
    Status information describing the operational state of a service endpoint.
For example, this could be a HTTP/HTTPS API or generic TCP endpoint exposed by a service. Concrete service classes should extend this class and add additional attributes that make up the operational state of the service endpoint.
Attributes:
| Name | Type | Description | 
|---|---|---|
| protocol | ServiceEndpointProtocol | the TCP protocol used by the service endpoint | 
| hostname | Optional[str] | the hostname where the service endpoint is accessible | 
| port | Optional[int] | the current TCP port where the service endpoint is accessible | 
Source code in zenml/services/service_endpoint.py
          class ServiceEndpointStatus(ServiceStatus):
    """Status information describing the operational state of a service endpoint.
    For example, this could be a HTTP/HTTPS API or generic TCP endpoint exposed
    by a service. Concrete service classes should extend this class and add
    additional attributes that make up the operational state of the service
    endpoint.
    Attributes:
        protocol: the TCP protocol used by the service endpoint
        hostname: the hostname where the service endpoint is accessible
        port: the current TCP port where the service endpoint is accessible
    """
    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    hostname: Optional[str] = None
    port: Optional[int] = None
    @property
    def uri(self) -> Optional[str]:
        """Get the URI of the service endpoint.
        Returns:
            The URI of the service endpoint or None, if the service endpoint
            operational status doesn't have the required information.
        """
        if not self.hostname or not self.port or not self.protocol:
            # the service is not yet in a state in which the endpoint hostname
            # port and protocol are known
            return None
        hostname = self.hostname
        if hostname == "0.0.0.0":  # nosec
            hostname = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
        return f"{self.protocol.value}://{hostname}:{self.port}"
uri: Optional[str]
  
      property
      readonly
  
    Get the URI of the service endpoint.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The URI of the service endpoint or None, if the service endpoint operational status doesn't have the required information. | 
        service_monitor
    Implementation of the service health monitor.
        
BaseServiceEndpointHealthMonitor            (BaseTypedModel)
        
    Base class used for service endpoint health monitors.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | ServiceEndpointHealthMonitorConfig | health monitor configuration for endpoint | 
Source code in zenml/services/service_monitor.py
          class BaseServiceEndpointHealthMonitor(BaseTypedModel):
    """Base class used for service endpoint health monitors.
    Attributes:
        config: health monitor configuration for endpoint
    """
    config: ServiceEndpointHealthMonitorConfig = Field(
        default_factory=ServiceEndpointHealthMonitorConfig
    )
    @abstractmethod
    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the external service endpoint.
        Args:
            endpoint: service endpoint to check
        This method should be overridden by subclasses that implement
        concrete service endpoint tracking functionality.
        Returns:
            The operational state of the external service endpoint and an
            optional error message, if an error is encountered while checking
            the service endpoint status.
        """
check_endpoint_status(self, endpoint)
    Check the the current operational state of the external service endpoint.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| endpoint | BaseServiceEndpoint | service endpoint to check | required | 
This method should be overridden by subclasses that implement concrete service endpoint tracking functionality.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service endpoint and an optional error message, if an error is encountered while checking the service endpoint status. | 
Source code in zenml/services/service_monitor.py
          @abstractmethod
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the external service endpoint.
    Args:
        endpoint: service endpoint to check
    This method should be overridden by subclasses that implement
    concrete service endpoint tracking functionality.
    Returns:
        The operational state of the external service endpoint and an
        optional error message, if an error is encountered while checking
        the service endpoint status.
    """
        
HTTPEndpointHealthMonitor            (BaseServiceEndpointHealthMonitor)
        
    HTTP service endpoint health monitor.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | HTTPEndpointHealthMonitorConfig | health monitor configuration for HTTP endpoint | 
Source code in zenml/services/service_monitor.py
          class HTTPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """HTTP service endpoint health monitor.
    Attributes:
        config: health monitor configuration for HTTP endpoint
    """
    config: HTTPEndpointHealthMonitorConfig = Field(
        default_factory=HTTPEndpointHealthMonitorConfig
    )
    def get_healthcheck_uri(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Optional[str]:
        """Get the healthcheck URI for the given service endpoint.
        Args:
            endpoint: service endpoint to get the healthcheck URI for
        Returns:
            The healthcheck URI for the given service endpoint or None, if
            the service endpoint doesn't have a healthcheck URI.
        """
        uri = endpoint.status.uri
        if not uri:
            return None
        if not self.config.healthcheck_uri_path:
            return uri
        return (
            f"{uri.rstrip('/')}/{self.config.healthcheck_uri_path.lstrip('/')}"
        )
    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run a HTTP endpoint API healthcheck.
        Args:
            endpoint: service endpoint to check.
        Returns:
            The operational state of the external HTTP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the HTTP endpoint
            status).
        """
        from zenml.services.service_endpoint import ServiceEndpointProtocol
        if endpoint.status.protocol not in [
            ServiceEndpointProtocol.HTTP,
            ServiceEndpointProtocol.HTTPS,
        ]:
            return (
                ServiceState.ERROR,
                "endpoint protocol is not HTTP nor HTTPS.",
            )
        check_uri = self.get_healthcheck_uri(endpoint)
        if not check_uri:
            return ServiceState.ERROR, "no HTTP healthcheck URI available"
        logger.debug("Running HTTP healthcheck for URI: %s", check_uri)
        try:
            if self.config.use_head_request:
                r = requests.head(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            else:
                r = requests.get(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            if r.status_code == self.config.http_status_code:
                # the endpoint is healthy
                return ServiceState.ACTIVE, ""
            error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
        except requests.ConnectionError as e:
            error = f"HTTP endpoint healthcheck connection error: {str(e)}"
        except requests.Timeout as e:
            error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
        except requests.RequestException as e:
            error = (
                f"unexpected error encountered while running HTTP endpoint "
                f"healthcheck: {str(e)}"
            )
        return ServiceState.ERROR, error
check_endpoint_status(self, endpoint)
    Run a HTTP endpoint API healthcheck.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| endpoint | BaseServiceEndpoint | service endpoint to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external HTTP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the HTTP endpoint status). | 
Source code in zenml/services/service_monitor.py
          def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a HTTP endpoint API healthcheck.
    Args:
        endpoint: service endpoint to check.
    Returns:
        The operational state of the external HTTP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the HTTP endpoint
        status).
    """
    from zenml.services.service_endpoint import ServiceEndpointProtocol
    if endpoint.status.protocol not in [
        ServiceEndpointProtocol.HTTP,
        ServiceEndpointProtocol.HTTPS,
    ]:
        return (
            ServiceState.ERROR,
            "endpoint protocol is not HTTP nor HTTPS.",
        )
    check_uri = self.get_healthcheck_uri(endpoint)
    if not check_uri:
        return ServiceState.ERROR, "no HTTP healthcheck URI available"
    logger.debug("Running HTTP healthcheck for URI: %s", check_uri)
    try:
        if self.config.use_head_request:
            r = requests.head(
                check_uri,
                timeout=self.config.http_timeout,
            )
        else:
            r = requests.get(
                check_uri,
                timeout=self.config.http_timeout,
            )
        if r.status_code == self.config.http_status_code:
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
    except requests.ConnectionError as e:
        error = f"HTTP endpoint healthcheck connection error: {str(e)}"
    except requests.Timeout as e:
        error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
    except requests.RequestException as e:
        error = (
            f"unexpected error encountered while running HTTP endpoint "
            f"healthcheck: {str(e)}"
        )
    return ServiceState.ERROR, error
get_healthcheck_uri(self, endpoint)
    Get the healthcheck URI for the given service endpoint.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| endpoint | BaseServiceEndpoint | service endpoint to get the healthcheck URI for | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The healthcheck URI for the given service endpoint or None, if the service endpoint doesn't have a healthcheck URI. | 
Source code in zenml/services/service_monitor.py
          def get_healthcheck_uri(
    self, endpoint: "BaseServiceEndpoint"
) -> Optional[str]:
    """Get the healthcheck URI for the given service endpoint.
    Args:
        endpoint: service endpoint to get the healthcheck URI for
    Returns:
        The healthcheck URI for the given service endpoint or None, if
        the service endpoint doesn't have a healthcheck URI.
    """
    uri = endpoint.status.uri
    if not uri:
        return None
    if not self.config.healthcheck_uri_path:
        return uri
    return (
        f"{uri.rstrip('/')}/{self.config.healthcheck_uri_path.lstrip('/')}"
    )
        
HTTPEndpointHealthMonitorConfig            (ServiceEndpointHealthMonitorConfig)
        
    HTTP service endpoint health monitor configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| healthcheck_uri_path | str | URI subpath to use to perform service endpoint healthchecks. If not set, the service endpoint URI will be used instead. | 
| use_head_request | bool | set to True to use a HEAD request instead of a GET when calling the healthcheck URI. | 
| http_status_code | int | HTTP status code to expect in the health check response. | 
| http_timeout | int | HTTP health check request timeout in seconds. | 
Source code in zenml/services/service_monitor.py
          class HTTPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """HTTP service endpoint health monitor configuration.
    Attributes:
        healthcheck_uri_path: URI subpath to use to perform service endpoint
            healthchecks. If not set, the service endpoint URI will be used
            instead.
        use_head_request: set to True to use a HEAD request instead of a GET
            when calling the healthcheck URI.
        http_status_code: HTTP status code to expect in the health check
            response.
        http_timeout: HTTP health check request timeout in seconds.
    """
    healthcheck_uri_path: str = ""
    use_head_request: bool = False
    http_status_code: int = 200
    http_timeout: int = DEFAULT_HTTP_HEALTHCHECK_TIMEOUT
        
ServiceEndpointHealthMonitorConfig            (BaseTypedModel)
        
    Generic service health monitor configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and use in the health monitor configuration.
Source code in zenml/services/service_monitor.py
          class ServiceEndpointHealthMonitorConfig(BaseTypedModel):
    """Generic service health monitor configuration.
    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and use in the health monitor
    configuration.
    """
        
TCPEndpointHealthMonitor            (BaseServiceEndpointHealthMonitor)
        
    TCP service endpoint health monitor.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | TCPEndpointHealthMonitorConfig | health monitor configuration for TCP endpoint | 
Source code in zenml/services/service_monitor.py
          class TCPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """TCP service endpoint health monitor.
    Attributes:
        config: health monitor configuration for TCP endpoint
    """
    config: TCPEndpointHealthMonitorConfig
    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run a TCP endpoint healthcheck.
        Args:
            endpoint: service endpoint to check.
        Returns:
            The operational state of the external TCP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the TCP endpoint
            status).
        """
        if not endpoint.status.port or not endpoint.status.hostname:
            return (
                ServiceState.ERROR,
                "TCP port and hostname values are not known",
            )
        logger.debug(
            "Running TCP healthcheck for TCP port: %d", endpoint.status.port
        )
        if port_is_open(endpoint.status.hostname, endpoint.status.port):
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        return (
            ServiceState.ERROR,
            "TCP endpoint healthcheck error: TCP port is not "
            "open or not accessible",
        )
check_endpoint_status(self, endpoint)
    Run a TCP endpoint healthcheck.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| endpoint | BaseServiceEndpoint | service endpoint to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external TCP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the TCP endpoint status). | 
Source code in zenml/services/service_monitor.py
          def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a TCP endpoint healthcheck.
    Args:
        endpoint: service endpoint to check.
    Returns:
        The operational state of the external TCP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the TCP endpoint
        status).
    """
    if not endpoint.status.port or not endpoint.status.hostname:
        return (
            ServiceState.ERROR,
            "TCP port and hostname values are not known",
        )
    logger.debug(
        "Running TCP healthcheck for TCP port: %d", endpoint.status.port
    )
    if port_is_open(endpoint.status.hostname, endpoint.status.port):
        # the endpoint is healthy
        return ServiceState.ACTIVE, ""
    return (
        ServiceState.ERROR,
        "TCP endpoint healthcheck error: TCP port is not "
        "open or not accessible",
    )
        
TCPEndpointHealthMonitorConfig            (ServiceEndpointHealthMonitorConfig)
        
    TCP service endpoint health monitor configuration.
Source code in zenml/services/service_monitor.py
          class TCPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """TCP service endpoint health monitor configuration."""
        service_status
    Implementation of the ServiceStatus class.
        
ServiceState            (StrEnum)
        
    Possible states for the service and service endpoint.
Source code in zenml/services/service_status.py
          class ServiceState(StrEnum):
    """Possible states for the service and service endpoint."""
    INACTIVE = "inactive"
    ACTIVE = "active"
    PENDING_STARTUP = "pending_startup"
    PENDING_SHUTDOWN = "pending_shutdown"
    ERROR = "error"
    SCALED_TO_ZERO = "scaled_to_zero"
        
ServiceStatus            (BaseTypedModel)
        
    Information about the status of a service or process.
This information describes the operational status of an external process or service tracked by ZenML. This could be a process, container, Kubernetes deployment etc.
Concrete service classes should extend this class and add additional attributes that make up the operational state of the service.
Attributes:
| Name | Type | Description | 
|---|---|---|
| state | ServiceState | the current operational state | 
| last_state | ServiceState | the operational state prior to the last status update | 
| last_error | str | the error encountered during the last status update | 
Source code in zenml/services/service_status.py
          class ServiceStatus(BaseTypedModel):
    """Information about the status of a service or process.
    This information describes the operational status of an external process or
    service tracked by ZenML. This could be a process, container, Kubernetes
    deployment etc.
    Concrete service classes should extend this class and add additional
    attributes that make up the operational state of the service.
    Attributes:
        state: the current operational state
        last_state: the operational state prior to the last status update
        last_error: the error encountered during the last status update
    """
    state: ServiceState = ServiceState.INACTIVE
    last_state: ServiceState = ServiceState.INACTIVE
    last_error: str = ""
    def update_state(
        self,
        new_state: Optional[ServiceState] = None,
        error: str = "",
    ) -> None:
        """Update the current operational state to reflect a new state value and/or error.
        Args:
            new_state: new operational state discovered by the last service
                status update
            error: error message describing an operational failure encountered
                during the last service status update
        """
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        if error:
            self.last_error = error
    def clear_error(self) -> None:
        """Clear the last error message."""
        self.last_error = ""
clear_error(self)
    Clear the last error message.
Source code in zenml/services/service_status.py
          def clear_error(self) -> None:
    """Clear the last error message."""
    self.last_error = ""
update_state(self, new_state=None, error='')
    Update the current operational state to reflect a new state value and/or error.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| new_state | Optional[zenml.services.service_status.ServiceState] | new operational state discovered by the last service status update | None | 
| error | str | error message describing an operational failure encountered during the last service status update | '' | 
Source code in zenml/services/service_status.py
          def update_state(
    self,
    new_state: Optional[ServiceState] = None,
    error: str = "",
) -> None:
    """Update the current operational state to reflect a new state value and/or error.
    Args:
        new_state: new operational state discovered by the last service
            status update
        error: error message describing an operational failure encountered
            during the last service status update
    """
    if new_state and self.state != new_state:
        self.last_state = self.state
        self.state = new_state
    if error:
        self.last_error = error
        service_type
    Implementation of a ZenML ServiceType class.
        
ServiceType            (BaseModel)
        
    Service type descriptor.
Attributes:
| Name | Type | Description | 
|---|---|---|
| type | str | service type | 
| flavor | str | service flavor | 
| name | str | name of the service type | 
| description | str | description of the service type | 
| logo_url | str | logo of the service type | 
Source code in zenml/services/service_type.py
          class ServiceType(BaseModel):
    """Service type descriptor.
    Attributes:
        type: service type
        flavor: service flavor
        name: name of the service type
        description: description of the service type
        logo_url: logo of the service type
    """
    type: str
    flavor: str
    name: str = ""
    description: str = ""
    logo_url: str = ""
    model_config = ConfigDict(
        # make the service type immutable and hashable
        frozen=True
    )
        terraform
  
      special
  
    Initialization of a Terraform ZenML service.
        terraform_service
    Implementation of a Terraform ZenML service.
        
TerraformService            (BaseService)
        
    A service represented by a set of resources deployed using a terraform recipe.
This class extends the base service class with functionality concerning the life-cycle management and tracking of external services managed using terraform recipes.
Attributes:
| Name | Type | Description | 
|---|---|---|
| config | TerraformServiceConfig | service configuration | 
| status | TerraformServiceStatus | service status | 
Source code in zenml/services/terraform/terraform_service.py
          class TerraformService(BaseService):
    """A service represented by a set of resources deployed using a terraform recipe.
    This class extends the base service class with functionality concerning
    the life-cycle management and tracking of external services managed using
    terraform recipes.
    Attributes:
        config: service configuration
        status: service status
    """
    config: TerraformServiceConfig
    status: TerraformServiceStatus = Field(
        default_factory=TerraformServiceStatus
    )
    _terraform_client: Optional[python_terraform.Terraform] = None
    @property
    def terraform_client(self) -> python_terraform.Terraform:
        """Initialize and/or return the terraform client.
        Returns:
            The terraform client.
        """
        if self._terraform_client is None:
            working_dir = self.config.directory_path
            if self.config.copy_terraform_files:
                assert self.status.runtime_path is not None
                working_dir = self.status.runtime_path
            self._terraform_client = python_terraform.Terraform(
                working_dir=working_dir,
            )
        return self._terraform_client
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the the current operational state of the external service.
        If the final output name provided in the config exists as a non-null value,
        then it's reasonable to assume that the service is up and running.
        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """
        code, out, err = self.terraform_client.plan(
            detailed_exitcode=True,
            refresh=False,
            var=self.get_vars(),
            input=False,
            raise_on_error=False,
        )
        if code == 0:
            return (ServiceState.ACTIVE, "The deployment is active.")
        elif code == 2:
            return (
                ServiceState.INACTIVE,
                "The deployment isn't active or needs an update.",
            )
        else:
            return (ServiceState.ERROR, f"Deployment error: \n{err}")
    def _update_service_config(self) -> None:
        """Update the service configuration file.
        This function is called after the service has been started, to update
        the service configuration file with the runtime path of the service.
        """
        # write the service information in the service config file
        assert self.status.config_file is not None
        with open(self.status.config_file, "w") as f:
            f.write(self.model_dump_json(indent=4))
    def _write_vars_to_file(self, vars: Dict[str, Any]) -> None:
        """Write variables to the variables file.
        Args:
            vars: The variables to write to the file.
        """
        import json
        path = self.terraform_client.working_dir
        variables_file_path = os.path.join(
            path, self.config.variables_file_path
        )
        with open(variables_file_path, "w") as f:
            json.dump(vars, f)
    def _init_and_apply(self) -> None:
        """Function to call terraform init and terraform apply.
        The init call is not repeated if any successful execution has
        happened already, to save time.
        Raises:
            RuntimeError: if init or apply function fails.
        """
        self._update_service_config()
        # this directory gets created after a successful init
        previous_run_dir = os.path.join(
            self.terraform_client.working_dir, ".ignoreme"
        )
        if fileio.exists(previous_run_dir):
            logger.info(
                "Terraform already initialized, "
                "terraform init will not be executed."
            )
        else:
            ret_code, _, _ = self.terraform_client.init(capture_output=False)
            if ret_code != 0:
                raise RuntimeError("The command 'terraform init' failed.")
            fileio.mkdir(previous_run_dir)
        # get variables from the recipe as a python dictionary
        vars = self.get_vars()
        # once init is successful, call terraform apply
        self.terraform_client.apply(
            var=vars,
            input=False,
            capture_output=False,
            raise_on_error=True,
            refresh=False,
        )
        # write variables to the variable file after execution is successful
        self._write_vars_to_file(vars)
    def get_vars(self) -> Dict[str, Any]:
        """Get variables as a dictionary from values.tfvars.json.
        Returns:
            A dictionary of variables to use for the stack recipes
            derived from the tfvars.json file.
        Raises:
            FileNotFoundError: if the values.tfvars.json file is not
                found in the stack recipe.
            TypeError: if the file doesn't contain a dictionary of variables.
        """
        import json
        path = self.terraform_client.working_dir
        variables_file_path = os.path.join(
            path, self.config.variables_file_path
        )
        if not fileio.exists(variables_file_path):
            raise FileNotFoundError(
                "The file values.tfvars.json was not found in the "
                f"recipe's directory at {variables_file_path}. Please "
                "verify if it exists."
            )
        # read values into a dict and return
        with fileio.open(variables_file_path, "r") as f:
            variables = json.load(f)
        if not isinstance(variables, dict):
            raise TypeError(
                "The values.tfvars.json file must contain a dictionary "
                "of variables."
            )
        return variables
    def _destroy(self) -> None:
        """Function to call terraform destroy on the given path."""
        # get variables from the recipe as a python dictionary
        vars = self.get_vars()
        self.terraform_client.destroy(
            var=vars,
            capture_output=False,
            raise_on_error=True,
            force=python_terraform.IsNotFlagged,
            refresh=False,
        )
        # set empty vars to the file
    def _setup_runtime_path(self) -> None:
        """Set up the runtime path for the service.
        This method sets up the runtime path for the service.
        """
        # reuse the config file and logfile location from a previous run,
        # if available
        copy_terraform_files = True
        if not self.status.runtime_path or not os.path.exists(
            self.status.runtime_path
        ):
            if self.config.root_runtime_path:
                if self.config.singleton:
                    self.status.runtime_path = self.config.root_runtime_path
                else:
                    self.status.runtime_path = os.path.join(
                        self.config.root_runtime_path,
                        str(self.uuid),
                    )
                if fileio.isdir(self.status.runtime_path):
                    copy_terraform_files = False
                else:
                    create_dir_recursive_if_not_exists(
                        str(self.status.runtime_path)
                    )
            else:
                self.status.runtime_path = tempfile.mkdtemp(
                    prefix="zenml-service-"
                )
            if copy_terraform_files and self.config.copy_terraform_files:
                copy_dir(
                    self.config.directory_path,
                    self.status.runtime_path,
                )
    def provision(self) -> None:
        """Provision the service."""
        self._setup_runtime_path()
        self.check_installation()
        self._set_log_level()
        self._init_and_apply()
    def deprovision(self, force: bool = False) -> None:
        """Deprovision the service.
        Args:
            force: if True, the service will be deprovisioned even if it is
                in a failed state.
        """
        self.check_installation()
        self._set_log_level()
        self._destroy()
        # in case of singleton services, this will remove the config
        # path as a whole and otherwise, this removes the specific UUID
        # directory
        assert self.status.config_file is not None
        shutil.rmtree(Path(self.status.config_file).parent)
    # overwriting the start/stop function to remove the progress indicator
    # having which doesn't allow tf logs to be shown in stdout
    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.
        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.
        """
        self.admin_state = ServiceState.ACTIVE
        self.provision()
    def stop(self, timeout: int = 0, force: bool = False) -> None:
        """Stop the service and optionally wait for it to shutdown.
        Args:
            timeout: amount of time to wait for the service to shutdown.
                If set to 0, the method will return immediately after checking
                the service status.
            force: if True, the service will be forcefully stopped.
        """
        self.admin_state = ServiceState.INACTIVE
        self.deprovision()
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.
        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.
        Raises:
            NotImplementedError: not implemented.
        """
        raise NotImplementedError(
            "This method is not available for Terraform services."
        )
    def get_outputs(self, output: Optional[str] = None) -> Dict[str, Any]:
        """Get outputs from the terraform state.
        Args:
            output: if specified, only the output with the given name will be
                returned. Otherwise, all outputs will be returned.
        Returns:
            A dictionary of outputs from the terraform state.
        """
        if output:
            # if output is specified, then full_outputs is just a string
            full_outputs = self.terraform_client.output(
                output, full_value=True
            )
            return {output: full_outputs}
        else:
            # get value of the "value" key in the value of full_outputs
            # and assign it to the key in the output dict
            full_outputs = self.terraform_client.output(full_value=True)
            outputs = {k: v["value"] for k, v in full_outputs.items()}
            return outputs
    def check_installation(self) -> None:
        """Checks if necessary tools are installed on the host system.
        Raises:
            RuntimeError: if any required tool is not installed.
        """
        if not self._is_terraform_installed():
            raise RuntimeError(
                "Terraform is required for stack recipes to run and was not "
                "found installed on your machine or not available on  "
                "your $PATH. Please visit "
                "https://learn.hashicorp.com/tutorials/terraform/install-cli "
                "to install it."
            )
    def _is_terraform_installed(self) -> bool:
        """Checks if terraform is installed on the host system.
        Returns:
            True if terraform is installed, false otherwise.
        """
        # check terraform version to verify installation.
        try:
            self.terraform_client.cmd("-version")
        except FileNotFoundError:
            return False
        return True
    def _set_log_level(self) -> None:
        """Set TF_LOG env var to the log_level provided by the user."""
        os.environ["TF_LOG"] = self.config.log_level
terraform_client: python_terraform.Terraform
  
      property
      readonly
  
    Initialize and/or return the terraform client.
Returns:
| Type | Description | 
|---|---|
| python_terraform.Terraform | The terraform client. | 
check_installation(self)
    Checks if necessary tools are installed on the host system.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | if any required tool is not installed. | 
Source code in zenml/services/terraform/terraform_service.py
          def check_installation(self) -> None:
    """Checks if necessary tools are installed on the host system.
    Raises:
        RuntimeError: if any required tool is not installed.
    """
    if not self._is_terraform_installed():
        raise RuntimeError(
            "Terraform is required for stack recipes to run and was not "
            "found installed on your machine or not available on  "
            "your $PATH. Please visit "
            "https://learn.hashicorp.com/tutorials/terraform/install-cli "
            "to install it."
        )
check_status(self)
    Check the the current operational state of the external service.
If the final output name provided in the config exists as a non-null value, then it's reasonable to assume that the service is up and running.
Returns:
| Type | Description | 
|---|---|
| Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status). | 
Source code in zenml/services/terraform/terraform_service.py
          def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the external service.
    If the final output name provided in the config exists as a non-null value,
    then it's reasonable to assume that the service is up and running.
    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
    code, out, err = self.terraform_client.plan(
        detailed_exitcode=True,
        refresh=False,
        var=self.get_vars(),
        input=False,
        raise_on_error=False,
    )
    if code == 0:
        return (ServiceState.ACTIVE, "The deployment is active.")
    elif code == 2:
        return (
            ServiceState.INACTIVE,
            "The deployment isn't active or needs an update.",
        )
    else:
        return (ServiceState.ERROR, f"Deployment error: \n{err}")
deprovision(self, force=False)
    Deprovision the service.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| force | bool | if True, the service will be deprovisioned even if it is in a failed state. | False | 
Source code in zenml/services/terraform/terraform_service.py
          def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.
    Args:
        force: if True, the service will be deprovisioned even if it is
            in a failed state.
    """
    self.check_installation()
    self._set_log_level()
    self._destroy()
    # in case of singleton services, this will remove the config
    # path as a whole and otherwise, this removes the specific UUID
    # directory
    assert self.status.config_file is not None
    shutil.rmtree(Path(self.status.config_file).parent)
get_logs(self, follow=False, tail=None)
    Retrieve the service logs.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| follow | bool | if True, the logs will be streamed as they are written | False | 
| tail | Optional[int] | only retrieve the last NUM lines of log output. | None | 
Exceptions:
| Type | Description | 
|---|---|
| NotImplementedError | not implemented. | 
Source code in zenml/services/terraform/terraform_service.py
          def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.
    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.
    Raises:
        NotImplementedError: not implemented.
    """
    raise NotImplementedError(
        "This method is not available for Terraform services."
    )
get_outputs(self, output=None)
    Get outputs from the terraform state.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output | Optional[str] | if specified, only the output with the given name will be returned. Otherwise, all outputs will be returned. | None | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | A dictionary of outputs from the terraform state. | 
Source code in zenml/services/terraform/terraform_service.py
          def get_outputs(self, output: Optional[str] = None) -> Dict[str, Any]:
    """Get outputs from the terraform state.
    Args:
        output: if specified, only the output with the given name will be
            returned. Otherwise, all outputs will be returned.
    Returns:
        A dictionary of outputs from the terraform state.
    """
    if output:
        # if output is specified, then full_outputs is just a string
        full_outputs = self.terraform_client.output(
            output, full_value=True
        )
        return {output: full_outputs}
    else:
        # get value of the "value" key in the value of full_outputs
        # and assign it to the key in the output dict
        full_outputs = self.terraform_client.output(full_value=True)
        outputs = {k: v["value"] for k, v in full_outputs.items()}
        return outputs
get_vars(self)
    Get variables as a dictionary from values.tfvars.json.
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | A dictionary of variables to use for the stack recipes derived from the tfvars.json file. | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if the values.tfvars.json file is not found in the stack recipe. | 
| TypeError | if the file doesn't contain a dictionary of variables. | 
Source code in zenml/services/terraform/terraform_service.py
          def get_vars(self) -> Dict[str, Any]:
    """Get variables as a dictionary from values.tfvars.json.
    Returns:
        A dictionary of variables to use for the stack recipes
        derived from the tfvars.json file.
    Raises:
        FileNotFoundError: if the values.tfvars.json file is not
            found in the stack recipe.
        TypeError: if the file doesn't contain a dictionary of variables.
    """
    import json
    path = self.terraform_client.working_dir
    variables_file_path = os.path.join(
        path, self.config.variables_file_path
    )
    if not fileio.exists(variables_file_path):
        raise FileNotFoundError(
            "The file values.tfvars.json was not found in the "
            f"recipe's directory at {variables_file_path}. Please "
            "verify if it exists."
        )
    # read values into a dict and return
    with fileio.open(variables_file_path, "r") as f:
        variables = json.load(f)
    if not isinstance(variables, dict):
        raise TypeError(
            "The values.tfvars.json file must contain a dictionary "
            "of variables."
        )
    return variables
model_post_init(/, self, context)
    This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| self | BaseModel | The BaseModel instance. | required | 
| context | Any | The context. | required | 
Source code in zenml/services/terraform/terraform_service.py
          def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.
    It takes context as an argument since that's what pydantic-core passes when calling it.
    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
provision(self)
    Provision the service.
Source code in zenml/services/terraform/terraform_service.py
          def provision(self) -> None:
    """Provision the service."""
    self._setup_runtime_path()
    self.check_installation()
    self._set_log_level()
    self._init_and_apply()
start(self, timeout=0)
    Start the service and optionally wait for it to become active.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status. | 0 | 
Source code in zenml/services/terraform/terraform_service.py
          def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.
    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    self.admin_state = ServiceState.ACTIVE
    self.provision()
stop(self, timeout=0, force=False)
    Stop the service and optionally wait for it to shutdown.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| timeout | int | amount of time to wait for the service to shutdown. If set to 0, the method will return immediately after checking the service status. | 0 | 
| force | bool | if True, the service will be forcefully stopped. | False | 
Source code in zenml/services/terraform/terraform_service.py
          def stop(self, timeout: int = 0, force: bool = False) -> None:
    """Stop the service and optionally wait for it to shutdown.
    Args:
        timeout: amount of time to wait for the service to shutdown.
            If set to 0, the method will return immediately after checking
            the service status.
        force: if True, the service will be forcefully stopped.
    """
    self.admin_state = ServiceState.INACTIVE
    self.deprovision()
        
TerraformServiceConfig            (ServiceConfig)
        
    Terraform service configuration.
Attributes:
| Name | Type | Description | 
|---|---|---|
| root_runtime_path | str | the root path where the service stores its files. | 
| singleton | bool | set to True to store the service files directly in the
 | 
| directory_path | str | the path to the directory that hosts all the HCL files. | 
| copy_terraform_files | bool | whether to copy the HCL files to the service runtime directory. | 
| log_level | str | the log level to set the terraform client to. Choose one of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive). | 
| variables_file_path | str | the path to the file that stores all variable values. | 
Source code in zenml/services/terraform/terraform_service.py
          class TerraformServiceConfig(ServiceConfig):
    """Terraform service configuration.
    Attributes:
        root_runtime_path: the root path where the service stores its files.
        singleton: set to True to store the service files directly in the
            `root_runtime_path` directory instead of creating a subdirectory for
            each service instance. Only has effect if the `root_runtime_path` is
            also set.
        directory_path: the path to the directory that hosts all the HCL files.
        copy_terraform_files: whether to copy the HCL files to the service
            runtime directory.
        log_level: the log level to set the terraform client to. Choose one of
            TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).
        variables_file_path: the path to the file that stores all variable values.
    """
    root_runtime_path: str
    singleton: bool = False
    directory_path: str
    copy_terraform_files: bool = False
    log_level: str = "ERROR"
    variables_file_path: str = "values.tfvars.json"
        
TerraformServiceStatus            (ServiceStatus)
        
    Terraform service status.
Attributes:
| Name | Type | Description | 
|---|---|---|
| runtime_path | Optional[str] | the path where the service files (e.g. the configuration file used to start the service daemon and the logfile) are located | 
Source code in zenml/services/terraform/terraform_service.py
          class TerraformServiceStatus(ServiceStatus):
    """Terraform service status.
    Attributes:
        runtime_path: the path where the service files (e.g. the configuration
            file used to start the service daemon and the logfile) are located
    """
    runtime_path: Optional[str] = None
    @property
    def config_file(self) -> Optional[str]:
        """Get the path to the service configuration file.
        Returns:
            The path to the configuration file, or None, if the
            service has never been started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_CONFIG_FILE_NAME)
    @property
    def log_file(self) -> Optional[str]:
        """Get the path to the log file where the service output is/has been logged.
        Returns:
            The path to the log file, or None, if the service has never been
            started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_LOG_FILE_NAME)
config_file: Optional[str]
  
      property
      readonly
  
    Get the path to the service configuration file.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the configuration file, or None, if the service has never been started before. | 
log_file: Optional[str]
  
      property
      readonly
  
    Get the path to the log file where the service output is/has been logged.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The path to the log file, or None, if the service has never been started before. |