Skip to content

Mlflow

zenml.integrations.mlflow special

Initialization for the ZenML MLflow integration.

The MLflow integrations currently enables you to use MLflow tracking as a convenient way to visualize your experiment runs within the MLflow UI.

MlflowIntegration (Integration)

Definition of MLflow integration for ZenML.

Source code in zenml/integrations/mlflow/__init__.py
class MlflowIntegration(Integration):
    """Definition of MLflow integration for ZenML."""

    NAME = MLFLOW
    # We need to pin protobuf to a version <=4 here, as this mlflow release
    # does not pin it. They fixed this in a later version, so we can probably
    # remove this once we update the mlflow version.
    REQUIREMENTS = [
        "mlflow>=1.24.0,<=2.2.2",
        "mlserver>=0.5.3",
        "mlserver-mlflow>=0.5.3",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the MLflow integration."""
        from zenml.integrations.mlflow import services  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the MLflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.mlflow.flavors import (
            MLFlowExperimentTrackerFlavor,
            MLFlowModelDeployerFlavor,
            MLFlowModelRegistryFlavor,
        )

        return [
            MLFlowModelDeployerFlavor,
            MLFlowExperimentTrackerFlavor,
            MLFlowModelRegistryFlavor,
        ]

activate() classmethod

Activate the MLflow integration.

Source code in zenml/integrations/mlflow/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the MLflow integration."""
    from zenml.integrations.mlflow import services  # noqa

flavors() classmethod

Declare the stack component flavors for the MLflow integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/mlflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the MLflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.mlflow.flavors import (
        MLFlowExperimentTrackerFlavor,
        MLFlowModelDeployerFlavor,
        MLFlowModelRegistryFlavor,
    )

    return [
        MLFlowModelDeployerFlavor,
        MLFlowExperimentTrackerFlavor,
        MLFlowModelRegistryFlavor,
    ]

experiment_trackers special

Initialization of the MLflow experiment tracker.

mlflow_experiment_tracker

Implementation of the MLflow experiment tracker for ZenML.

MLFlowExperimentTracker (BaseExperimentTracker)

Track experiments using MLflow.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
class MLFlowExperimentTracker(BaseExperimentTracker):
    """Track experiments using MLflow."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the experiment tracker and validate the tracking uri.

        Args:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._ensure_valid_tracking_uri()

    def _ensure_valid_tracking_uri(self) -> None:
        """Ensures that the tracking uri is a valid mlflow tracking uri.

        Raises:
            ValueError: If the tracking uri is not valid.
        """
        tracking_uri = self.config.tracking_uri
        if tracking_uri:
            valid_schemes = DATABASE_ENGINES + ["http", "https", "file"]
            if not any(
                tracking_uri.startswith(scheme) for scheme in valid_schemes
            ) and not is_databricks_tracking_uri(tracking_uri):
                raise ValueError(
                    f"MLflow tracking uri does not start with one of the valid "
                    f"schemes {valid_schemes} or its value is not set to "
                    f"'databricks'. See "
                    f"https://www.mlflow.org/docs/latest/tracking.html#where-runs-are-recorded "
                    f"for more information."
                )

    @property
    def config(self) -> MLFlowExperimentTrackerConfig:
        """Returns the `MLFlowExperimentTrackerConfig` config.

        Returns:
            The configuration.
        """
        return cast(MLFlowExperimentTrackerConfig, self._config)

    @property
    def local_path(self) -> Optional[str]:
        """Path to the local directory where the MLflow artifacts are stored.

        Returns:
            None if configured with a remote tracking URI, otherwise the
            path to the local MLflow artifact store directory.
        """
        tracking_uri = self.get_tracking_uri()
        if is_remote_mlflow_tracking_uri(tracking_uri):
            return None
        else:
            assert tracking_uri.startswith("file:")
            return tracking_uri[5:]

    @property
    def validator(self) -> Optional["StackValidator"]:
        """Checks the stack has a `LocalArtifactStore` if no tracking uri was specified.

        Returns:
            An optional `StackValidator`.
        """
        if self.config.tracking_uri:
            # user specified a tracking uri, do nothing
            return None
        else:
            # try to fall back to a tracking uri inside the zenml artifact
            # store. this only works in case of a local artifact store, so we
            # make sure to prevent stack with other artifact stores for now
            return StackValidator(
                custom_validation_function=lambda stack: (
                    isinstance(stack.artifact_store, LocalArtifactStore),
                    "MLflow experiment tracker without a specified tracking "
                    "uri only works with a local artifact store.",
                )
            )

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Mlflow experiment tracker.

        Returns:
            The settings class.
        """
        return MLFlowExperimentTrackerSettings

    @staticmethod
    def _local_mlflow_backend() -> str:
        """Gets the local MLflow backend inside the ZenML artifact repository directory.

        Returns:
            The MLflow tracking URI for the local MLflow backend.
        """
        client = Client()
        artifact_store = client.active_stack.artifact_store
        local_mlflow_tracking_uri = os.path.join(artifact_store.path, "mlruns")
        if not os.path.exists(local_mlflow_tracking_uri):
            os.makedirs(local_mlflow_tracking_uri)
        return "file:" + local_mlflow_tracking_uri

    def get_tracking_uri(self) -> str:
        """Returns the configured tracking URI or a local fallback.

        Returns:
            The tracking URI.
        """
        return self.config.tracking_uri or self._local_mlflow_backend()

    def prepare_step_run(self, info: "StepRunInfo") -> None:
        """Sets the MLflow tracking uri and credentials.

        Args:
            info: Info about the step that will be executed.
        """
        self.configure_mlflow()
        settings = cast(
            MLFlowExperimentTrackerSettings,
            self.get_settings(info),
        )

        experiment_name = settings.experiment_name or info.pipeline.name
        experiment = self._set_active_experiment(experiment_name)
        run_id = self.get_run_id(
            experiment_name=experiment_name, run_name=info.run_name
        )

        tags = settings.tags.copy()
        tags.update(self._get_internal_tags())

        mlflow.start_run(
            run_id=run_id,
            run_name=info.run_name,
            experiment_id=experiment.experiment_id,
            tags=tags,
        )

        if settings.nested:
            mlflow.start_run(run_name=info.config.name, nested=True, tags=tags)

    def get_step_run_metadata(
        self, info: "StepRunInfo"
    ) -> Dict[str, "MetadataType"]:
        """Get component- and step-specific metadata after a step ran.

        Args:
            info: Info about the step that was executed.

        Returns:
            A dictionary of metadata.
        """
        return {
            METADATA_EXPERIMENT_TRACKER_URL: Uri(self.get_tracking_uri()),
            "mlflow_run_id": mlflow.active_run().info.run_id,
            "mlflow_experiment_id": mlflow.active_run().info.experiment_id,
        }

    def disable_autologging(self) -> None:
        """Disables MLflow autologging."""
        from mlflow import (
            fastai,
            gluon,
            lightgbm,
            pytorch,
            sklearn,
            spark,
            statsmodels,
            tensorflow,
            xgboost,
        )

        # There is no way to disable auto-logging for all frameworks at once.
        # If auto-logging is explicitly enabled for a framework by calling its
        # autolog() method, it cannot be disabled by calling
        # `mlflow.autolog(disable=True)`. Therefore, we need to disable
        # auto-logging for all frameworks explicitly.

        tensorflow.autolog(disable=True)
        gluon.autolog(disable=True)
        xgboost.autolog(disable=True)
        lightgbm.autolog(disable=True)
        statsmodels.autolog(disable=True)
        spark.autolog(disable=True)
        sklearn.autolog(disable=True)
        fastai.autolog(disable=True)
        pytorch.autolog(disable=True)

    def cleanup_step_run(
        self,
        info: "StepRunInfo",
        step_failed: bool,
    ) -> None:
        """Stops active MLflow runs and resets the MLflow tracking uri.

        Args:
            info: Info about the step that was executed.
            step_failed: Whether the step failed or not.
        """
        status = "FAILED" if step_failed else "FINISHED"
        self.disable_autologging()
        mlflow_utils.stop_zenml_mlflow_runs(status)
        mlflow.set_tracking_uri("")

    def configure_mlflow(self) -> None:
        """Configures the MLflow tracking URI and any additional credentials."""
        tracking_uri = self.get_tracking_uri()
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_registry_uri(tracking_uri)

        if is_databricks_tracking_uri(tracking_uri):
            if self.config.databricks_host:
                os.environ[DATABRICKS_HOST] = self.config.databricks_host
            if self.config.tracking_username:
                os.environ[DATABRICKS_USERNAME] = self.config.tracking_username
            if self.config.tracking_password:
                os.environ[DATABRICKS_PASSWORD] = self.config.tracking_password
            if self.config.tracking_token:
                os.environ[DATABRICKS_TOKEN] = self.config.tracking_token
        else:
            if self.config.tracking_username:
                os.environ[
                    MLFLOW_TRACKING_USERNAME
                ] = self.config.tracking_username
            if self.config.tracking_password:
                os.environ[
                    MLFLOW_TRACKING_PASSWORD
                ] = self.config.tracking_password
            if self.config.tracking_token:
                os.environ[MLFLOW_TRACKING_TOKEN] = self.config.tracking_token

        os.environ[MLFLOW_TRACKING_INSECURE_TLS] = (
            "true" if self.config.tracking_insecure_tls else "false"
        )

    def get_run_id(self, experiment_name: str, run_name: str) -> Optional[str]:
        """Gets the if of a run with the given name and experiment.

        Args:
            experiment_name: Name of the experiment in which to search for the
                run.
            run_name: Name of the run to search.

        Returns:
            The id of the run if it exists.
        """
        self.configure_mlflow()
        experiment_name = self._adjust_experiment_name(experiment_name)

        runs = mlflow.search_runs(
            experiment_names=[experiment_name],
            filter_string=f'tags.mlflow.runName = "{run_name}"',
            run_view_type=3,
            output_format="list",
        )
        if not runs:
            return None

        run: Run = runs[0]
        if mlflow_utils.is_zenml_run(run):
            return cast(str, run.info.run_id)
        else:
            return None

    def _set_active_experiment(self, experiment_name: str) -> Experiment:
        """Sets the active MLflow experiment.

        If no experiment with this name exists, it is created and then
        activated.

        Args:
            experiment_name: Name of the experiment to activate.

        Raises:
            RuntimeError: If the experiment creation or activation failed.

        Returns:
            The experiment.
        """
        experiment_name = self._adjust_experiment_name(experiment_name)

        mlflow.set_experiment(experiment_name=experiment_name)
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if not experiment:
            raise RuntimeError("Failed to set active mlflow experiment.")
        return experiment

    def _adjust_experiment_name(self, experiment_name: str) -> str:
        """Prepends a slash to the experiment name if using Databricks.

        Databricks requires the experiment name to be an absolute path within
        the Databricks workspace.

        Args:
            experiment_name: The experiment name.

        Returns:
            The potentially adjusted experiment name.
        """
        tracking_uri = self.get_tracking_uri()

        if (
            tracking_uri
            and is_databricks_tracking_uri(tracking_uri)
            and not experiment_name.startswith("/")
        ):
            return f"/{experiment_name}"
        else:
            return experiment_name

    @staticmethod
    def _get_internal_tags() -> Dict[str, Any]:
        """Gets ZenML internal tags for MLflow runs.

        Returns:
            Internal tags.
        """
        return {mlflow_utils.ZENML_TAG_KEY: zenml.__version__}
config: MLFlowExperimentTrackerConfig property readonly

Returns the MLFlowExperimentTrackerConfig config.

Returns:

Type Description
MLFlowExperimentTrackerConfig

The configuration.

local_path: Optional[str] property readonly

Path to the local directory where the MLflow artifacts are stored.

Returns:

Type Description
Optional[str]

None if configured with a remote tracking URI, otherwise the path to the local MLflow artifact store directory.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Mlflow experiment tracker.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property readonly

Checks the stack has a LocalArtifactStore if no tracking uri was specified.

Returns:

Type Description
Optional[StackValidator]

An optional StackValidator.

__init__(self, *args, **kwargs) special

Initialize the experiment tracker and validate the tracking uri.

Parameters:

Name Type Description Default
*args Any

Variable length argument list.

()
**kwargs Any

Arbitrary keyword arguments.

{}
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the experiment tracker and validate the tracking uri.

    Args:
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self._ensure_valid_tracking_uri()
cleanup_step_run(self, info, step_failed)

Stops active MLflow runs and resets the MLflow tracking uri.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required
step_failed bool

Whether the step failed or not.

required
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def cleanup_step_run(
    self,
    info: "StepRunInfo",
    step_failed: bool,
) -> None:
    """Stops active MLflow runs and resets the MLflow tracking uri.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed or not.
    """
    status = "FAILED" if step_failed else "FINISHED"
    self.disable_autologging()
    mlflow_utils.stop_zenml_mlflow_runs(status)
    mlflow.set_tracking_uri("")
configure_mlflow(self)

Configures the MLflow tracking URI and any additional credentials.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def configure_mlflow(self) -> None:
    """Configures the MLflow tracking URI and any additional credentials."""
    tracking_uri = self.get_tracking_uri()
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_registry_uri(tracking_uri)

    if is_databricks_tracking_uri(tracking_uri):
        if self.config.databricks_host:
            os.environ[DATABRICKS_HOST] = self.config.databricks_host
        if self.config.tracking_username:
            os.environ[DATABRICKS_USERNAME] = self.config.tracking_username
        if self.config.tracking_password:
            os.environ[DATABRICKS_PASSWORD] = self.config.tracking_password
        if self.config.tracking_token:
            os.environ[DATABRICKS_TOKEN] = self.config.tracking_token
    else:
        if self.config.tracking_username:
            os.environ[
                MLFLOW_TRACKING_USERNAME
            ] = self.config.tracking_username
        if self.config.tracking_password:
            os.environ[
                MLFLOW_TRACKING_PASSWORD
            ] = self.config.tracking_password
        if self.config.tracking_token:
            os.environ[MLFLOW_TRACKING_TOKEN] = self.config.tracking_token

    os.environ[MLFLOW_TRACKING_INSECURE_TLS] = (
        "true" if self.config.tracking_insecure_tls else "false"
    )
disable_autologging(self)

Disables MLflow autologging.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def disable_autologging(self) -> None:
    """Disables MLflow autologging."""
    from mlflow import (
        fastai,
        gluon,
        lightgbm,
        pytorch,
        sklearn,
        spark,
        statsmodels,
        tensorflow,
        xgboost,
    )

    # There is no way to disable auto-logging for all frameworks at once.
    # If auto-logging is explicitly enabled for a framework by calling its
    # autolog() method, it cannot be disabled by calling
    # `mlflow.autolog(disable=True)`. Therefore, we need to disable
    # auto-logging for all frameworks explicitly.

    tensorflow.autolog(disable=True)
    gluon.autolog(disable=True)
    xgboost.autolog(disable=True)
    lightgbm.autolog(disable=True)
    statsmodels.autolog(disable=True)
    spark.autolog(disable=True)
    sklearn.autolog(disable=True)
    fastai.autolog(disable=True)
    pytorch.autolog(disable=True)
get_run_id(self, experiment_name, run_name)

Gets the if of a run with the given name and experiment.

Parameters:

Name Type Description Default
experiment_name str

Name of the experiment in which to search for the run.

required
run_name str

Name of the run to search.

required

Returns:

Type Description
Optional[str]

The id of the run if it exists.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def get_run_id(self, experiment_name: str, run_name: str) -> Optional[str]:
    """Gets the if of a run with the given name and experiment.

    Args:
        experiment_name: Name of the experiment in which to search for the
            run.
        run_name: Name of the run to search.

    Returns:
        The id of the run if it exists.
    """
    self.configure_mlflow()
    experiment_name = self._adjust_experiment_name(experiment_name)

    runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        filter_string=f'tags.mlflow.runName = "{run_name}"',
        run_view_type=3,
        output_format="list",
    )
    if not runs:
        return None

    run: Run = runs[0]
    if mlflow_utils.is_zenml_run(run):
        return cast(str, run.info.run_id)
    else:
        return None
