Skip to content

Vllm

zenml.integrations.vllm special

Initialization for the ZenML vLLM integration.

VLLMIntegration (Integration)

Definition of vLLM integration for ZenML.

Source code in zenml/integrations/vllm/__init__.py
class VLLMIntegration(Integration):
    """Definition of vLLM integration for ZenML."""

    NAME = VLLM

    REQUIREMENTS = ["vllm>=0.6.0,<0.7.0", "openai>=1.0.0"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.vllm import services

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the vLLM integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.vllm.flavors import VLLMModelDeployerFlavor

        return [VLLMModelDeployerFlavor]

activate() classmethod

Activates the integration.

Source code in zenml/integrations/vllm/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.vllm import services

flavors() classmethod

Declare the stack component flavors for the vLLM integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/vllm/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the vLLM integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.vllm.flavors import VLLMModelDeployerFlavor

    return [VLLMModelDeployerFlavor]

flavors special

vLLM integration flavors.

vllm_model_deployer_flavor

vLLM model deployer flavor.

VLLMModelDeployerConfig (BaseModelDeployerConfig)

Configuration for vLLM Inference model deployer.

Source code in zenml/integrations/vllm/flavors/vllm_model_deployer_flavor.py
class VLLMModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for vLLM Inference model deployer."""

    service_path: str = ""
VLLMModelDeployerFlavor (BaseModelDeployerFlavor)

vLLM model deployer flavor.

Source code in zenml/integrations/vllm/flavors/vllm_model_deployer_flavor.py
class VLLMModelDeployerFlavor(BaseModelDeployerFlavor):
    """vLLM model deployer flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return VLLM_MODEL_DEPLOYER

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/vllm.png"

    @property
    def config_class(self) -> Type[VLLMModelDeployerConfig]:
        """Returns `VLLMModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return VLLMModelDeployerConfig

    @property
    def implementation_class(self) -> Type["VLLMModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.vllm.model_deployers import VLLMModelDeployer

        return VLLMModelDeployer
config_class: Type[zenml.integrations.vllm.flavors.vllm_model_deployer_flavor.VLLMModelDeployerConfig] property readonly

Returns VLLMModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.vllm.flavors.vllm_model_deployer_flavor.VLLMModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VLLMModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[VLLMModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

model_deployers special

Initialization of the vLLM model deployers.

vllm_model_deployer

Implementation of the vLLM Model Deployer.

VLLMModelDeployer (BaseModelDeployer)

vLLM Inference Server.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
class VLLMModelDeployer(BaseModelDeployer):
    """vLLM Inference Server."""

    NAME: ClassVar[str] = "VLLM"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = VLLMModelDeployerFlavor

    _service_path: Optional[str] = None

    @property
    def config(self) -> VLLMModelDeployerConfig:
        """Returns the `VLLMModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(VLLMModelDeployerConfig, self._config)

    @staticmethod
    def get_service_path(id_: UUID) -> str:
        """Get the path where local vLLM service information is stored.

        This includes the deployment service configuration, PID and log files
        are stored.

        Args:
            id_: The ID of the vLLM model deployer.

        Returns:
            The service path.
        """
        service_path = os.path.join(
            GlobalConfiguration().local_stores_path,
            str(id_),
        )
        create_dir_recursive_if_not_exists(service_path)
        return service_path

    @property
    def local_path(self) -> str:
        """Returns the path to the root directory.

        This is where all configurations for vLLM deployment daemon processes
        are stored.

        If the service path is not set in the config by the user, the path is
        set to a local default path according to the component ID.

        Returns:
            The path to the local service root directory.
        """
        if self._service_path is not None:
            return self._service_path

        if self.config.service_path:
            self._service_path = self.config.service_path
        else:
            self._service_path = self.get_service_path(self.id)

        create_dir_recursive_if_not_exists(self._service_path)
        return self._service_path

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "VLLMDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: vLLM deployment service object

        Returns:
            A dictionary containing the model server information.
        """
        return {
            "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
            "PREDICTION_URL": service_instance.get_prediction_url(),
            "SERVICE_PATH": service_instance.status.runtime_path,
            "DAEMON_PID": str(service_instance.status.pid),
        }

    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new vLLM deployment service or update an existing one.

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new vLLM
            deployment server to reflect the model and other configuration
            parameters specified in the supplied vLLM service `config`.

          * if `replace` is True, this method will first attempt to find an
            existing vLLM deployment service that is *equivalent* to the
            supplied configuration parameters. Two or more vLLM deployment
            services are considered equivalent if they have the same
            `pipeline_name`, `pipeline_step_name` and `model_name` configuration
            parameters. To put it differently, two vLLM deployment services
            are equivalent if they serve versions of the same model deployed by
            the same pipeline step. If an equivalent vLLM deployment is found,
            it will be updated in place to reflect the new configuration
            parameters.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new vLLM deployment
        server for each new model version. If multiple equivalent vLLM
        deployment servers are found, one is selected at random to be updated
        and the others are deleted.

        Args:
            id: the UUID of the vLLM model deployer.
            config: the configuration of the model to be deployed with vLLM.
            timeout: the timeout in seconds to wait for the vLLM server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the vLLM
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML vLLM deployment service object that can be used to
            interact with the vLLM model http server.
        """
        config = cast(VLLMServiceConfig, config)
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(f"Created a new vLLM deployment service: {service}")
        return service

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: VLLMDeploymentService,
    ) -> None:
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

        # delete the old configuration file
        if existing_service.status.runtime_path:
            shutil.rmtree(existing_service.status.runtime_path)

    # the step will receive a config from the user that mentions the number
    # of workers etc.the step implementation will create a new config using
    # all values from the user and add values like pipeline name, model_uri
    def _create_new_service(
        self, id: UUID, timeout: int, config: VLLMServiceConfig
    ) -> VLLMDeploymentService:
        """Creates a new VLLMDeploymentService.

        Args:
            id: the ID of the vLLM deployment service to be created or updated.
            timeout: the timeout in seconds to wait for the vLLM server
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with vLLM.

        Returns:
            The VLLMDeploymentService object that can be used to interact
            with the vLLM model server.
        """
        # set the root runtime path with the stack component's UUID
        config.root_runtime_path = self.local_path
        # create a new service for the new model
        service = VLLMDeploymentService(uuid=id, config=config)
        service.start(timeout=timeout)

        return service

    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Method to stop a model server.

        Args:
            service: The service to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.

        Returns:
            The stopped service.
        """
        service.stop(timeout=timeout, force=force)
        return service

    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Method to start a model server.

        Args:
            service: The service to start.
            timeout: Timeout in seconds to wait for the service to start.

        Returns:
            The started service.
        """
        service.start(timeout=timeout)
        return service

    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        Args:
            service: The service to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        service = cast(VLLMDeploymentService, service)
        self._clean_up_existing_service(
            existing_service=service, timeout=timeout, force=force
        )
config: VLLMModelDeployerConfig property readonly

Returns the VLLMModelDeployerConfig config.

Returns:

Type Description
VLLMModelDeployerConfig

The configuration.

local_path: str property readonly

Returns the path to the root directory.

This is where all configurations for vLLM deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

Type Description
str

The path to the local service root directory.

FLAVOR (BaseModelDeployerFlavor)

vLLM model deployer flavor.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
class VLLMModelDeployerFlavor(BaseModelDeployerFlavor):
    """vLLM model deployer flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return VLLM_MODEL_DEPLOYER

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/vllm.png"

    @property
    def config_class(self) -> Type[VLLMModelDeployerConfig]:
        """Returns `VLLMModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return VLLMModelDeployerConfig

    @property
    def implementation_class(self) -> Type["VLLMModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.vllm.model_deployers import VLLMModelDeployer

        return VLLMModelDeployer
config_class: Type[zenml.integrations.vllm.flavors.vllm_model_deployer_flavor.VLLMModelDeployerConfig] property readonly

Returns VLLMModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.vllm.flavors.vllm_model_deployer_flavor.VLLMModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VLLMModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[VLLMModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

get_model_server_info(service_instance) staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance VLLMDeploymentService

vLLM deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "VLLMDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: vLLM deployment service object

    Returns:
        A dictionary containing the model server information.
    """
    return {
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "SERVICE_PATH": service_instance.status.runtime_path,
        "DAEMON_PID": str(service_instance.status.pid),
    }
get_service_path(id_) staticmethod

Get the path where local vLLM service information is stored.

This includes the deployment service configuration, PID and log files are stored.

Parameters:

Name Type Description Default
id_ UUID

The ID of the vLLM model deployer.

required

Returns:

Type Description
str

The service path.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local vLLM service information is stored.

    This includes the deployment service configuration, PID and log files
    are stored.

    Args:
        id_: The ID of the vLLM model deployer.

    Returns:
        The service path.
    """
    service_path = os.path.join(
        GlobalConfiguration().local_stores_path,
        str(id_),
    )
    create_dir_recursive_if_not_exists(service_path)
    return service_path
perform_delete_model(self, service, timeout=60, force=False)

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

60
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    service = cast(VLLMDeploymentService, service)
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(self, id, config, timeout=60)

Create a new vLLM deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new vLLM deployment server to reflect the model and other configuration parameters specified in the supplied vLLM service config.

  • if replace is True, this method will first attempt to find an existing vLLM deployment service that is equivalent to the supplied configuration parameters. Two or more vLLM deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two vLLM deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent vLLM deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new vLLM deployment server for each new model version. If multiple equivalent vLLM deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

Name Type Description Default
id UUID

the UUID of the vLLM model deployer.

required
config ServiceConfig

the configuration of the model to be deployed with vLLM.

required
timeout int

the timeout in seconds to wait for the vLLM server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the vLLM server is provisioned, without waiting for it to fully start.

60

Returns:

Type Description
BaseService

The ZenML vLLM deployment service object that can be used to interact with the vLLM model http server.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new vLLM deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new vLLM
        deployment server to reflect the model and other configuration
        parameters specified in the supplied vLLM service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing vLLM deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more vLLM deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two vLLM deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent vLLM deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new vLLM deployment
    server for each new model version. If multiple equivalent vLLM
    deployment servers are found, one is selected at random to be updated
    and the others are deleted.

    Args:
        id: the UUID of the vLLM model deployer.
        config: the configuration of the model to be deployed with vLLM.
        timeout: the timeout in seconds to wait for the vLLM server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the vLLM
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML vLLM deployment service object that can be used to
        interact with the vLLM model http server.
    """
    config = cast(VLLMServiceConfig, config)
    service = self._create_new_service(
        id=id, timeout=timeout, config=config
    )
    logger.info(f"Created a new vLLM deployment service: {service}")
    return service
perform_start_model(self, service, timeout=60)

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

60

Returns:

Type Description
BaseService

The started service.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(self, service, timeout=60, force=False)

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

60
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in zenml/integrations/vllm/model_deployers/vllm_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service

services special

Initialization of the vLLM Inference Server.

vllm_deployment

Implementation of the vLLM Inference Server Service.

VLLMDeploymentEndpoint (LocalDaemonServiceEndpoint)

A service endpoint exposed by the vLLM deployment daemon.

Attributes:

Name Type Description
config VLLMDeploymentEndpointConfig

service endpoint configuration

Source code in zenml/integrations/vllm/services/vllm_deployment.py
class VLLMDeploymentEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the vLLM deployment daemon.

    Attributes:
        config: service endpoint configuration
    """

    config: VLLMDeploymentEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint
        """
        uri = self.status.uri
        if not uri:
            return None
        return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

VLLMDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig)

vLLM deployment service configuration.

Attributes:

Name Type Description
prediction_url_path str

URI subpath for prediction requests

Source code in zenml/integrations/vllm/services/vllm_deployment.py
class VLLMDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
    """vLLM deployment service configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests
    """

    prediction_url_path: str
VLLMDeploymentService (LocalDaemonService, BaseDeploymentService)

vLLM Inference Server Deployment Service.

Source code in zenml/integrations/vllm/services/vllm_deployment.py
class VLLMDeploymentService(LocalDaemonService, BaseDeploymentService):
    """vLLM Inference Server Deployment Service."""

    SERVICE_TYPE = ServiceType(
        name="vllm-deployment",
        type="model-serving",
        flavor="vllm",
        description="vLLM Inference prediction service",
    )
    config: VLLMServiceConfig
    endpoint: VLLMDeploymentEndpoint

    def __init__(self, config: VLLMServiceConfig, **attrs: Any):
        """Initialize the vLLM deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        if isinstance(config, VLLMServiceConfig) and "endpoint" not in attrs:
            endpoint = VLLMDeploymentEndpoint(
                config=VLLMDeploymentEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                    port=config.port,
                    ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                    prediction_url_path=VLLM_PREDICTION_URL_PATH,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path=VLLM_HEALTHCHECK_URL_PATH,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Start the service."""
        logger.info(
            "Starting vLLM inference server service as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        import uvloop
        from vllm.entrypoints.openai.api_server import (
            run_server,
        )
        from vllm.entrypoints.openai.cli_args import (
            make_arg_parser,
        )
        from vllm.utils import (
            FlexibleArgumentParser,
        )

        try:
            parser: argparse.ArgumentParser = make_arg_parser(
                FlexibleArgumentParser()
            )
            args: argparse.Namespace = parser.parse_args()
            # Override port with the available port
            self.config.port = self.endpoint.status.port or self.config.port
            # Update the arguments in place
            args.__dict__.update(self.config.model_dump())
            uvloop.run(run_server(args=args))
        except KeyboardInterrupt:
            logger.info("Stopping vLLM prediction service...")

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint
        """
        if not self.is_running:
            return None
        return self.endpoint.prediction_url

    def predict(self, data: "Any") -> "Any":
        """Make a prediction using the service.

        Args:
            data: data to make a prediction on

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "vLLM Inference service is not running. "
                "Please start the service before making predictions."
            )
        if self.endpoint.prediction_url is not None:
            from openai import OpenAI

            client = OpenAI(
                api_key="EMPTY",
                base_url=self.endpoint.prediction_url,
            )
            models = client.models.list()
            model = models.data[0].id
            result = client.completions.create(model=model, prompt=data)
            # TODO: We can add support for client.chat.completions.create
        else:
            raise ValueError("No endpoint known for prediction.")
        return result
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

__init__(self, config, **attrs) special

Initialize the vLLM deployment service.

Parameters:

Name Type Description Default
config VLLMServiceConfig

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in zenml/integrations/vllm/services/vllm_deployment.py
def __init__(self, config: VLLMServiceConfig, **attrs: Any):
    """Initialize the vLLM deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    if isinstance(config, VLLMServiceConfig) and "endpoint" not in attrs:
        endpoint = VLLMDeploymentEndpoint(
            config=VLLMDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=VLLM_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=VLLM_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
predict(self, data)

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required

Returns:

Type Description
Any

The prediction result.

Exceptions:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in zenml/integrations/vllm/services/vllm_deployment.py
def predict(self, data: "Any") -> "Any":
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "vLLM Inference service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is not None:
        from openai import OpenAI

        client = OpenAI(
            api_key="EMPTY",
            base_url=self.endpoint.prediction_url,
        )
        models = client.models.list()
        model = models.data[0].id
        result = client.completions.create(model=model, prompt=data)
        # TODO: We can add support for client.chat.completions.create
    else:
        raise ValueError("No endpoint known for prediction.")
    return result
run(self)

Start the service.

Source code in zenml/integrations/vllm/services/vllm_deployment.py
def run(self) -> None:
    """Start the service."""
    logger.info(
        "Starting vLLM inference server service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    import uvloop
    from vllm.entrypoints.openai.api_server import (
        run_server,
    )
    from vllm.entrypoints.openai.cli_args import (
        make_arg_parser,
    )
    from vllm.utils import (
        FlexibleArgumentParser,
    )

    try:
        parser: argparse.ArgumentParser = make_arg_parser(
            FlexibleArgumentParser()
        )
        args: argparse.Namespace = parser.parse_args()
        # Override port with the available port
        self.config.port = self.endpoint.status.port or self.config.port
        # Update the arguments in place
        args.__dict__.update(self.config.model_dump())
        uvloop.run(run_server(args=args))
    except KeyboardInterrupt:
        logger.info("Stopping vLLM prediction service...")
VLLMServiceConfig (LocalDaemonServiceConfig)

vLLM service configurations.

Source code in zenml/integrations/vllm/services/vllm_deployment.py
class VLLMServiceConfig(LocalDaemonServiceConfig):
    """vLLM service configurations."""

    model: str
    port: int
    host: Optional[str] = None
    blocking: bool = True
    # If unspecified, model name or path will be used.
    tokenizer: Optional[str] = None
    served_model_name: Optional[Union[str, List[str]]] = None
    # Trust remote code from huggingface.
    trust_remote_code: Optional[bool] = False
    # ['auto', 'slow', 'mistral']
    tokenizer_mode: Optional[str] = "auto"
    # ['auto', 'half', 'float16', 'bfloat16', 'float', 'float32']
    dtype: Optional[str] = "auto"
    # The specific model version to use. It can be a branch name, a tag name, or a commit id.
    # If unspecified, will use the default version.
    revision: Optional[str] = None