get_step_run_metadata(self, info)

Get component- and step-specific metadata after a step ran.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata.
    """
    return {
        METADATA_EXPERIMENT_TRACKER_URL: Uri(self.get_tracking_uri()),
        "mlflow_run_id": mlflow.active_run().info.run_id,
        "mlflow_experiment_id": mlflow.active_run().info.experiment_id,
    }
get_tracking_uri(self)

Returns the configured tracking URI or a local fallback.

Returns:

Type Description
str

The tracking URI.

Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def get_tracking_uri(self) -> str:
    """Returns the configured tracking URI or a local fallback.

    Returns:
        The tracking URI.
    """
    return self.config.tracking_uri or self._local_mlflow_backend()
prepare_step_run(self, info)

Sets the MLflow tracking uri and credentials.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that will be executed.

required
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Sets the MLflow tracking uri and credentials.

    Args:
        info: Info about the step that will be executed.
    """
    self.configure_mlflow()
    settings = cast(
        MLFlowExperimentTrackerSettings,
        self.get_settings(info),
    )

    experiment_name = settings.experiment_name or info.pipeline.name
    experiment = self._set_active_experiment(experiment_name)
    run_id = self.get_run_id(
        experiment_name=experiment_name, run_name=info.run_name
    )

    tags = settings.tags.copy()
    tags.update(self._get_internal_tags())

    mlflow.start_run(
        run_id=run_id,
        run_name=info.run_name,
        experiment_id=experiment.experiment_id,
        tags=tags,
    )

    if settings.nested:
        mlflow.start_run(run_name=info.config.name, nested=True, tags=tags)

flavors special

MLFlow integration flavors.

mlflow_experiment_tracker_flavor

MLflow experiment tracker flavor.

MLFlowExperimentTrackerConfig (BaseExperimentTrackerConfig, MLFlowExperimentTrackerSettings) pydantic-model

Config for the MLflow experiment tracker.

Attributes:

Name Type Description
tracking_uri Optional[str]

The uri of the mlflow tracking server. If no uri is set, your stack must contain a LocalArtifactStore and ZenML will point MLflow to a subdirectory of your artifact store instead.

tracking_username Optional[str]

Username for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either tracking_token or tracking_username and tracking_password must be specified.

tracking_password Optional[str]

Password for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either tracking_token or tracking_username and tracking_password must be specified.

tracking_token Optional[str]

Token for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either tracking_token or tracking_username and tracking_password must be specified.

tracking_insecure_tls bool

Skips verification of TLS connection to the MLflow tracking server if set to True.

databricks_host Optional[str]

The host of the Databricks workspace with the MLflow managed server to connect to. This is only required if tracking_uri value is set to "databricks".

Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseExperimentTrackerConfig, MLFlowExperimentTrackerSettings
):
    """Config for the MLflow experiment tracker.

    Attributes:
        tracking_uri: The uri of the mlflow tracking server. If no uri is set,
            your stack must contain a `LocalArtifactStore` and ZenML will
            point MLflow to a subdirectory of your artifact store instead.
        tracking_username: Username for authenticating with the MLflow
            tracking server. When a remote tracking uri is specified,
            either `tracking_token` or `tracking_username` and
            `tracking_password` must be specified.
        tracking_password: Password for authenticating with the MLflow
            tracking server. When a remote tracking uri is specified,
            either `tracking_token` or `tracking_username` and
            `tracking_password` must be specified.
        tracking_token: Token for authenticating with the MLflow
            tracking server. When a remote tracking uri is specified,
            either `tracking_token` or `tracking_username` and
            `tracking_password` must be specified.
        tracking_insecure_tls: Skips verification of TLS connection to the
            MLflow tracking server if set to `True`.
        databricks_host: The host of the Databricks workspace with the MLflow
            managed server to connect to. This is only required if
            `tracking_uri` value is set to `"databricks"`.
    """

    tracking_uri: Optional[str] = None
    tracking_username: Optional[str] = SecretField()
    tracking_password: Optional[str] = SecretField()
    tracking_token: Optional[str] = SecretField()
    tracking_insecure_tls: bool = False
    databricks_host: Optional[str] = None

    @root_validator(skip_on_failure=True)
    def _ensure_authentication_if_necessary(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that credentials or a token for authentication exist.

        We make this check when running MLflow tracking with a remote backend.

        Args:
            values: The values to validate.

        Returns:
            The validated values.

        Raises:
            ValueError: If neither credentials nor a token are provided.
        """
        tracking_uri = values.get("tracking_uri")

        if tracking_uri:
            if is_databricks_tracking_uri(tracking_uri):
                # If the tracking uri is "databricks", then we need the databricks
                # host to be set.
                databricks_host = values.get("databricks_host")

                if not databricks_host:
                    raise ValueError(
                        "MLflow experiment tracking with a Databricks MLflow "
                        "managed tracking server requires the `databricks_host` "
                        "to be set in your stack component. To update your "
                        "component, run `zenml experiment-tracker update "
                        "<NAME> --databricks_host=DATABRICKS_HOST` "
                        "and specify the hostname of your Databricks workspace."
                    )

            if is_remote_mlflow_tracking_uri(tracking_uri):
                # we need either username + password or a token to authenticate to
                # the remote backend
                basic_auth = values.get("tracking_username") and values.get(
                    "tracking_password"
                )
                token_auth = values.get("tracking_token")

                if not (basic_auth or token_auth):
                    raise ValueError(
                        f"MLflow experiment tracking with a remote backend "
                        f"{tracking_uri} is only possible when specifying either "
                        f"username and password or an authentication token in your "
                        f"stack component. To update your component, run the "
                        f"following command: `zenml experiment-tracker update "
                        f"<NAME> --tracking_username=MY_USERNAME "
                        f"--tracking_password=MY_PASSWORD "
                        f"--tracking_token=MY_TOKEN` and specify either your "
                        f"username and password or token."
                    )

        return values

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        if not self.tracking_uri or not is_remote_mlflow_tracking_uri(
            self.tracking_uri
        ):
            return True
        return False
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

MLFlowExperimentTrackerFlavor (BaseExperimentTrackerFlavor)

Class for the MLFlowExperimentTrackerFlavor.

Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerFlavor(BaseExperimentTrackerFlavor):
    """Class for the `MLFlowExperimentTrackerFlavor`."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return MLFLOW_MODEL_EXPERIMENT_TRACKER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/experiment_tracker/mlflow.png"

    @property
    def config_class(self) -> Type[MLFlowExperimentTrackerConfig]:
        """Returns `MLFlowExperimentTrackerConfig` config class.

        Returns:
                The config class.
        """
        return MLFlowExperimentTrackerConfig

    @property
    def implementation_class(self) -> Type["MLFlowExperimentTracker"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.mlflow.experiment_trackers import (
            MLFlowExperimentTracker,
        )

        return MLFlowExperimentTracker
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor.MLFlowExperimentTrackerConfig] property readonly

Returns MLFlowExperimentTrackerConfig config class.

Returns:

Type Description
Type[zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor.MLFlowExperimentTrackerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[MLFlowExperimentTracker] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[MLFlowExperimentTracker]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

MLFlowExperimentTrackerSettings (BaseSettings) pydantic-model

Settings for the MLflow experiment tracker.

Attributes:

Name Type Description
experiment_name Optional[str]

The MLflow experiment name.

nested bool

If True, will create a nested sub-run for the step.

tags Dict[str, Any]

Tags for the Mlflow run.

Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerSettings(BaseSettings):
    """Settings for the MLflow experiment tracker.

    Attributes:
        experiment_name: The MLflow experiment name.
        nested: If `True`, will create a nested sub-run for the step.
        tags: Tags for the Mlflow run.
    """

    experiment_name: Optional[str] = None
    nested: bool = False
    tags: Dict[str, Any] = {}
is_databricks_tracking_uri(tracking_uri)

Checks whether the given tracking uri is a Databricks tracking uri.

Parameters:

Name Type Description Default
tracking_uri str

The tracking uri to check.

required

Returns:

Type Description
bool

True if the tracking uri is a Databricks tracking uri, False otherwise.

Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
def is_databricks_tracking_uri(tracking_uri: str) -> bool:
    """Checks whether the given tracking uri is a Databricks tracking uri.

    Args:
        tracking_uri: The tracking uri to check.

    Returns:
        `True` if the tracking uri is a Databricks tracking uri, `False`
        otherwise.
    """
    return tracking_uri == "databricks"
is_remote_mlflow_tracking_uri(tracking_uri)

Checks whether the given tracking uri is remote or not.

Parameters:

Name Type Description Default
tracking_uri str

The tracking uri to check.

required

Returns:

Type Description
bool

True if the tracking uri is remote, False otherwise.

Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
def is_remote_mlflow_tracking_uri(tracking_uri: str) -> bool:
    """Checks whether the given tracking uri is remote or not.

    Args:
        tracking_uri: The tracking uri to check.

    Returns:
        `True` if the tracking uri is remote, `False` otherwise.
    """
    return any(
        tracking_uri.startswith(prefix) for prefix in ["http://", "https://"]
    ) or is_databricks_tracking_uri(tracking_uri)

mlflow_model_deployer_flavor

MLflow model deployer flavor.

MLFlowModelDeployerConfig (BaseModelDeployerConfig) pydantic-model

Configuration for the MLflow model deployer.

Attributes:

Name Type Description
service_path str

the path where the local MLflow deployment service configuration, PID and log files are stored.

Source code in zenml/integrations/mlflow/flavors/mlflow_model_deployer_flavor.py
class MLFlowModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the MLflow model deployer.

    Attributes:
        service_path: the path where the local MLflow deployment service
            configuration, PID and log files are stored.
    """

    service_path: str = ""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

MLFlowModelDeployerFlavor (BaseModelDeployerFlavor)

Model deployer flavor for MLflow models.

Source code in zenml/integrations/mlflow/flavors/mlflow_model_deployer_flavor.py
class MLFlowModelDeployerFlavor(BaseModelDeployerFlavor):
    """Model deployer flavor for MLflow models."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return MLFLOW_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/mlflow.png"

    @property
    def config_class(self) -> Type[MLFlowModelDeployerConfig]:
        """Returns `MLFlowModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return MLFlowModelDeployerConfig

    @property
    def implementation_class(self) -> Type["MLFlowModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.mlflow.model_deployers import (
            MLFlowModelDeployer,
        )

        return MLFlowModelDeployer
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig] property readonly

Returns MLFlowModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[MLFlowModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[MLFlowModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

mlflow_model_registry_flavor

MLflow model registry flavor.

MLFlowModelRegistryConfig (BaseModelRegistryConfig) pydantic-model

Configuration for the MLflow model registry.

Source code in zenml/integrations/mlflow/flavors/mlflow_model_registry_flavor.py
class MLFlowModelRegistryConfig(BaseModelRegistryConfig):
    """Configuration for the MLflow model registry."""
MLFlowModelRegistryFlavor (BaseModelRegistryFlavor)

Model registry flavor for MLflow models.

Source code in zenml/integrations/mlflow/flavors/mlflow_model_registry_flavor.py
class MLFlowModelRegistryFlavor(BaseModelRegistryFlavor):
    """Model registry flavor for MLflow models."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return MLFLOW_MODEL_REGISTRY_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/mlflow.png"

    @property
    def config_class(self) -> Type[MLFlowModelRegistryConfig]:
        """Returns `MLFlowModelRegistryConfig` config class.

        Returns:
                The config class.
        """
        return MLFlowModelRegistryConfig

    @property
    def implementation_class(self) -> Type["MLFlowModelRegistry"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.mlflow.model_registries import (
            MLFlowModelRegistry,
        )

        return MLFlowModelRegistry
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_model_registry_flavor.MLFlowModelRegistryConfig] property readonly

Returns MLFlowModelRegistryConfig config class.

Returns:

Type Description
Type[zenml.integrations.mlflow.flavors.mlflow_model_registry_flavor.MLFlowModelRegistryConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[MLFlowModelRegistry] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[MLFlowModelRegistry]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

mlflow_utils

Implementation of utils specific to the MLflow integration.

get_missing_mlflow_experiment_tracker_error()

Returns description of how to add an MLflow experiment tracker to your stack.

Returns:

Type Description
ValueError

If no MLflow experiment tracker is registered in the active stack.

Source code in zenml/integrations/mlflow/mlflow_utils.py
def get_missing_mlflow_experiment_tracker_error() -> ValueError:
    """Returns description of how to add an MLflow experiment tracker to your stack.

    Returns:
        ValueError: If no MLflow experiment tracker is registered in the active stack.
    """
    return ValueError(
        "The active stack needs to have a MLflow experiment tracker "
        "component registered to be able to track experiments using "
        "MLflow. You can create a new stack with a MLflow experiment "
        "tracker component or update your existing stack to add this "
        "component, e.g.:\n\n"
        "  'zenml experiment-tracker register mlflow_tracker "
        "--type=mlflow'\n"
        "  'zenml stack register stack-name -e mlflow_tracker ...'\n"
    )

get_tracking_uri()

Gets the MLflow tracking URI from the active experiment tracking stack component.

noqa: DAR401

Returns:

Type Description
str

MLflow tracking URI.

Source code in zenml/integrations/mlflow/mlflow_utils.py
def get_tracking_uri() -> str:
    """Gets the MLflow tracking URI from the active experiment tracking stack component.

    # noqa: DAR401

    Returns:
        MLflow tracking URI.
    """
    from zenml.integrations.mlflow.experiment_trackers.mlflow_experiment_tracker import (
        MLFlowExperimentTracker,
    )

    tracker = Client().active_stack.experiment_tracker
    if tracker is None or not isinstance(tracker, MLFlowExperimentTracker):
        raise get_missing_mlflow_experiment_tracker_error()

    return tracker.get_tracking_uri()

is_zenml_run(run)

Checks if a MLflow run is a ZenML run or not.

Parameters:

Name Type Description Default
run Run

The run to check.

required

Returns:

Type Description
bool

If the run is a ZenML run.

Source code in zenml/integrations/mlflow/mlflow_utils.py
def is_zenml_run(run: Run) -> bool:
    """Checks if a MLflow run is a ZenML run or not.

    Args:
        run: The run to check.

    Returns:
        If the run is a ZenML run.
    """
    return ZENML_TAG_KEY in run.data.tags

stop_zenml_mlflow_runs(status)

Stops active ZenML Mlflow runs.

This function stops all MLflow active runs until no active run exists or a non-ZenML run is active.

Parameters:

Name Type Description Default
status str

The status to set the run to.

required
Source code in zenml/integrations/mlflow/mlflow_utils.py
def stop_zenml_mlflow_runs(status: str) -> None:
    """Stops active ZenML Mlflow runs.

    This function stops all MLflow active runs until no active run exists or
    a non-ZenML run is active.

    Args:
        status: The status to set the run to.
    """
    active_run = mlflow.active_run()
    while active_run:
        if is_zenml_run(active_run):
            logger.debug("Stopping mlflow run %s.", active_run.info.run_id)
            mlflow.end_run(status=status)
            active_run = mlflow.active_run()
        else:
            break

model_deployers special

Initialization of the MLflow model deployers.

mlflow_model_deployer

Implementation of the MLflow model deployer.

MLFlowModelDeployer (BaseModelDeployer)

MLflow implementation of the BaseModelDeployer.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
class MLFlowModelDeployer(BaseModelDeployer):
    """MLflow implementation of the BaseModelDeployer."""

    NAME: ClassVar[str] = "MLflow"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = MLFlowModelDeployerFlavor

    _service_path: Optional[str] = None

    @property
    def config(self) -> MLFlowModelDeployerConfig:
        """Returns the `MLFlowModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(MLFlowModelDeployerConfig, self._config)

    @staticmethod
    def get_service_path(id_: UUID) -> str:
        """Get the path where local MLflow service information is stored.

        This includes the deployment service configuration, PID and log files
        are stored.

        Args:
            id_: The ID of the MLflow model deployer.

        Returns:
            The service path.
        """
        service_path = os.path.join(
            GlobalConfiguration().local_stores_path,
            str(id_),
        )
        create_dir_recursive_if_not_exists(service_path)
        return service_path

    @property
    def local_path(self) -> str:
        """Returns the path to the root directory.

        This is where all configurations for MLflow deployment daemon processes
        are stored.

        If the service path is not set in the config by the user, the path is
        set to a local default path according to the component ID.

        Returns:
            The path to the local service root directory.
        """
        if self._service_path is not None:
            return self._service_path

        if self.config.service_path:
            self._service_path = self.config.service_path
        else:
            self._service_path = self.get_service_path(self.id)

        create_dir_recursive_if_not_exists(self._service_path)
        return self._service_path

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "MLFlowDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information relevant to the user.

        Args:
            service_instance: Instance of a SeldonDeploymentService

        Returns:
            A dictionary containing the information.
        """
        return {
            "PREDICTION_URL": service_instance.endpoint.prediction_url,
            "MODEL_URI": service_instance.config.model_uri,
            "MODEL_NAME": service_instance.config.model_name,
            "REGISTRY_MODEL_NAME": service_instance.config.registry_model_name,
            "REGISTRY_MODEL_VERSION": service_instance.config.registry_model_version,
            "SERVICE_PATH": service_instance.status.runtime_path,
            "DAEMON_PID": str(service_instance.status.pid),
        }

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new MLflow deployment service or update an existing one.

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new MLflow
            deployment server to reflect the model and other configuration
            parameters specified in the supplied MLflow service `config`.

          * if `replace` is True, this method will first attempt to find an
            existing MLflow deployment service that is *equivalent* to the
            supplied configuration parameters. Two or more MLflow deployment
            services are considered equivalent if they have the same
            `pipeline_name`, `pipeline_step_name` and `model_name` configuration
            parameters. To put it differently, two MLflow deployment services
            are equivalent if they serve versions of the same model deployed by
            the same pipeline step. If an equivalent MLflow deployment is found,
            it will be updated in place to reflect the new configuration
            parameters.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new MLflow deployment
        server for each new model version. If multiple equivalent MLflow
        deployment servers are found, one is selected at random to be updated
        and the others are deleted.

        Args:
            config: the configuration of the model to be deployed with MLflow.
            replace: set this flag to True to find and update an equivalent
                MLflow deployment server with the new model instead of
                creating and starting a new deployment server.
            timeout: the timeout in seconds to wait for the MLflow server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the MLflow
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML MLflow deployment service object that can be used to
            interact with the MLflow model server.
        """
        config = cast(MLFlowDeploymentConfig, config)
        service = None

        # if replace is True, remove all existing services
        if replace is True:
            existing_services = self.find_model_server(
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            for existing_service in existing_services:
                if service is None:
                    # keep the most recently created service
                    service = cast(MLFlowDeploymentService, existing_service)
                try:
                    # delete the older services and don't wait for them to
                    # be deprovisioned
                    self._clean_up_existing_service(
                        existing_service=cast(
                            MLFlowDeploymentService, existing_service
                        ),
                        timeout=timeout,
                        force=True,
                    )
                except RuntimeError:
                    # ignore errors encountered while stopping old services
                    pass
        if service:
            logger.info(
                f"Updating an existing MLflow deployment service: {service}"
            )

            # set the root runtime path with the stack component's UUID
            config.root_runtime_path = self.local_path
            service.stop(timeout=timeout, force=True)
            service.update(config)
            service.start(timeout=timeout)
        else:
            # create a new MLFlowDeploymentService instance
            service = self._create_new_service(timeout, config)
            logger.info(f"Created a new MLflow deployment service: {service}")

        return cast(BaseService, service)

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: MLFlowDeploymentService,
    ) -> None:
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

        # delete the old configuration file
        if existing_service.status.runtime_path:
            shutil.rmtree(existing_service.status.runtime_path)

    # the step will receive a config from the user that mentions the number
    # of workers etc.the step implementation will create a new config using
    # all values from the user and add values like pipeline name, model_uri
    def _create_new_service(
        self, timeout: int, config: MLFlowDeploymentConfig
    ) -> MLFlowDeploymentService:
        """Creates a new MLFlowDeploymentService.

        Args:
            timeout: the timeout in seconds to wait for the MLflow server
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with MLflow.

        Returns:
            The MLFlowDeploymentService object that can be used to interact
            with the MLflow model server.
        """
        # set the root runtime path with the stack component's UUID
        config.root_runtime_path = self.local_path
        # create a new service for the new model
        service = MLFlowDeploymentService(config)
        service.start(timeout=timeout)

        return service

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        run_name: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
        registry_model_name: Optional[str] = None,
        registry_model_version: Optional[str] = None,
    ) -> List[BaseService]:
        """Finds one or more model servers that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: Name of the pipeline that the deployed model was part
                of.
            run_name: Name of the pipeline run which the deployed model
                was part of.
            pipeline_step_name: The name of the pipeline model deployment step
                that deployed the model.
            model_name: Name of the deployed model.
            model_uri: URI of the deployed model.
            model_type: Type/format of the deployed model. Not used in this
                MLflow case.
            registry_model_name: Name of the registered model that the
                deployed model belongs to.
            registry_model_version: Version of the registered model that
                the deployed model belongs to.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.

        Raises:
            TypeError: if any of the input arguments are of an invalid type.
        """
        services = []
        config = MLFlowDeploymentConfig(
            model_name=model_name or "",
            model_uri=model_uri or "",
            pipeline_name=pipeline_name or "",
            pipeline_run_id=run_name or "",
            run_name=run_name or "",
            pipeline_step_name=pipeline_step_name or "",
            registry_model_name=registry_model_name,
            registry_model_version=registry_model_version,
        )

        # find all services that match the input criteria
        for root, _, files in os.walk(self.local_path):
            if service_uuid and Path(root).name != str(service_uuid):
                continue
            for file in files:
                if file == SERVICE_DAEMON_CONFIG_FILE_NAME:
                    service_config_path = os.path.join(root, file)
                    logger.debug(
                        "Loading service daemon configuration from %s",
                        service_config_path,
                    )
                    existing_service_config = None
                    with open(service_config_path, "r") as f:
                        existing_service_config = f.read()
                    existing_service = (
                        ServiceRegistry().load_service_from_json(
                            existing_service_config
                        )
                    )
                    if not isinstance(
                        existing_service, MLFlowDeploymentService
                    ):
                        raise TypeError(
                            f"Expected service type MLFlowDeploymentService but got "
                            f"{type(existing_service)} instead"
                        )
                    existing_service.update_status()
                    if self._matches_search_criteria(existing_service, config):
                        if not running or existing_service.is_running:
                            services.append(
                                cast(BaseService, existing_service)
                            )

        return services

    def _matches_search_criteria(
        self,
        existing_service: MLFlowDeploymentService,
        config: MLFlowDeploymentConfig,
    ) -> bool:
        """Returns true if a service matches the input criteria.

        If any of the values in the input criteria are None, they are ignored.
        This allows listing services just by common pipeline names or step
        names, etc.

        Args:
            existing_service: The materialized Service instance derived from
                the config of the older (existing) service
            config: The MLFlowDeploymentConfig object passed to the
                deploy_model function holding parameters of the new service
                to be created.

        Returns:
            True if the service matches the input criteria.
        """
        existing_service_config = existing_service.config
        # check if the existing service matches the input criteria
        if (
            (
                not config.pipeline_name
                or existing_service_config.pipeline_name
                == config.pipeline_name
            )
            and (
                not config.model_name
                or existing_service_config.model_name == config.model_name
            )
            and (
                not config.pipeline_step_name
                or existing_service_config.pipeline_step_name
                == config.pipeline_step_name
            )
            and (
                not config.run_name
                or existing_service_config.run_name == config.run_name
            )
            and (
                (
                    not config.registry_model_name
                    and not config.registry_model_version
                )
                or (
                    existing_service_config.registry_model_name
                    == config.registry_model_name
                    and existing_service_config.registry_model_version
                    == config.registry_model_version
                )
            )
        ):
            return True

        return False

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to stop a model server.

        Args:
            uuid: UUID of the model server to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        # get list of all services
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, stop it
        if existing_services:
            existing_services[0].stop(timeout=timeout, force=force)

    def start_model_server(
        self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
    ) -> None:
        """Method to start a model server.

        Args:
            uuid: UUID of the model server to start.
            timeout: Timeout in seconds to wait for the service to start.
        """
        # get list of all services
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, start it
        if existing_services:
            existing_services[0].start(timeout=timeout)

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        Args:
            uuid: UUID of the model server to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        # get list of all services
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, clean it up
        if existing_services:
            service = cast(MLFlowDeploymentService, existing_services[0])
            self._clean_up_existing_service(
                existing_service=service, timeout=timeout, force=force
            )
config: MLFlowModelDeployerConfig property readonly

Returns the MLFlowModelDeployerConfig config.

Returns:

Type Description
MLFlowModelDeployerConfig

The configuration.

local_path: str property readonly

Returns the path to the root directory.

This is where all configurations for MLflow deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

Type Description
str

The path to the local service root directory.

FLAVOR (BaseModelDeployerFlavor)

Model deployer flavor for MLflow models.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
class MLFlowModelDeployerFlavor(BaseModelDeployerFlavor):
    """Model deployer flavor for MLflow models."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return MLFLOW_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/mlflow.png"

    @property
    def config_class(self) -> Type[MLFlowModelDeployerConfig]:
        """Returns `MLFlowModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return MLFlowModelDeployerConfig

    @property
    def implementation_class(self) -> Type["MLFlowModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.mlflow.model_deployers import (
            MLFlowModelDeployer,
        )

        return MLFlowModelDeployer
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig] property readonly

Returns MLFlowModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[MLFlowModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[MLFlowModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

delete_model_server(self, uuid, timeout=60, force=False)

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

60
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        uuid: UUID of the model server to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    # get list of all services
    existing_services = self.find_model_server(service_uuid=uuid)

    # if the service exists, clean it up
    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])
        self._clean_up_existing_service(
            existing_service=service, timeout=timeout, force=force
        )
deploy_model(self, config, replace=False, timeout=60)

Create a new MLflow deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new MLflow deployment server to reflect the model and other configuration parameters specified in the supplied MLflow service config.

  • if replace is True, this method will first attempt to find an existing MLflow deployment service that is equivalent to the supplied configuration parameters. Two or more MLflow deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two MLflow deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent MLflow deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new MLflow deployment server for each new model version. If multiple equivalent MLflow deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

Name Type Description Default
config ServiceConfig

the configuration of the model to be deployed with MLflow.

required
replace bool

set this flag to True to find and update an equivalent MLflow deployment server with the new model instead of creating and starting a new deployment server.

False
timeout int

the timeout in seconds to wait for the MLflow server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the MLflow server is provisioned, without waiting for it to fully start.

60

Returns:

Type Description
BaseService

The ZenML MLflow deployment service object that can be used to interact with the MLflow model server.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new MLflow deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new MLflow
        deployment server to reflect the model and other configuration
        parameters specified in the supplied MLflow service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing MLflow deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more MLflow deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two MLflow deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent MLflow deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new MLflow deployment
    server for each new model version. If multiple equivalent MLflow
    deployment servers are found, one is selected at random to be updated
    and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with MLflow.
        replace: set this flag to True to find and update an equivalent
            MLflow deployment server with the new model instead of
            creating and starting a new deployment server.
        timeout: the timeout in seconds to wait for the MLflow server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the MLflow
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML MLflow deployment service object that can be used to
        interact with the MLflow model server.
    """
    config = cast(MLFlowDeploymentConfig, config)
    service = None

    # if replace is True, remove all existing services
    if replace is True:
        existing_services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for existing_service in existing_services:
            if service is None:
                # keep the most recently created service
                service = cast(MLFlowDeploymentService, existing_service)
            try:
                # delete the older services and don't wait for them to
                # be deprovisioned
                self._clean_up_existing_service(
                    existing_service=cast(
                        MLFlowDeploymentService, existing_service
                    ),
                    timeout=timeout,
                    force=True,
                )
            except RuntimeError:
                # ignore errors encountered while stopping old services
                pass
    if service:
        logger.info(
            f"Updating an existing MLflow deployment service: {service}"
        )

        # set the root runtime path with the stack component's UUID
        config.root_runtime_path = self.local_path
        service.stop(timeout=timeout, force=True)
        service.update(config)
        service.start(timeout=timeout)
    else:
        # create a new MLFlowDeploymentService instance
        service = self._create_new_service(timeout, config)
        logger.info(f"Created a new MLflow deployment service: {service}")

    return cast(BaseService, service)
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, run_name=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None, registry_model_name=None, registry_model_version=None)

Finds one or more model servers that match the given criteria.

Parameters:

Name Type Description Default
running bool

If true, only running services will be returned.

False
service_uuid Optional[uuid.UUID]

The UUID of the service that was originally used to deploy the model.

None
pipeline_name Optional[str]

Name of the pipeline that the deployed model was part of.

None
run_name Optional[str]

Name of the pipeline run which the deployed model was part of.

None
pipeline_step_name Optional[str]

The name of the pipeline model deployment step that deployed the model.

None
model_name Optional[str]

Name of the deployed model.

None
model_uri Optional[str]

URI of the deployed model.

None
model_type Optional[str]

Type/format of the deployed model. Not used in this MLflow case.

None
registry_model_name Optional[str]

Name of the registered model that the deployed model belongs to.

None
registry_model_version Optional[str]

Version of the registered model that the deployed model belongs to.

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Service objects representing model servers that match the input search criteria.

Exceptions:

Type Description
TypeError

if any of the input arguments are of an invalid type.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    run_name: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
    registry_model_name: Optional[str] = None,
    registry_model_version: Optional[str] = None,
) -> List[BaseService]:
    """Finds one or more model servers that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_name: Name of the pipeline that the deployed model was part
            of.
        run_name: Name of the pipeline run which the deployed model
            was part of.
        pipeline_step_name: The name of the pipeline model deployment step
            that deployed the model.
        model_name: Name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: Type/format of the deployed model. Not used in this
            MLflow case.
        registry_model_name: Name of the registered model that the
            deployed model belongs to.
        registry_model_version: Version of the registered model that
            the deployed model belongs to.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.

    Raises:
        TypeError: if any of the input arguments are of an invalid type.
    """
    services = []
    config = MLFlowDeploymentConfig(
        model_name=model_name or "",
        model_uri=model_uri or "",
        pipeline_name=pipeline_name or "",
        pipeline_run_id=run_name or "",
        run_name=run_name or "",
        pipeline_step_name=pipeline_step_name or "",
        registry_model_name=registry_model_name,
        registry_model_version=registry_model_version,
    )

    # find all services that match the input criteria
    for root, _, files in os.walk(self.local_path):
        if service_uuid and Path(root).name != str(service_uuid):
            continue
        for file in files:
            if file == SERVICE_DAEMON_CONFIG_FILE_NAME:
                service_config_path = os.path.join(root, file)
                logger.debug(
                    "Loading service daemon configuration from %s",
                    service_config_path,
                )
                existing_service_config = None
                with open(service_config_path, "r") as f:
                    existing_service_config = f.read()
                existing_service = (
                    ServiceRegistry().load_service_from_json(
                        existing_service_config
                    )
                )
                if not isinstance(
                    existing_service, MLFlowDeploymentService
                ):
                    raise TypeError(
                        f"Expected service type MLFlowDeploymentService but got "
                        f"{type(existing_service)} instead"
                    )
                existing_service.update_status()
                if self._matches_search_criteria(existing_service, config):
                    if not running or existing_service.is_running:
                        services.append(
                            cast(BaseService, existing_service)
                        )

    return services
get_model_server_info(service_instance) staticmethod

Return implementation specific information relevant to the user.

Parameters:

Name Type Description Default
service_instance MLFlowDeploymentService

Instance of a SeldonDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the information.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "MLFlowDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information relevant to the user.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        A dictionary containing the information.
    """
    return {
        "PREDICTION_URL": service_instance.endpoint.prediction_url,
        "MODEL_URI": service_instance.config.model_uri,
        "MODEL_NAME": service_instance.config.model_name,
        "REGISTRY_MODEL_NAME": service_instance.config.registry_model_name,
        "REGISTRY_MODEL_VERSION": service_instance.config.registry_model_version,
        "SERVICE_PATH": service_instance.status.runtime_path,
        "DAEMON_PID": str(service_instance.status.pid),
    }
get_service_path(id_) staticmethod

Get the path where local MLflow service information is stored.

This includes the deployment service configuration, PID and log files are stored.

Parameters:

Name Type Description Default
id_ UUID

The ID of the MLflow model deployer.

required

Returns:

Type Description
str

The service path.

Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local MLflow service information is stored.

    This includes the deployment service configuration, PID and log files
    are stored.

    Args:
        id_: The ID of the MLflow model deployer.

    Returns:
        The service path.
    """
    service_path = os.path.join(
        GlobalConfiguration().local_stores_path,
        str(id_),
    )
    create_dir_recursive_if_not_exists(service_path)
    return service_path
start_model_server(self, uuid, timeout=60)

Method to start a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

Timeout in seconds to wait for the service to start.

60
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def start_model_server(
    self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
    """Method to start a model server.

    Args:
        uuid: UUID of the model server to start.
        timeout: Timeout in seconds to wait for the service to start.
    """
    # get list of all services
    existing_services = self.find_model_server(service_uuid=uuid)

    # if the service exists, start it
    if existing_services:
        existing_services[0].start(timeout=timeout)
stop_model_server(self, uuid, timeout=60, force=False)

Method to stop a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

60
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to stop a model server.

    Args:
        uuid: UUID of the model server to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    # get list of all services
    existing_services = self.find_model_server(service_uuid=uuid)

    # if the service exists, stop it
    if existing_services:
        existing_services[0].stop(timeout=timeout, force=force)

model_registries special

Initialization of the MLflow model registry.

mlflow_model_registry

Implementation of the MLflow model registry for ZenML.

MLFlowModelRegistry (BaseModelRegistry)

Register models using MLflow.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
class MLFlowModelRegistry(BaseModelRegistry):
    """Register models using MLflow."""

    _client: Optional[MlflowClient] = None

    @property
    def config(self) -> MLFlowModelRegistryConfig:
        """Returns the `MLFlowModelRegistryConfig` config.

        Returns:
            The configuration.
        """
        return cast(MLFlowModelRegistryConfig, self._config)

    def configure_mlflow(self) -> None:
        """Configures the MLflow Client with the experiment tracker config."""
        experiment_tracker = Client().active_stack.experiment_tracker
        assert isinstance(experiment_tracker, MLFlowExperimentTracker)
        experiment_tracker.configure_mlflow()

    @property
    def mlflow_client(self) -> MlflowClient:
        """Get the MLflow client.

        Returns:
            The MLFlowClient.
        """
        if not self._client:
            self.configure_mlflow()
            self._client = mlflow.tracking.MlflowClient()
        return self._client

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains an mlflow experiment tracker.

        Returns:
            A StackValidator instance.
        """

        def _validate_stack_requirements(stack: "Stack") -> Tuple[bool, str]:
            """Validates that all the requirements are met for the stack.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple of (is_valid, error_message).
            """
            # Validate that the experiment tracker is an mlflow experiment tracker.
            experiment_tracker = stack.experiment_tracker
            assert experiment_tracker is not None
            if experiment_tracker.flavor != "mlflow":
                return False, (
                    "The MLflow model registry requires a MLflow experiment "
                    "tracker. You should register a MLflow experiment "
                    "tracker to the stack using the following command: "
                    "`zenml stack update model_registry -e mlflow_tracker"
                )
            mlflow_version = mlflow.version.VERSION
            if (
                not mlflow_version >= "2.1.1"
                and experiment_tracker.config.is_local
            ):
                return False, (
                    "The MLflow model registry requires MLflow version "
                    f"2.1.1 or higher to use a local MLflow registry. "
                    f"Your current MLflow version is {mlflow_version}."
                    "You can upgrade MLflow using the following command: "
                    "`pip install --upgrade mlflow`"
                )
            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.EXPERIMENT_TRACKER,
            },
            custom_validation_function=_validate_stack_requirements,
        )

    # ---------
    # Model Registration Methods
    # ---------

    def register_model(
        self,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
    ) -> RegisteredModel:
        """Register a model to the MLflow model registry.

        Args:
            name: The name of the model.
            description: The description of the model.
            metadata: The metadata of the model.

        Raises:
            RuntimeError: If the model already exists.

        Returns:
            The registered model.
        """
        # Check if model already exists.
        try:
            self.get_model(name)
            raise KeyError(
                f"Model with name {name} already exists in the MLflow model "
                f"registry. Please use a different name.",
            )
        except KeyError:
            pass
        # Register model.
        try:
            registered_model = self.mlflow_client.create_registered_model(
                name=name,
                description=description,
                tags=metadata,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to register model with name {name} to the MLflow "
                f"model registry: {str(e)}",
            )

        # Return the registered model.
        return RegisteredModel(
            name=registered_model.name,
            description=registered_model.description,
            metadata=registered_model.tags,
        )

    def delete_model(
        self,
        name: str,
    ) -> None:
        """Delete a model from the MLflow model registry.

        Args:
            name: The name of the model.

        Raises:
            RuntimeError: If the model does not exist.
        """
        # Check if model exists.
        self.get_model(name=name)
        # Delete the registered model.
        try:
            self.mlflow_client.delete_registered_model(
                name=name,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to delete model with name {name} from MLflow model "
                f"registry: {str(e)}",
            )

    def update_model(
        self,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        remove_metadata: Optional[List[str]] = None,
    ) -> RegisteredModel:
        """Update a model in the MLflow model registry.

        Args:
            name: The name of the model.
            description: The description of the model.
            metadata: The metadata of the model.
            remove_metadata: The metadata to remove from the model.

        Raises:
            RuntimeError: If mlflow fails to update the model.

        Returns:
            The updated model.
        """
        # Check if model exists.
        self.get_model(name=name)
        # Update the registered model description.
        if description:
            try:
                self.mlflow_client.update_registered_model(
                    name=name,
                    description=description,
                )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to update description for the model {name} in MLflow "
                    f"model registry: {str(e)}",
                )
        # Update the registered model tags.
        if metadata:
            try:
                for tag, value in metadata.items():
                    self.mlflow_client.set_registered_model_tag(
                        name=name,
                        key=tag,
                        value=value,
                    )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to update tags for the model {name} in MLflow model "
                    f"registry: {str(e)}",
                )
        # Remove tags from the registered model.
        if remove_metadata:
            try:
                for tag in remove_metadata:
                    self.mlflow_client.delete_registered_model_tag(
                        name=name,
                        key=tag,
                    )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to remove tags for the model {name} in MLflow model "
                    f"registry: {str(e)}",
                )
        # Return the updated registered model.
        return self.get_model(name)

    def get_model(self, name: str) -> RegisteredModel:
        """Get a model from the MLflow model registry.

        Args:
            name: The name of the model.

        Returns:
            The model.

        Raises:
            KeyError: If mlflow fails to get the model.
        """
        # Get the registered model.
        try:
            registered_model = self.mlflow_client.get_registered_model(
                name=name,
            )
        except MlflowException as e:
            raise KeyError(
                f"Failed to get model with name {name} from the MLflow model "
                f"registry: {str(e)}",
            )
        # Return the registered model.
        return RegisteredModel(
            name=registered_model.name,
            description=registered_model.description,
            metadata=registered_model.tags,
        )

    def list_models(
        self,
        name: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
    ) -> List[RegisteredModel]:
        """List models in the MLflow model registry.

        Args:
            name: A name to filter the models by.
            metadata: The metadata to filter the models by.

        Returns:
            A list of models (RegisteredModel)
        """
        # Set the filter string.
        filter_string = ""
        if name:
            filter_string += f"name='{name}'"
        if metadata:
            for tag, value in metadata.items():
                if filter_string:
                    filter_string += " AND "
                filter_string += f"tags.{tag}='{value}'"

        # Get the registered models.
        registered_models = self.mlflow_client.search_registered_models(
            filter_string=filter_string,
            max_results=100,
        )
        # Return the registered models.
        return [
            RegisteredModel(
                name=registered_model.name,
                description=registered_model.description,
                metadata=registered_model.tags,
            )
            for registered_model in registered_models
        ]

    # ---------
    # Model Version Methods
    # ---------

    def register_model_version(
        self,
        name: str,
        version: Optional[str] = None,
        model_source_uri: Optional[str] = None,
        description: Optional[str] = None,
        metadata: ModelRegistryModelMetadata = Field(
            default_factory=ModelRegistryModelMetadata
        ),
        **kwargs: Any,
    ) -> ModelVersion:
        """Register a model version to the MLflow model registry.

        Args:
            name: The name of the model.
            model_source_uri: The source URI of the model.
            version: The version of the model.
            description: The description of the model version.
            metadata: The registry metadata of the model version.
            **kwargs: Additional keyword arguments.

        Raises:
            RuntimeError: If the registered model does not exist.

        Returns:
            The registered model version.
        """
        # Check if the model exists, if not create it.
        try:
            self.get_model(name=name)
        except KeyError:
            logger.info(
                f"No registered model with name {name} found. Creating a new"
                "registered model."
            )
            self.register_model(
                name=name,
            )
        try:
            # Inform the user that the version is ignored.
            if version:
                logger.info(
                    f"MLflow model registry does not take a version as an argument. "
                    f"Registering a new version for the model `'{name}'` "
                    f"a version will be assigned automatically."
                )
            # Set the run ID and link.
            run_id = metadata.dict().get("mlflow_run_id", None)
            run_link = metadata.dict().get("mlflow_run_link", None)
            # Register the model version.
            registered_model_version = self.mlflow_client.create_model_version(
                name=name,
                source=model_source_uri,
                run_id=run_id,
                run_link=run_link,
                description=description,
                tags=metadata.dict(),
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to register model version with name '{name}' and "
                f"version '{version}' to the MLflow model registry."
                f"Error: {e}"
            )
        # Return the registered model version.
        return self._cast_mlflow_version_to_model_version(
            registered_model_version
        )

    def delete_model_version(
        self,
        name: str,
        version: str,
    ) -> None:
        """Delete a model version from the MLflow model registry.

        Args:
            name: The name of the model.
            version: The version of the model.

        Raises:
            RuntimeError: If mlflow fails to delete the model version.
        """
        self.get_model_version(name=name, version=version)
        try:
            self.mlflow_client.delete_model_version(
                name=name,
                version=version,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to delete model version '{version}' of model '{name}'."
                f"From the MLflow model registry: {str(e)}",
            )

    def update_model_version(
        self,
        name: str,
        version: str,
        description: Optional[str] = None,
        metadata: ModelRegistryModelMetadata = Field(
            default_factory=ModelRegistryModelMetadata
        ),
        remove_metadata: Optional[List[str]] = None,
        stage: Optional[ModelVersionStage] = None,
    ) -> ModelVersion:
        """Update a model version in the MLflow model registry.

        Args:
            name: The name of the model.
            version: The version of the model.
            description: The description of the model version.
            metadata: The metadata of the model version.
            remove_metadata: The metadata to remove from the model version.
            stage: The stage of the model version.

        Raises:
            RuntimeError: If mlflow fails to update the model version.

        Returns:
            The updated model version.
        """
        self.get_model_version(name=name, version=version)
        # Update the model description.
        if description:
            try:
                self.mlflow_client.update_model_version(
                    name=name,
                    version=version,
                    description=description,
                )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to update the description of model version "
                    f"'{name}:{version}' in the MLflow model registry: {str(e)}"
                )
        # Update the model tags.
        if metadata:
            try:
                for key, value in metadata.dict().items():
                    self.mlflow_client.set_model_version_tag(
                        name=name,
                        version=version,
                        key=key,
                        value=value,
                    )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to update the tags of model version "
                    f"'{name}:{version}' in the MLflow model registry: {str(e)}"
                )
        # Remove the model tags.
        if remove_metadata:
            try:
                for key in remove_metadata:
                    self.mlflow_client.delete_model_version_tag(
                        name=name,
                        version=version,
                        key=key,
                    )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to remove the tags of model version "
                    f"'{name}:{version}' in the MLflow model registry: {str(e)}"
                )
        # Update the model stage.
        if stage:
            try:
                self.mlflow_client.transition_model_stage(
                    name=name,
                    version=version,
                    stage=stage.value,
                )
            except MlflowException as e:
                raise RuntimeError(
                    f"Failed to update the current stage of model version "
                    f"'{name}:{version}' in the MLflow model registry: {str(e)}"
                )
        return self.get_model_version(name, version)

    def get_model_version(
        self,
        name: str,
        version: str,
    ) -> ModelVersion:
        """Get a model version from the MLflow model registry.

        Args:
            name: The name of the model.
            version: The version of the model.

        Raises:
            KeyError: If the model version does not exist.

        Returns:
            The model version.
        """
        # Get the model version from the MLflow model registry.
        try:
            mlflow_model_version = self.mlflow_client.get_model_version(
                name=name,
                version=version,
            )
        except MlflowException as e:
            raise KeyError(
                f"Failed to get model version '{name}:{version}' from the "
                f"MLflow model registry: {str(e)}"
            )
        # Return the model version.
        return self._cast_mlflow_version_to_model_version(
            mlflow_model_version=mlflow_model_version,
        )

    def list_model_versions(
        self,
        name: Optional[str] = None,
        model_source_uri: Optional[str] = None,
        metadata: ModelRegistryModelMetadata = Field(
            default_factory=ModelRegistryModelMetadata
        ),
        stage: Optional[ModelVersionStage] = None,
        count: Optional[int] = None,
        created_after: Optional[datetime] = None,
        created_before: Optional[datetime] = None,
        order_by_date: Optional[str] = None,
        **kwargs: Any,
    ) -> List[ModelVersion]:
        """List model versions from the MLflow model registry.

        Args:
            name: The name of the model.
            model_source_uri: The model source URI.
            metadata: The metadata of the model version.
            stage: The stage of the model version.
            count: The maximum number of model versions to return.
            created_after: The minimum creation time of the model versions.
            created_before: The maximum creation time of the model versions.
            order_by_date: The order of the model versions by creation time,
                either ascending or descending.
            kwargs: Additional keyword arguments.

        Returns:
            The model versions.
        """
        # Set the filter string.
        filter_string = ""
        if name:
            filter_string += f"name='{name}'"
        if model_source_uri:
            if filter_string:
                filter_string += " AND "
            filter_string += f"source='{model_source_uri}'"
        if "mlflow_run_id" in kwargs and kwargs["mlflow_run_id"]:
            if filter_string:
                filter_string += " AND "
            filter_string += f"run_id='{kwargs['mlflow_run_id']}'"
        if metadata:
            for tag, value in metadata.dict().items():
                if value:
                    if filter_string:
                        filter_string += " AND "
                    filter_string += f"tags.{tag}='{value}'"
        # Get the model versions.
        mlflow_model_versions = self.mlflow_client.search_model_versions(
            filter_string=filter_string,
        )
        # Cast the MLflow model versions to the ZenML model version class.
        model_versions = [
            self._cast_mlflow_version_to_model_version(
                mlflow_model_version=mlflow_model_version,
            )
            for mlflow_model_version in mlflow_model_versions
        ]
        # Filter the model versions by stage.
        if stage:
            model_versions = [
                model_version
                for model_version in model_versions
                if model_version.stage == stage
            ]
        # Filter the model versions by creation time.
        if created_after:
            model_versions = [
                model_version
                for model_version in model_versions
                if model_version.created_at
                and model_version.created_at >= created_after
            ]
        if created_before:
            model_versions = [
                model_version
                for model_version in model_versions
                if model_version.created_at
                and model_version.created_at <= created_before
            ]
        # Sort the model versions by creation time.
        if order_by_date == "asc":
            model_versions = sorted(
                model_versions,
                key=lambda model_version: model_version.created_at
                if model_version.created_at is not None
                else float("-inf"),
            )
        elif order_by_date == "desc":
            model_versions = sorted(
                model_versions,
                key=lambda model_version: model_version.created_at
                if model_version.created_at is not None
                else float("inf"),
                reverse=True,
            )
        # Return the model versions.
        if count:
            return model_versions[:count]
        return model_versions

    def load_model_version(
        self,
        name: str,
        version: str,
        **kwargs: Any,
    ) -> Any:
        """Load a model version from the MLflow model registry.

        This method loads the model version from the MLflow model registry
        and returns the model. The model is loaded using the `mlflow.pyfunc`
        module which takes care of loading the model from the model source
        URI for the right framework.

        Args:
            name: The name of the model.
            version: The version of the model.
            kwargs: Additional keyword arguments.

        Returns:
            The model version.

        Raises:
            KeyError: If the model version does not exist.
        """
        try:
            self.get_model_version(name=name, version=version)
        except KeyError:
            raise KeyError(
                f"Failed to load model version '{name}:{version}' from the "
                f"MLflow model registry: Model version does not exist."
            )
        # Load the model version.
        mlflow_model_version = self.mlflow_client.get_model_version(
            name=name,
            version=version,
        )
        return load_model(
            model_uri=mlflow_model_version.source,
            **kwargs,
        )

    def get_model_uri_artifact_store(
        self,
        model_version: ModelVersion,
    ) -> str:
        """Get the model URI artifact store.

        Args:
            model_version: The model version.

        Returns:
            The model URI artifact store.
        """
        artifact_store_path = (
            f"{Client().active_stack.artifact_store.path}/mlflow"
        )
        model_source_uri = model_version.model_source_uri.rsplit(":")[-1]
        return artifact_store_path + model_source_uri

    def _cast_mlflow_version_to_model_version(
        self,
        mlflow_model_version: MLflowModelVersion,
    ) -> ModelVersion:
        """Cast an MLflow model version to a model version.

        Args:
            mlflow_model_version: The MLflow model version.

        Returns:
            The model version.
        """
        metadata = mlflow_model_version.tags or {}
        if mlflow_model_version.run_id:
            metadata["mlflow_run_id"] = mlflow_model_version.run_id
        if mlflow_model_version.run_link:
            metadata["mlflow_run_link"] = mlflow_model_version.run_link

        try:
            from mlflow.models import get_model_info

            model_library = (
                get_model_info(model_uri=mlflow_model_version.source)
                .flavors.get("python_function", {})
                .get("loader_module")
            )
        except ImportError:
            model_library = None
        return ModelVersion(
            registered_model=RegisteredModel(name=mlflow_model_version.name),
            model_format=MLFLOW_MODEL_FORMAT,
            model_library=model_library,
            version=mlflow_model_version.version,
            created_at=datetime.fromtimestamp(
                int(mlflow_model_version.creation_timestamp) / 1e3
            ),
            stage=ModelVersionStage(mlflow_model_version.current_stage),
            description=mlflow_model_version.description,
            last_updated_at=datetime.fromtimestamp(
                int(mlflow_model_version.last_updated_timestamp) / 1e3
            ),
            metadata=ModelRegistryModelMetadata(**metadata),
            model_source_uri=mlflow_model_version.source,
        )
config: MLFlowModelRegistryConfig property readonly

Returns the MLFlowModelRegistryConfig config.

Returns:

Type Description
MLFlowModelRegistryConfig

The configuration.

mlflow_client: MlflowClient property readonly

Get the MLflow client.

Returns:

Type Description
MlflowClient

The MLFlowClient.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates that the stack contains an mlflow experiment tracker.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

configure_mlflow(self)

Configures the MLflow Client with the experiment tracker config.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def configure_mlflow(self) -> None:
    """Configures the MLflow Client with the experiment tracker config."""
    experiment_tracker = Client().active_stack.experiment_tracker
    assert isinstance(experiment_tracker, MLFlowExperimentTracker)
    experiment_tracker.configure_mlflow()
delete_model(self, name)

Delete a model from the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required

Exceptions:

Type Description
RuntimeError

If the model does not exist.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def delete_model(
    self,
    name: str,
) -> None:
    """Delete a model from the MLflow model registry.

    Args:
        name: The name of the model.

    Raises:
        RuntimeError: If the model does not exist.
    """
    # Check if model exists.
    self.get_model(name=name)
    # Delete the registered model.
    try:
        self.mlflow_client.delete_registered_model(
            name=name,
        )
    except MlflowException as e:
        raise RuntimeError(
            f"Failed to delete model with name {name} from MLflow model "
            f"registry: {str(e)}",
        )
delete_model_version(self, name, version)

Delete a model version from the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
version str

The version of the model.

required

Exceptions:

Type Description
RuntimeError

If mlflow fails to delete the model version.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def delete_model_version(
    self,
    name: str,
    version: str,
) -> None:
    """Delete a model version from the MLflow model registry.

    Args:
        name: The name of the model.
        version: The version of the model.

    Raises:
        RuntimeError: If mlflow fails to delete the model version.
    """
    self.get_model_version(name=name, version=version)
    try:
        self.mlflow_client.delete_model_version(
            name=name,
            version=version,
        )
    except MlflowException as e:
        raise RuntimeError(
            f"Failed to delete model version '{version}' of model '{name}'."
            f"From the MLflow model registry: {str(e)}",
        )
get_model(self, name)

Get a model from the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required

Returns:

Type Description
RegisteredModel

The model.

Exceptions:

Type Description
KeyError

If mlflow fails to get the model.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def get_model(self, name: str) -> RegisteredModel:
    """Get a model from the MLflow model registry.

    Args:
        name: The name of the model.

    Returns:
        The model.

    Raises:
        KeyError: If mlflow fails to get the model.
    """
    # Get the registered model.
    try:
        registered_model = self.mlflow_client.get_registered_model(
            name=name,
        )
    except MlflowException as e:
        raise KeyError(
            f"Failed to get model with name {name} from the MLflow model "
            f"registry: {str(e)}",
        )
    # Return the registered model.
    return RegisteredModel(
        name=registered_model.name,
        description=registered_model.description,
        metadata=registered_model.tags,
    )
get_model_uri_artifact_store(self, model_version)

Get the model URI artifact store.

Parameters:

Name Type Description Default
model_version ModelVersion

The model version.

required

Returns:

Type Description
str

The model URI artifact store.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def get_model_uri_artifact_store(
    self,
    model_version: ModelVersion,
) -> str:
    """Get the model URI artifact store.

    Args:
        model_version: The model version.

    Returns:
        The model URI artifact store.
    """
    artifact_store_path = (
        f"{Client().active_stack.artifact_store.path}/mlflow"
    )
    model_source_uri = model_version.model_source_uri.rsplit(":")[-1]
    return artifact_store_path + model_source_uri
get_model_version(self, name, version)

Get a model version from the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
version str

The version of the model.

required

Exceptions:

Type Description
KeyError

If the model version does not exist.

Returns:

Type Description
ModelVersion

The model version.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def get_model_version(
    self,
    name: str,
    version: str,
) -> ModelVersion:
    """Get a model version from the MLflow model registry.

    Args:
        name: The name of the model.
        version: The version of the model.

    Raises:
        KeyError: If the model version does not exist.

    Returns:
        The model version.
    """
    # Get the model version from the MLflow model registry.
    try:
        mlflow_model_version = self.mlflow_client.get_model_version(
            name=name,
            version=version,
        )
    except MlflowException as e:
        raise KeyError(
            f"Failed to get model version '{name}:{version}' from the "
            f"MLflow model registry: {str(e)}"
        )
    # Return the model version.
    return self._cast_mlflow_version_to_model_version(
        mlflow_model_version=mlflow_model_version,
    )
list_model_versions(self, name=None, model_source_uri=None, metadata=FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={}), stage=None, count=None, created_after=None, created_before=None, order_by_date=None, **kwargs)

List model versions from the MLflow model registry.

Parameters:

Name Type Description Default
name Optional[str]

The name of the model.

None
model_source_uri Optional[str]

The model source URI.

None
metadata ModelRegistryModelMetadata

The metadata of the model version.

FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={})
stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

The stage of the model version.

None
count Optional[int]

The maximum number of model versions to return.

None
created_after Optional[datetime.datetime]

The minimum creation time of the model versions.

None
created_before Optional[datetime.datetime]

The maximum creation time of the model versions.

None
order_by_date Optional[str]

The order of the model versions by creation time, either ascending or descending.

None
kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
List[zenml.model_registries.base_model_registry.ModelVersion]

The model versions.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def list_model_versions(
    self,
    name: Optional[str] = None,
    model_source_uri: Optional[str] = None,
    metadata: ModelRegistryModelMetadata = Field(
        default_factory=ModelRegistryModelMetadata
    ),
    stage: Optional[ModelVersionStage] = None,
    count: Optional[int] = None,
    created_after: Optional[datetime] = None,
    created_before: Optional[datetime] = None,
    order_by_date: Optional[str] = None,
    **kwargs: Any,
) -> List[ModelVersion]:
    """List model versions from the MLflow model registry.

    Args:
        name: The name of the model.
        model_source_uri: The model source URI.
        metadata: The metadata of the model version.
        stage: The stage of the model version.
        count: The maximum number of model versions to return.
        created_after: The minimum creation time of the model versions.
        created_before: The maximum creation time of the model versions.
        order_by_date: The order of the model versions by creation time,
            either ascending or descending.
        kwargs: Additional keyword arguments.

    Returns:
        The model versions.
    """
    # Set the filter string.
    filter_string = ""
    if name:
        filter_string += f"name='{name}'"
    if model_source_uri:
        if filter_string:
            filter_string += " AND "
        filter_string += f"source='{model_source_uri}'"
    if "mlflow_run_id" in kwargs and kwargs["mlflow_run_id"]:
        if filter_string:
            filter_string += " AND "
        filter_string += f"run_id='{kwargs['mlflow_run_id']}'"
    if metadata:
        for tag, value in metadata.dict().items():
            if value:
                if filter_string:
                    filter_string += " AND "
                filter_string += f"tags.{tag}='{value}'"
    # Get the model versions.
    mlflow_model_versions = self.mlflow_client.search_model_versions(
        filter_string=filter_string,
    )
    # Cast the MLflow model versions to the ZenML model version class.
    model_versions = [
        self._cast_mlflow_version_to_model_version(
            mlflow_model_version=mlflow_model_version,
        )
        for mlflow_model_version in mlflow_model_versions
    ]
    # Filter the model versions by stage.
    if stage:
        model_versions = [
            model_version
            for model_version in model_versions
            if model_version.stage == stage
        ]
    # Filter the model versions by creation time.
    if created_after:
        model_versions = [
            model_version
            for model_version in model_versions
            if model_version.created_at
            and model_version.created_at >= created_after
        ]
    if created_before:
        model_versions = [
            model_version
            for model_version in model_versions
            if model_version.created_at
            and model_version.created_at <= created_before
        ]
    # Sort the model versions by creation time.
    if order_by_date == "asc":
        model_versions = sorted(
            model_versions,
            key=lambda model_version: model_version.created_at
            if model_version.created_at is not None
            else float("-inf"),
        )
    elif order_by_date == "desc":
        model_versions = sorted(
            model_versions,
            key=lambda model_version: model_version.created_at
            if model_version.created_at is not None
            else float("inf"),
            reverse=True,
        )
    # Return the model versions.
    if count:
        return model_versions[:count]
    return model_versions
list_models(self, name=None, metadata=None)

List models in the MLflow model registry.

Parameters:

Name Type Description Default
name Optional[str]

A name to filter the models by.

None
metadata Optional[Dict[str, str]]

The metadata to filter the models by.

None

Returns:

Type Description
List[zenml.model_registries.base_model_registry.RegisteredModel]

A list of models (RegisteredModel)

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def list_models(
    self,
    name: Optional[str] = None,
    metadata: Optional[Dict[str, str]] = None,
) -> List[RegisteredModel]:
    """List models in the MLflow model registry.

    Args:
        name: A name to filter the models by.
        metadata: The metadata to filter the models by.

    Returns:
        A list of models (RegisteredModel)
    """
    # Set the filter string.
    filter_string = ""
    if name:
        filter_string += f"name='{name}'"
    if metadata:
        for tag, value in metadata.items():
            if filter_string:
                filter_string += " AND "
            filter_string += f"tags.{tag}='{value}'"

    # Get the registered models.
    registered_models = self.mlflow_client.search_registered_models(
        filter_string=filter_string,
        max_results=100,
    )
    # Return the registered models.
    return [
        RegisteredModel(
            name=registered_model.name,
            description=registered_model.description,
            metadata=registered_model.tags,
        )
        for registered_model in registered_models
    ]
load_model_version(self, name, version, **kwargs)

Load a model version from the MLflow model registry.

This method loads the model version from the MLflow model registry and returns the model. The model is loaded using the mlflow.pyfunc module which takes care of loading the model from the model source URI for the right framework.

Parameters:

Name Type Description Default
name str

The name of the model.

required
version str

The version of the model.

required
kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
Any

The model version.

Exceptions:

Type Description
KeyError

If the model version does not exist.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def load_model_version(
    self,
    name: str,
    version: str,
    **kwargs: Any,
) -> Any:
    """Load a model version from the MLflow model registry.

    This method loads the model version from the MLflow model registry
    and returns the model. The model is loaded using the `mlflow.pyfunc`
    module which takes care of loading the model from the model source
    URI for the right framework.

    Args:
        name: The name of the model.
        version: The version of the model.
        kwargs: Additional keyword arguments.

    Returns:
        The model version.

    Raises:
        KeyError: If the model version does not exist.
    """
    try:
        self.get_model_version(name=name, version=version)
    except KeyError:
        raise KeyError(
            f"Failed to load model version '{name}:{version}' from the "
            f"MLflow model registry: Model version does not exist."
        )
    # Load the model version.
    mlflow_model_version = self.mlflow_client.get_model_version(
        name=name,
        version=version,
    )
    return load_model(
        model_uri=mlflow_model_version.source,
        **kwargs,
    )
register_model(self, name, description=None, metadata=None)

Register a model to the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
description Optional[str]

The description of the model.

None
metadata Optional[Dict[str, str]]

The metadata of the model.

None

Exceptions:

Type Description
RuntimeError

If the model already exists.

Returns:

Type Description
RegisteredModel

The registered model.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def register_model(
    self,
    name: str,
    description: Optional[str] = None,
    metadata: Optional[Dict[str, str]] = None,
) -> RegisteredModel:
    """Register a model to the MLflow model registry.

    Args:
        name: The name of the model.
        description: The description of the model.
        metadata: The metadata of the model.

    Raises:
        RuntimeError: If the model already exists.

    Returns:
        The registered model.
    """
    # Check if model already exists.
    try:
        self.get_model(name)
        raise KeyError(
            f"Model with name {name} already exists in the MLflow model "
            f"registry. Please use a different name.",
        )
    except KeyError:
        pass
    # Register model.
    try:
        registered_model = self.mlflow_client.create_registered_model(
            name=name,
            description=description,
            tags=metadata,
        )
    except MlflowException as e:
        raise RuntimeError(
            f"Failed to register model with name {name} to the MLflow "
            f"model registry: {str(e)}",
        )

    # Return the registered model.
    return RegisteredModel(
        name=registered_model.name,
        description=registered_model.description,
        metadata=registered_model.tags,
    )
register_model_version(self, name, version=None, model_source_uri=None, description=None, metadata=FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={}), **kwargs)

Register a model version to the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
model_source_uri Optional[str]

The source URI of the model.

None
version Optional[str]

The version of the model.

None
description Optional[str]

The description of the model version.

None
metadata ModelRegistryModelMetadata

The registry metadata of the model version.

FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={})
**kwargs Any

Additional keyword arguments.

{}

Exceptions:

Type Description
RuntimeError

If the registered model does not exist.

Returns:

Type Description
ModelVersion

The registered model version.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def register_model_version(
    self,
    name: str,
    version: Optional[str] = None,
    model_source_uri: Optional[str] = None,
    description: Optional[str] = None,
    metadata: ModelRegistryModelMetadata = Field(
        default_factory=ModelRegistryModelMetadata
    ),
    **kwargs: Any,
) -> ModelVersion:
    """Register a model version to the MLflow model registry.

    Args:
        name: The name of the model.
        model_source_uri: The source URI of the model.
        version: The version of the model.
        description: The description of the model version.
        metadata: The registry metadata of the model version.
        **kwargs: Additional keyword arguments.

    Raises:
        RuntimeError: If the registered model does not exist.

    Returns:
        The registered model version.
    """
    # Check if the model exists, if not create it.
    try:
        self.get_model(name=name)
    except KeyError:
        logger.info(
            f"No registered model with name {name} found. Creating a new"
            "registered model."
        )
        self.register_model(
            name=name,
        )
    try:
        # Inform the user that the version is ignored.
        if version:
            logger.info(
                f"MLflow model registry does not take a version as an argument. "
                f"Registering a new version for the model `'{name}'` "
                f"a version will be assigned automatically."
            )
        # Set the run ID and link.
        run_id = metadata.dict().get("mlflow_run_id", None)
        run_link = metadata.dict().get("mlflow_run_link", None)
        # Register the model version.
        registered_model_version = self.mlflow_client.create_model_version(
            name=name,
            source=model_source_uri,
            run_id=run_id,
            run_link=run_link,
            description=description,
            tags=metadata.dict(),
        )
    except MlflowException as e:
        raise RuntimeError(
            f"Failed to register model version with name '{name}' and "
            f"version '{version}' to the MLflow model registry."
            f"Error: {e}"
        )
    # Return the registered model version.
    return self._cast_mlflow_version_to_model_version(
        registered_model_version
    )
update_model(self, name, description=None, metadata=None, remove_metadata=None)

Update a model in the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
description Optional[str]

The description of the model.

None
metadata Optional[Dict[str, str]]

The metadata of the model.

None
remove_metadata Optional[List[str]]

The metadata to remove from the model.

None

Exceptions:

Type Description
RuntimeError

If mlflow fails to update the model.

Returns:

Type Description
RegisteredModel

The updated model.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def update_model(
    self,
    name: str,
    description: Optional[str] = None,
    metadata: Optional[Dict[str, str]] = None,
    remove_metadata: Optional[List[str]] = None,
) -> RegisteredModel:
    """Update a model in the MLflow model registry.

    Args:
        name: The name of the model.
        description: The description of the model.
        metadata: The metadata of the model.
        remove_metadata: The metadata to remove from the model.

    Raises:
        RuntimeError: If mlflow fails to update the model.

    Returns:
        The updated model.
    """
    # Check if model exists.
    self.get_model(name=name)
    # Update the registered model description.
    if description:
        try:
            self.mlflow_client.update_registered_model(
                name=name,
                description=description,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to update description for the model {name} in MLflow "
                f"model registry: {str(e)}",
            )
    # Update the registered model tags.
    if metadata:
        try:
            for tag, value in metadata.items():
                self.mlflow_client.set_registered_model_tag(
                    name=name,
                    key=tag,
                    value=value,
                )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to update tags for the model {name} in MLflow model "
                f"registry: {str(e)}",
            )
    # Remove tags from the registered model.
    if remove_metadata:
        try:
            for tag in remove_metadata:
                self.mlflow_client.delete_registered_model_tag(
                    name=name,
                    key=tag,
                )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to remove tags for the model {name} in MLflow model "
                f"registry: {str(e)}",
            )
    # Return the updated registered model.
    return self.get_model(name)
update_model_version(self, name, version, description=None, metadata=FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={}), remove_metadata=None, stage=None)

Update a model version in the MLflow model registry.

Parameters:

Name Type Description Default
name str

The name of the model.

required
version str

The version of the model.

required
description Optional[str]

The description of the model version.

None
metadata ModelRegistryModelMetadata

The metadata of the model version.

FieldInfo(default=PydanticUndefined, default_factory=<class 'zenml.model_registries.base_model_registry.ModelRegistryModelMetadata'>, extra={})
remove_metadata Optional[List[str]]

The metadata to remove from the model version.

None
stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

The stage of the model version.

None

Exceptions:

Type Description
RuntimeError

If mlflow fails to update the model version.

Returns:

Type Description
ModelVersion

The updated model version.

Source code in zenml/integrations/mlflow/model_registries/mlflow_model_registry.py
def update_model_version(
    self,
    name: str,
    version: str,
    description: Optional[str] = None,
    metadata: ModelRegistryModelMetadata = Field(
        default_factory=ModelRegistryModelMetadata
    ),
    remove_metadata: Optional[List[str]] = None,
    stage: Optional[ModelVersionStage] = None,
) -> ModelVersion:
    """Update a model version in the MLflow model registry.

    Args:
        name: The name of the model.
        version: The version of the model.
        description: The description of the model version.
        metadata: The metadata of the model version.
        remove_metadata: The metadata to remove from the model version.
        stage: The stage of the model version.

    Raises:
        RuntimeError: If mlflow fails to update the model version.

    Returns:
        The updated model version.
    """
    self.get_model_version(name=name, version=version)
    # Update the model description.
    if description:
        try:
            self.mlflow_client.update_model_version(
                name=name,
                version=version,
                description=description,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to update the description of model version "
                f"'{name}:{version}' in the MLflow model registry: {str(e)}"
            )
    # Update the model tags.
    if metadata:
        try:
            for key, value in metadata.dict().items():
                self.mlflow_client.set_model_version_tag(
                    name=name,
                    version=version,
                    key=key,
                    value=value,
                )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to update the tags of model version "
                f"'{name}:{version}' in the MLflow model registry: {str(e)}"
            )
    # Remove the model tags.
    if remove_metadata:
        try:
            for key in remove_metadata:
                self.mlflow_client.delete_model_version_tag(
                    name=name,
                    version=version,
                    key=key,
                )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to remove the tags of model version "
                f"'{name}:{version}' in the MLflow model registry: {str(e)}"
            )
    # Update the model stage.
    if stage:
        try:
            self.mlflow_client.transition_model_stage(
                name=name,
                version=version,
                stage=stage.value,
            )
        except MlflowException as e:
            raise RuntimeError(
                f"Failed to update the current stage of model version "
                f"'{name}:{version}' in the MLflow model registry: {str(e)}"
            )
    return self.get_model_version(name, version)

services special

Initialization of the MLflow Service.

mlflow_deployment

Implementation of the MLflow deployment functionality.

MLFlowDeploymentConfig (LocalDaemonServiceConfig) pydantic-model

MLflow model deployment configuration.

Attributes:

Name Type Description
model_uri str

URI of the MLflow model to serve

model_name str

the name of the model

workers int

number of workers to use for the prediction service

registry_model_name Optional[str]

the name of the model in the registry

registry_model_version Optional[str]

the version of the model in the registry

mlserver bool

set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used.

timeout int

timeout in seconds for starting and stopping the service

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentConfig(LocalDaemonServiceConfig):
    """MLflow model deployment configuration.

    Attributes:
        model_uri: URI of the MLflow model to serve
        model_name: the name of the model
        workers: number of workers to use for the prediction service
        registry_model_name: the name of the model in the registry
        registry_model_version: the version of the model in the registry
        mlserver: set to True to use the MLflow MLServer backend (see
            https://github.com/SeldonIO/MLServer). If False, the
            MLflow built-in scoring server will be used.
        timeout: timeout in seconds for starting and stopping the service
    """

    # TODO: ServiceConfig should have additional fields such as "pipeline_run_uuid"
    #  and "pipeline_uuid" to allow for better tracking of the service.
    model_uri: str
    model_name: str
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[str] = None
    workers: int = 1
    mlserver: bool = False
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
MLFlowDeploymentEndpoint (LocalDaemonServiceEndpoint) pydantic-model

A service endpoint exposed by the MLflow deployment daemon.

Attributes:

Name Type Description
config MLFlowDeploymentEndpointConfig

service endpoint configuration

monitor HTTPEndpointHealthMonitor

optional service endpoint health monitor

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the MLflow deployment daemon.

    Attributes:
        config: service endpoint configuration
        monitor: optional service endpoint health monitor
    """

    config: MLFlowDeploymentEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint
        """
        uri = self.status.uri
        if not uri:
            return None
        return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

MLFlowDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig) pydantic-model

MLflow daemon service endpoint configuration.

Attributes:

Name Type Description
prediction_url_path str

URI subpath for prediction requests

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
    """MLflow daemon service endpoint configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests
    """

    prediction_url_path: str
MLFlowDeploymentService (LocalDaemonService, BaseDeploymentService) pydantic-model

MLflow deployment service used to start a local prediction server for MLflow models.

Attributes:

Name Type Description
SERVICE_TYPE ClassVar[zenml.services.service_type.ServiceType]

a service type descriptor with information describing the MLflow deployment service class

config MLFlowDeploymentConfig

service configuration

endpoint MLFlowDeploymentEndpoint

optional service endpoint

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentService(LocalDaemonService, BaseDeploymentService):
    """MLflow deployment service used to start a local prediction server for MLflow models.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the MLflow deployment service class
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="mlflow-deployment",
        type="model-serving",
        flavor="mlflow",
        description="MLflow prediction service",
    )

    config: MLFlowDeploymentConfig
    endpoint: MLFlowDeploymentEndpoint

    def __init__(
        self,
        config: Union[MLFlowDeploymentConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Initialize the MLflow deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        # ensure that the endpoint is created before the service is initialized
        # TODO [ENG-700]: implement a service factory or builder for MLflow
        #   deployment services
        if (
            isinstance(config, MLFlowDeploymentConfig)
            and "endpoint" not in attrs
        ):
            if config.mlserver:
                prediction_url_path = MLSERVER_PREDICTION_URL_PATH
                healthcheck_uri_path = MLSERVER_HEALTHCHECK_URL_PATH
                use_head_request = False
            else:
                prediction_url_path = MLFLOW_PREDICTION_URL_PATH
                healthcheck_uri_path = MLFLOW_HEALTHCHECK_URL_PATH
                use_head_request = True

            endpoint = MLFlowDeploymentEndpoint(
                config=MLFlowDeploymentEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                    prediction_url_path=prediction_url_path,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path=healthcheck_uri_path,
                        use_head_request=use_head_request,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Start the service."""
        logger.info(
            "Starting MLflow prediction service as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()
        try:
            backend_kwargs: Dict[str, Any] = {}
            serve_kwargs: Dict[str, Any] = {}
            mlflow_version = MLFLOW_VERSION.split(".")
            # MLflow version 1.26 introduces an additional mandatory
            # `timeout` argument to the `PyFuncBackend.serve` function
            if int(mlflow_version[1]) >= 26 or int(mlflow_version[0]) >= 2:
                serve_kwargs["timeout"] = None
            # Mlflow 2.0+ requires the env_manager to be set to "local"
            # to run the deploy the model on the local running environment
            if int(mlflow_version[0]) >= 2:
                backend_kwargs["env_manager"] = "local"
            backend = PyFuncBackend(
                config={},
                no_conda=True,
                workers=self.config.workers,
                install_mlflow=False,
                **backend_kwargs,
            )
            backend.serve(
                model_uri=self.config.model_uri,
                port=self.endpoint.status.port,
                host="localhost",
                enable_mlserver=self.config.mlserver,
                **serve_kwargs,
            )
        except KeyboardInterrupt:
            logger.info(
                "MLflow prediction service stopped. Resuming normal execution."
            )

    @property
    def prediction_url(self) -> Optional[str]:
        """Get the URI where the prediction service is answering requests.

        Returns:
            The URI where the prediction service can be contacted to process
            HTTP/REST inference requests, or None, if the service isn't running.
        """
        if not self.is_running:
            return None
        return self.endpoint.prediction_url

    def predict(self, request: "NDArray[Any]") -> "NDArray[Any]":
        """Make a prediction using the service.

        Args:
            request: a numpy array representing the request

        Returns:
            A numpy array representing the prediction returned by the service.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "MLflow prediction service is not running. "
                "Please start the service before making predictions."
            )

        if self.endpoint.prediction_url is not None:
            response = requests.post(
                self.endpoint.prediction_url,
                json={"instances": request.tolist()},
            )
        else:
            raise ValueError("No endpoint known for prediction.")
        response.raise_for_status()
        if int(MLFLOW_VERSION.split(".")[0]) <= 1:
            return np.array(response.json())
        else:
            # Mlflow 2.0+ returns a dictionary with the predictions
            # under the "predictions" key
            return np.array(response.json()["predictions"])
prediction_url: Optional[str] property readonly

Get the URI where the prediction service is answering requests.

Returns:

Type Description
Optional[str]

The URI where the prediction service can be contacted to process HTTP/REST inference requests, or None, if the service isn't running.

__init__(self, config, **attrs) special

Initialize the MLflow deployment service.

Parameters:

Name Type Description Default
config Union[zenml.integrations.mlflow.services.mlflow_deployment.MLFlowDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def __init__(
    self,
    config: Union[MLFlowDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the MLflow deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for MLflow
    #   deployment services
    if (
        isinstance(config, MLFlowDeploymentConfig)
        and "endpoint" not in attrs
    ):
        if config.mlserver:
            prediction_url_path = MLSERVER_PREDICTION_URL_PATH
            healthcheck_uri_path = MLSERVER_HEALTHCHECK_URL_PATH
            use_head_request = False
        else:
            prediction_url_path = MLFLOW_PREDICTION_URL_PATH
            healthcheck_uri_path = MLFLOW_HEALTHCHECK_URL_PATH
            use_head_request = True

        endpoint = MLFlowDeploymentEndpoint(
            config=MLFlowDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                prediction_url_path=prediction_url_path,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=healthcheck_uri_path,
                    use_head_request=use_head_request,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
predict(self, request)

Make a prediction using the service.

Parameters:

Name Type Description Default
request NDArray[Any]

a numpy array representing the request

required

Returns:

Type Description
NDArray[Any]

A numpy array representing the prediction returned by the service.

Exceptions:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def predict(self, request: "NDArray[Any]") -> "NDArray[Any]":
    """Make a prediction using the service.

    Args:
        request: a numpy array representing the request

    Returns:
        A numpy array representing the prediction returned by the service.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "MLflow prediction service is not running. "
            "Please start the service before making predictions."
        )

    if self.endpoint.prediction_url is not None:
        response = requests.post(
            self.endpoint.prediction_url,
            json={"instances": request.tolist()},
        )
    else:
        raise ValueError("No endpoint known for prediction.")
    response.raise_for_status()
    if int(MLFLOW_VERSION.split(".")[0]) <= 1:
        return np.array(response.json())
    else:
        # Mlflow 2.0+ returns a dictionary with the predictions
        # under the "predictions" key
        return np.array(response.json()["predictions"])
run(self)

Start the service.

Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def run(self) -> None:
    """Start the service."""
    logger.info(
        "Starting MLflow prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()
    try:
        backend_kwargs: Dict[str, Any] = {}
        serve_kwargs: Dict[str, Any] = {}
        mlflow_version = MLFLOW_VERSION.split(".")
        # MLflow version 1.26 introduces an additional mandatory
        # `timeout` argument to the `PyFuncBackend.serve` function
        if int(mlflow_version[1]) >= 26 or int(mlflow_version[0]) >= 2:
            serve_kwargs["timeout"] = None
        # Mlflow 2.0+ requires the env_manager to be set to "local"
        # to run the deploy the model on the local running environment
        if int(mlflow_version[0]) >= 2:
            backend_kwargs["env_manager"] = "local"
        backend = PyFuncBackend(
            config={},
            no_conda=True,
            workers=self.config.workers,
            install_mlflow=False,
            **backend_kwargs,
        )
        backend.serve(
            model_uri=self.config.model_uri,
            port=self.endpoint.status.port,
            host="localhost",
            enable_mlserver=self.config.mlserver,
            **serve_kwargs,
        )
    except KeyboardInterrupt:
        logger.info(
            "MLflow prediction service stopped. Resuming normal execution."
        )

steps special

Initialization of the MLflow standard interface steps.

mlflow_deployer

Implementation of the MLflow model deployer pipeline step.

MLFlowDeployerParameters (BaseParameters) pydantic-model

Model deployer step parameters for MLflow.

Attributes:

Name Type Description
model_name str

the name of the MLflow model logged in the MLflow artifact store for the current pipeline.

experiment_name Optional[str]

Name of the MLflow experiment in which the model was logged.

run_name Optional[str]

Name of the MLflow run in which the model was logged.

workers int

number of workers to use for the prediction service

mlserver bool

set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used.

registry_model_name Optional[str]

the name of the model in the model registry

registry_model_version Optional[str]

the version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

the stage of the model in the model registry

replace_existing bool

whether to create a new deployment service or not, this parameter is only used when trying to deploy a model that is registered in the MLflow model registry. Default is True.

timeout int

the number of seconds to wait for the service to start/stop.

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
class MLFlowDeployerParameters(BaseParameters):
    """Model deployer step parameters for MLflow.

    Attributes:
        model_name: the name of the MLflow model logged in the MLflow artifact
            store for the current pipeline.
        experiment_name: Name of the MLflow experiment in which the model was
            logged.
        run_name: Name of the MLflow run in which the model was logged.
        workers: number of workers to use for the prediction service
        mlserver: set to True to use the MLflow MLServer backend (see
            https://github.com/SeldonIO/MLServer). If False, the
            MLflow built-in scoring server will be used.
        registry_model_name: the name of the model in the model registry
        registry_model_version: the version of the model in the model registry
        registry_model_stage: the stage of the model in the model registry
        replace_existing: whether to create a new deployment service or not,
            this parameter is only used when trying to deploy a model that
            is registered in the MLflow model registry. Default is True.
        timeout: the number of seconds to wait for the service to start/stop.
    """

    model_name: str = "model"
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    experiment_name: Optional[str] = None
    run_name: Optional[str] = None
    replace_existing: bool = True
    workers: int = 1
    mlserver: bool = False
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
mlflow_model_deployer_step (BaseStep)

Model deployer pipeline step for MLflow.

This step deploys a model logged in the MLflow artifact store to a deployment service. The user would typically use this step in a pipeline that deploys a model that was already registered in the MLflow model registr either manually or by using the mlflow_model_registry_step.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
model

the model artifact to deploy

required
params

parameters for the deployer step

required

Returns:

Type Description

MLflow deployment service

Exceptions:

Type Description
ValueError

if the MLflow experiment tracker is not found

PARAMETERS_CLASS (BaseParameters) pydantic-model

Model deployer step parameters for MLflow.

Attributes:

Name Type Description
model_name str

the name of the MLflow model logged in the MLflow artifact store for the current pipeline.

experiment_name Optional[str]

Name of the MLflow experiment in which the model was logged.

run_name Optional[str]

Name of the MLflow run in which the model was logged.

workers int

number of workers to use for the prediction service

mlserver bool

set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used.

registry_model_name Optional[str]

the name of the model in the model registry

registry_model_version Optional[str]

the version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

the stage of the model in the model registry

replace_existing bool

whether to create a new deployment service or not, this parameter is only used when trying to deploy a model that is registered in the MLflow model registry. Default is True.

timeout int

the number of seconds to wait for the service to start/stop.

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
class MLFlowDeployerParameters(BaseParameters):
    """Model deployer step parameters for MLflow.

    Attributes:
        model_name: the name of the MLflow model logged in the MLflow artifact
            store for the current pipeline.
        experiment_name: Name of the MLflow experiment in which the model was
            logged.
        run_name: Name of the MLflow run in which the model was logged.
        workers: number of workers to use for the prediction service
        mlserver: set to True to use the MLflow MLServer backend (see
            https://github.com/SeldonIO/MLServer). If False, the
            MLflow built-in scoring server will be used.
        registry_model_name: the name of the model in the model registry
        registry_model_version: the version of the model in the model registry
        registry_model_stage: the stage of the model in the model registry
        replace_existing: whether to create a new deployment service or not,
            this parameter is only used when trying to deploy a model that
            is registered in the MLflow model registry. Default is True.
        timeout: the number of seconds to wait for the service to start/stop.
    """

    model_name: str = "model"
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    experiment_name: Optional[str] = None
    run_name: Optional[str] = None
    replace_existing: bool = True
    workers: int = 1
    mlserver: bool = False
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
entrypoint(deploy_decision, model, params) staticmethod

Model deployer pipeline step for MLflow.

This step deploys a model logged in the MLflow artifact store to a deployment service. The user would typically use this step in a pipeline that deploys a model that was already registered in the MLflow model registr either manually or by using the mlflow_model_registry_step.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
model UnmaterializedArtifact

the model artifact to deploy

required
params MLFlowDeployerParameters

parameters for the deployer step

required

Returns:

Type Description
MLFlowDeploymentService

MLflow deployment service

Exceptions:

Type Description
ValueError

if the MLflow experiment tracker is not found

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
@step(enable_cache=False)
def mlflow_model_deployer_step(
    deploy_decision: bool,
    model: UnmaterializedArtifact,
    params: MLFlowDeployerParameters,
) -> MLFlowDeploymentService:
    """Model deployer pipeline step for MLflow.

    This step deploys a model logged in the MLflow artifact store to a
    deployment service. The user would typically use this step in a pipeline
    that deploys a model that was already registered in the MLflow model
    registr either manually or by using the `mlflow_model_registry_step`.

    Args:
        deploy_decision: whether to deploy the model or not
        model: the model artifact to deploy
        params: parameters for the deployer step

    Returns:
        MLflow deployment service

    Raises:
        ValueError: if the MLflow experiment tracker is not found
    """
    model_deployer = cast(
        MLFlowModelDeployer, MLFlowModelDeployer.get_active_model_deployer()
    )

    experiment_tracker = Client().active_stack.experiment_tracker

    if not isinstance(experiment_tracker, MLFlowExperimentTracker):
        raise ValueError(
            "MLflow model deployer step requires an MLflow experiment "
            "tracker. Please add an MLflow experiment tracker to your "
            "stack."
        )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    step_name = step_env.step_name

    # Configure Mlflow so the client points to the correct store
    experiment_tracker.configure_mlflow()
    client = MlflowClient()
    mlflow_run_id = experiment_tracker.get_run_id(
        experiment_name=params.experiment_name or pipeline_name,
        run_name=params.run_name or run_name,
    )

    model_uri = ""
    if mlflow_run_id and client.list_artifacts(
        mlflow_run_id, params.model_name
    ):
        model_uri = artifact_utils.get_artifact_uri(
            run_id=mlflow_run_id, artifact_path=params.model_name
        )

    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.model_name,
    )

    # create a config for the new model service
    predictor_cfg = MLFlowDeploymentConfig(
        model_name=params.model_name or "",
        model_uri=model_uri,
        workers=params.workers,
        mlserver=params.mlserver,
        registry_model_name=params.registry_model_name or "",
        registry_model_version=params.registry_model_version or "",
        pipeline_name=pipeline_name,
        run_name=run_name,
        pipeline_run_id=run_name,
        pipeline_step_name=step_name,
        timeout=params.timeout,
    )

    # Creating a new service with inactive state and status by default
    service = MLFlowDeploymentService(predictor_cfg)
    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])

    # check for conditions to deploy the model
    if not model_uri:
        # an MLflow model was not trained in the current run, so we simply reuse
        # the currently running service created for the same model, if any
        if not existing_services:
            logger.warning(
                f"An MLflow model with name `{params.model_name}` was not "
                f"logged in the current pipeline run and no running MLflow "
                f"model server was found. Please ensure that your pipeline "
                f"includes a step with a MLflow experiment configured that "
                "trains a model and logs it to MLflow. This could also happen "
                "if the current pipeline run did not log an MLflow model  "
                f"because the training step was cached."
            )
            # return an inactive service just because we have to return
            # something
            return service
        logger.info(
            f"An MLflow model with name `{params.model_name}` was not "
            f"trained in the current pipeline run. Reusing the existing "
            f"MLflow model server."
        )
        if not service.is_running:
            service.start(params.timeout)

        # return the existing service
        return service

    # even when the deploy decision is negative, if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.model_name}'..."
        )
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure
        # that a model server is available at all times
        if not service.is_running:
            service.start(params.timeout)
        return service

    # create a new model deployment and replace an old one if it exists
    new_service = cast(
        MLFlowDeploymentService,
        model_deployer.deploy_model(
            replace=True,
            config=predictor_cfg,
            timeout=params.timeout,
        ),
    )

    logger.info(
        f"MLflow deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service
mlflow_model_registry_deployer_step (BaseStep)

Model deployer pipeline step for MLflow.

Parameters:

Name Type Description Default
params

parameters for the deployer step

required

Returns:

Type Description

MLflow deployment service

Exceptions:

Type Description
ValueError

if the registry_model_name is not provided

ValueError

if the registry_model_version or registry_model_stage is not provided

ValueError

if No MLflow experiment tracker is found in the current active stack

LookupError

if no model version is found in the MLflow model registry.

PARAMETERS_CLASS (BaseParameters) pydantic-model

Model deployer step parameters for MLflow.

Attributes:

Name Type Description
model_name str

the name of the MLflow model logged in the MLflow artifact store for the current pipeline.

experiment_name Optional[str]

Name of the MLflow experiment in which the model was logged.

run_name Optional[str]

Name of the MLflow run in which the model was logged.

workers int

number of workers to use for the prediction service

mlserver bool

set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used.

registry_model_name Optional[str]

the name of the model in the model registry

registry_model_version Optional[str]

the version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

the stage of the model in the model registry

replace_existing bool

whether to create a new deployment service or not, this parameter is only used when trying to deploy a model that is registered in the MLflow model registry. Default is True.

timeout int

the number of seconds to wait for the service to start/stop.

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
class MLFlowDeployerParameters(BaseParameters):
    """Model deployer step parameters for MLflow.

    Attributes:
        model_name: the name of the MLflow model logged in the MLflow artifact
            store for the current pipeline.
        experiment_name: Name of the MLflow experiment in which the model was
            logged.
        run_name: Name of the MLflow run in which the model was logged.
        workers: number of workers to use for the prediction service
        mlserver: set to True to use the MLflow MLServer backend (see
            https://github.com/SeldonIO/MLServer). If False, the
            MLflow built-in scoring server will be used.
        registry_model_name: the name of the model in the model registry
        registry_model_version: the version of the model in the model registry
        registry_model_stage: the stage of the model in the model registry
        replace_existing: whether to create a new deployment service or not,
            this parameter is only used when trying to deploy a model that
            is registered in the MLflow model registry. Default is True.
        timeout: the number of seconds to wait for the service to start/stop.
    """

    model_name: str = "model"
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    experiment_name: Optional[str] = None
    run_name: Optional[str] = None
    replace_existing: bool = True
    workers: int = 1
    mlserver: bool = False
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
entrypoint(params) staticmethod

Model deployer pipeline step for MLflow.

Parameters:

Name Type Description Default
params MLFlowDeployerParameters

parameters for the deployer step

required

Returns:

Type Description
MLFlowDeploymentService

MLflow deployment service

Exceptions:

Type Description
ValueError

if the registry_model_name is not provided

ValueError

if the registry_model_version or registry_model_stage is not provided

ValueError

if No MLflow experiment tracker is found in the current active stack

LookupError

if no model version is found in the MLflow model registry.

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
@step(enable_cache=False)
def mlflow_model_registry_deployer_step(
    params: MLFlowDeployerParameters,
) -> MLFlowDeploymentService:
    """Model deployer pipeline step for MLflow.

    Args:
        params: parameters for the deployer step

    Returns:
        MLflow deployment service

    Raises:
        ValueError: if the registry_model_name is not provided
        ValueError: if the registry_model_version or registry_model_stage is not provided
        ValueError: if No MLflow experiment tracker is found in the current active stack
        LookupError: if no model version is found in the MLflow model registry.
    """
    if not params.registry_model_name:
        raise ValueError(
            "registry_model_name must be provided to the MLflow"
            "model registry deployer step."
        )
    elif not params.registry_model_version and not params.registry_model_stage:
        raise ValueError(
            "Either registry_model_version or registry_model_stage must"
            "be provided in addition to registry_model_name to the MLflow"
            "model registry deployer step. Since the"
            "mlflow_model_registry_deployer_step is used in conjunction with"
            "the mlflow_model_registry."
        )

    model_deployer = cast(
        MLFlowModelDeployer, MLFlowModelDeployer.get_active_model_deployer()
    )

    # fetch the MLflow model registry
    model_registry = Client().active_stack.model_registry
    if not isinstance(model_registry, MLFlowModelRegistry):
        raise ValueError(
            "The MLflow model registry step can only be used with an "
            "MLflow model registry."
        )

    # fetch the model version
    if params.registry_model_version:
        try:
            model_version = model_registry.get_model_version(
                name=params.registry_model_name,
                version=params.registry_model_version,
            )
        except KeyError:
            model_version = None
    elif params.registry_model_stage:
        model_version = model_registry.get_latest_model_version(
            name=params.registry_model_name,
            stage=params.registry_model_stage,
        )
    if not model_version:
        raise LookupError(
            f"No Model Version found for model name "
            f"{params.registry_model_name} and version "
            f"{params.registry_model_version} or stage "
            f"{params.registry_model_stage}"
        )
    if model_version.model_format != MLFLOW_MODEL_FORMAT:
        raise ValueError(
            f"Model version {model_version.version} of model "
            f"{model_version.registered_model.name} is not an MLflow model."
            f"Only MLflow models can be deployed with the MLflow deployer "
            f"using this step."
        )
    # fetch existing services with same pipeline name, step name and model name
    existing_services = (
        model_deployer.find_model_server(
            registry_model_name=model_version.registered_model.name,
        )
        if params.replace_existing
        else None
    )

    # create a config for the new model service
    metadata = model_version.metadata or ModelRegistryModelMetadata()
    predictor_cfg = MLFlowDeploymentConfig(
        model_name=params.model_name or "",
        model_uri=model_version.model_source_uri,
        registry_model_name=model_version.registered_model.name,
        registry_model_version=model_version.version,
        registry_model_stage=model_version.stage.value,
        workers=params.workers,
        mlserver=params.mlserver,
        pipeline_name=metadata.zenml_pipeline_name or "",
        run_name=metadata.zenml_run_name or "",
        pipeline_step_name=metadata.zenml_step_name or "",
        timeout=params.timeout,
    )

    # Creating a new service with inactive state and status by default
    service = MLFlowDeploymentService(predictor_cfg)
    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])

    # check if the model is already deployed but not running
    if existing_services and not service.is_running:
        service.start(params.timeout)
        return service

    # create a new model deployment and replace an old one if it exists
    new_service = cast(
        MLFlowDeploymentService,
        model_deployer.deploy_model(
            replace=True,
            config=predictor_cfg,
            timeout=params.timeout,
        ),
    )

    logger.info(
        f"MLflow deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service
mlflow_deployer_step(enable_cache=True, name=None)

Creates a pipeline step to deploy a given ML model with a local MLflow prediction server.

The returned step can be used in a pipeline to implement continuous deployment for an MLflow model.

Parameters:

Name Type Description Default
enable_cache bool

Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default

True
name Optional[str]

Name of the step.

None

Returns:

Type Description
Type[zenml.steps.base_step.BaseStep]

an MLflow model deployer pipeline step

Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
def mlflow_deployer_step(
    enable_cache: bool = True,
    name: Optional[str] = None,
) -> Type[BaseStep]:
    """Creates a pipeline step to deploy a given ML model with a local MLflow prediction server.

    The returned step can be used in a pipeline to implement continuous
    deployment for an MLflow model.

    Args:
        enable_cache: Specify whether caching is enabled for this step. If no
            value is passed, caching is enabled by default
        name: Name of the step.

    Returns:
        an MLflow model deployer pipeline step
    """
    logger.warning(
        "The `mlflow_deployer_step` function is deprecated. Please "
        "use the built-in `mlflow_model_deployer_step` step instead."
    )
    return mlflow_model_deployer_step

mlflow_registry

Implementation of the MLflow model registration pipeline step.

MLFlowRegistryParameters (BaseParameters) pydantic-model

Model registry step parameters for MLflow.

Parameters:

Name Type Description Default
name

Name of the registered model.

required
version

Version of the registered model.

required
trained_model_name

Name of the model to be deployed.

required
experiment_name

Name of the experiment to be used for the run.

required
run_name

Name of the run to be created.

required
run_id

ID of the run to be used.

required
model_source_uri

URI of the model source. If not provided, the model will be fetched from the MLflow tracking server.

required
description

Description of the model.

required
metadata

Metadata of the model version to be added to the model registry.

required
Source code in zenml/integrations/mlflow/steps/mlflow_registry.py
class MLFlowRegistryParameters(BaseParameters):
    """Model registry step parameters for MLflow.

    Args:
        name: Name of the registered model.
        version: Version of the registered model.
        trained_model_name: Name of the model to be deployed.
        experiment_name: Name of the experiment to be used for the run.
        run_name: Name of the run to be created.
        run_id: ID of the run to be used.
        model_source_uri: URI of the model source. If not provided, the model
            will be fetched from the MLflow tracking server.
        description: Description of the model.
        metadata: Metadata of the model version to be added to the model registry.
    """

    name: str
    version: Optional[str] = None
    trained_model_name: Optional[str] = "model"
    model_source_uri: Optional[str] = None
    experiment_name: Optional[str] = None
    run_name: Optional[str] = None
    run_id: Optional[str] = None
    description: Optional[str] = None
    metadata: ModelRegistryModelMetadata = Field(
        default_factory=ModelRegistryModelMetadata
    )
mlflow_register_model_step (BaseStep)

MLflow model registry step.

Parameters:

Name Type Description Default
model

Model to be registered, This is not used in the step, but is required to trigger the step when the model is trained.

required
params

Parameters for the step.

required

Exceptions:

Type Description
ValueError

If the model registry is not an MLflow model registry.

ValueError

If the experiment tracker is not an MLflow experiment tracker.

RuntimeError

If no model source URI is provided and no model is found.

RuntimeError

If no run ID is provided and no run is found.

PARAMETERS_CLASS (BaseParameters) pydantic-model

Model registry step parameters for MLflow.

Parameters:

Name Type Description Default
name

Name of the registered model.

required
version

Version of the registered model.

required
trained_model_name

Name of the model to be deployed.

required
experiment_name

Name of the experiment to be used for the run.

required
run_name

Name of the run to be created.

required
run_id

ID of the run to be used.

required
model_source_uri

URI of the model source. If not provided, the model will be fetched from the MLflow tracking server.

required
description

Description of the model.

required
metadata

Metadata of the model version to be added to the model registry.

required
Source code in zenml/integrations/mlflow/steps/mlflow_registry.py
class MLFlowRegistryParameters(BaseParameters):
    """Model registry step parameters for MLflow.

    Args:
        name: Name of the registered model.
        version: Version of the registered model.
        trained_model_name: Name of the model to be deployed.
        experiment_name: Name of the experiment to be used for the run.
        run_name: Name of the run to be created.
        run_id: ID of the run to be used.
        model_source_uri: URI of the model source. If not provided, the model
            will be fetched from the MLflow tracking server.
        description: Description of the model.
        metadata: Metadata of the model version to be added to the model registry.
    """

    name: str
    version: Optional[str] = None
    trained_model_name: Optional[str] = "model"
    model_source_uri: Optional[str] = None
    experiment_name: Optional[str] = None
    run_name: Optional[str] = None
    run_id: Optional[str] = None
    description: Optional[str] = None
    metadata: ModelRegistryModelMetadata = Field(
        default_factory=ModelRegistryModelMetadata
    )
entrypoint(model, params) staticmethod

MLflow model registry step.

Parameters:

Name Type Description Default
model UnmaterializedArtifact

Model to be registered, This is not used in the step, but is required to trigger the step when the model is trained.

required
params MLFlowRegistryParameters

Parameters for the step.

required

Exceptions:

Type Description
ValueError

If the model registry is not an MLflow model registry.

ValueError

If the experiment tracker is not an MLflow experiment tracker.

RuntimeError

If no model source URI is provided and no model is found.

RuntimeError

If no run ID is provided and no run is found.

Source code in zenml/integrations/mlflow/steps/mlflow_registry.py
@step(enable_cache=True)
def mlflow_register_model_step(
    model: UnmaterializedArtifact,
    params: MLFlowRegistryParameters,
) -> None:
    """MLflow model registry step.

    Args:
        model: Model to be registered, This is not used in the step, but is
            required to trigger the step when the model is trained.
        params: Parameters for the step.

    Raises:
        ValueError: If the model registry is not an MLflow model registry.
        ValueError: If the experiment tracker is not an MLflow experiment tracker.
        RuntimeError: If no model source URI is provided and no model is found.
        RuntimeError: If no run ID is provided and no run is found.
    """
    # get the experiment tracker and check if it is an MLflow experiment tracker.
    experiment_tracker = Client().active_stack.experiment_tracker
    if not isinstance(experiment_tracker, MLFlowExperimentTracker):
        raise ValueError(
            "The MLflow model registry step can only be used with an "
            "MLflow experiment tracker. Please add an MLflow experiment "
            "tracker to your stack."
        )

    # fetch the MLflow model registry
    model_registry = Client().active_stack.model_registry
    if not isinstance(model_registry, MLFlowModelRegistry):
        raise ValueError(
            "The MLflow model registry step can only be used with an "
            "MLflow model registry."
        )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    pipeline_run_uuid = str(step_env.step_run_info.run_id)
    zenml_workspace = str(model_registry.workspace)

    # Get MLflow run ID either from params or from experiment tracker using
    # pipeline name and run name
    mlflow_run_id = params.run_id or experiment_tracker.get_run_id(
        experiment_name=params.experiment_name or pipeline_name,
        run_name=params.run_name or run_name,
    )
    # If no value was set at all, raise an error
    if not mlflow_run_id:
        raise RuntimeError(
            f"Could not find MLflow run for experiment {pipeline_name} "
            f"and run {run_name}."
        )

    # Get MLflow client
    client = model_registry.mlflow_client
    # Lastly, check if the run ID is valid
    try:
        client.get_run(run_id=mlflow_run_id).info.run_id
    except Exception:
        raise RuntimeError(
            f"Could not find MLflow run with ID {mlflow_run_id}."
        )

    # Set model source URI
    model_source_uri = params.model_source_uri or None

    # Check if the run ID have a model artifact if no model source URI is set.
    if not params.model_source_uri and client.list_artifacts(
        mlflow_run_id, params.trained_model_name
    ):
        model_source_uri = artifact_utils.get_artifact_uri(
            run_id=mlflow_run_id, artifact_path=params.trained_model_name
        )
    if not model_source_uri:
        raise RuntimeError(
            "No model source URI provided or no model found in the "
            "MLflow tracking server for the given inputs."
        )

    # Check metadata
    if params.metadata.zenml_version is None:
        params.metadata.zenml_version = __version__
    if params.metadata.zenml_pipeline_name is None:
        params.metadata.zenml_pipeline_name = pipeline_name
    if params.metadata.zenml_run_name is None:
        params.metadata.zenml_run_name = run_name
    if params.metadata.zenml_pipeline_run_uuid is None:
        params.metadata.zenml_pipeline_run_uuid = pipeline_run_uuid
    if params.metadata.zenml_workspace is None:
        params.metadata.zenml_workspace = zenml_workspace

    # Register model version
    model_version = model_registry.register_model_version(
        name=params.name,
        version=params.version or "1",
        model_source_uri=model_source_uri,
        description=params.description,
        metadata=params.metadata,
    )

    logger.info(
        f"Registered model {params.name} "
        f"with version {model_version.version} "
        f"from source {model_source_uri}."
    )