Skip to content

Label Studio

zenml.integrations.label_studio special

Initialization of the Label Studio integration.

LabelStudioIntegration (Integration)

Definition of Label Studio integration for ZenML.

Source code in zenml/integrations/label_studio/__init__.py
class LabelStudioIntegration(Integration):
    """Definition of Label Studio integration for ZenML."""

    NAME = LABEL_STUDIO
    REQUIREMENTS = [
        "label-studio>=1.6.0,<=1.9.1",
        "label-studio-sdk>=0.0.17,<=0.0.32",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Label Studio integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.label_studio.flavors import (
            LabelStudioAnnotatorFlavor,
        )

        return [LabelStudioAnnotatorFlavor]

flavors() classmethod

Declare the stack component flavors for the Label Studio integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/label_studio/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Label Studio integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.label_studio.flavors import (
        LabelStudioAnnotatorFlavor,
    )

    return [LabelStudioAnnotatorFlavor]

annotators special

Initialization of the Label Studio annotators submodule.

label_studio_annotator

Implementation of the Label Studio annotation integration.

LabelStudioAnnotator (BaseAnnotator, AuthenticationMixin)

Class to interact with the Label Studio annotation interface.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
class LabelStudioAnnotator(BaseAnnotator, AuthenticationMixin):
    """Class to interact with the Label Studio annotation interface."""

    @property
    def config(self) -> LabelStudioAnnotatorConfig:
        """Returns the `LabelStudioAnnotatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(LabelStudioAnnotatorConfig, self._config)

    def get_url(self) -> str:
        """Gets the top-level URL of the annotation interface.

        Returns:
            The URL of the annotation interface.
        """
        return f"{self.config.instance_url}:{self.config.port}"

    def get_url_for_dataset(self, dataset_name: str) -> str:
        """Gets the URL of the annotation interface for the given dataset.

        Args:
            dataset_name: The name of the dataset.

        Returns:
            The URL of the annotation interface.
        """
        project_id = self.get_id_from_name(dataset_name)
        return f"{self.get_url()}/projects/{project_id}/"

    def get_id_from_name(self, dataset_name: str) -> Optional[int]:
        """Gets the ID of the given dataset.

        Args:
            dataset_name: The name of the dataset.

        Returns:
            The ID of the dataset.
        """
        projects = self.get_datasets()
        for project in projects:
            if project.get_params()["title"] == dataset_name:
                return cast(int, project.get_params()["id"])
        return None

    def get_datasets(self) -> List[Any]:
        """Gets the datasets currently available for annotation.

        Returns:
            A list of datasets.
        """
        datasets = self._get_client().get_projects()
        return cast(List[Any], datasets)

    def get_dataset_names(self) -> List[str]:
        """Gets the names of the datasets.

        Returns:
            A list of dataset names.
        """
        return [
            dataset.get_params()["title"] for dataset in self.get_datasets()
        ]

    def get_dataset_stats(self, dataset_name: str) -> Tuple[int, int]:
        """Gets the statistics of the given dataset.

        Args:
            dataset_name: The name of the dataset.

        Returns:
            A tuple containing (labeled_task_count, unlabeled_task_count) for
                the dataset.

        Raises:
            IndexError: If the dataset does not exist.
        """
        for project in self.get_datasets():
            if dataset_name in project.get_params()["title"]:
                labeled_task_count = len(project.get_labeled_tasks())
                unlabeled_task_count = len(project.get_unlabeled_tasks())
                return (labeled_task_count, unlabeled_task_count)
        raise IndexError(
            f"Dataset {dataset_name} not found. Please use "
            f"`zenml annotator dataset list` to list all available datasets."
        )

    def launch(self, url: Optional[str]) -> None:
        """Launches the annotation interface.

        Args:
            url: The URL of the annotation interface.
        """
        if not url:
            url = self.get_url()
        if self._connection_available():
            webbrowser.open(url, new=1, autoraise=True)
        else:
            logger.warning(
                "Could not launch annotation interface"
                "because the connection could not be established."
            )

    def _get_client(self) -> Client:
        """Gets Label Studio client.

        Returns:
            Label Studio client.

        Raises:
            ValueError: when unable to access the Label Studio API key.
        """
        secret = self.get_authentication_secret(ArbitrarySecretSchema)
        if not secret:
            raise ValueError(
                "Unable to access predefined secret to access Label Studio API key."
            )
        api_key = secret.content.get("api_key")
        if not api_key:
            raise ValueError(
                "Unable to access Label Studio API key from secret."
            )
        return Client(url=self.get_url(), api_key=api_key)

    def _connection_available(self) -> bool:
        """Checks if the connection to the annotation server is available.

        Returns:
            True if the connection is available, False otherwise.
        """
        try:
            result = self._get_client().check_connection()
            return result.get("status") == "UP"  # type: ignore[no-any-return]
        # TODO: [HIGH] refactor to use a more specific exception
        except Exception:
            logger.error(
                "Connection error: No connection was able to be established to the Label Studio backend."
            )
            return False

    def add_dataset(self, **kwargs: Any) -> Any:
        """Registers a dataset for annotation.

        Args:
            **kwargs: Additional keyword arguments to pass to the Label Studio client.

        Returns:
            A Label Studio Project object.

        Raises:
            ValueError: if 'dataset_name' and 'label_config' aren't provided.
        """
        dataset_name = kwargs.get("dataset_name")
        label_config = kwargs.get("label_config")
        if not dataset_name:
            raise ValueError("`dataset_name` keyword argument is required.")
        elif not label_config:
            raise ValueError("`label_config` keyword argument is required.")

        return self._get_client().start_project(
            title=dataset_name,
            label_config=label_config,
        )

    def delete_dataset(self, **kwargs: Any) -> None:
        """Deletes a dataset from the annotation interface.

        Args:
            **kwargs: Additional keyword arguments to pass to the Label Studio
                client.

        Raises:
            ValueError: If the dataset name is not provided or if the dataset
                does not exist.
        """
        ls = self._get_client()
        dataset_name = kwargs.get("dataset_name")
        if not dataset_name:
            raise ValueError("`dataset_name` keyword argument is required.")

        dataset_id = self.get_id_from_name(dataset_name)
        if not dataset_id:
            raise ValueError(
                f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
            )
        ls.delete_project(dataset_id)

    def get_dataset(self, **kwargs: Any) -> Any:
        """Gets the dataset with the given name.

        Args:
            **kwargs: Additional keyword arguments to pass to the Label Studio client.

        Returns:
            The LabelStudio Dataset object (a 'Project') for the given name.

        Raises:
            ValueError: If the dataset name is not provided or if the dataset
                does not exist.
        """
        # TODO: check for and raise error if client unavailable
        dataset_name = kwargs.get("dataset_name")
        if not dataset_name:
            raise ValueError("`dataset_name` keyword argument is required.")

        dataset_id = self.get_id_from_name(dataset_name)
        if not dataset_id:
            raise ValueError(
                f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
            )
        return self._get_client().get_project(dataset_id)

    def get_converted_dataset(
        self, dataset_name: str, output_format: str
    ) -> Dict[Any, Any]:
        """Extract annotated tasks in a specific converted format.

        Args:
            dataset_name: Id of the dataset.
            output_format: Output format.

        Returns:
            A dictionary containing the converted dataset.
        """
        project = self.get_dataset(dataset_name=dataset_name)
        return project.export_tasks(export_type=output_format)  # type: ignore[no-any-return]

    def get_labeled_data(self, **kwargs: Any) -> Any:
        """Gets the labeled data for the given dataset.

        Args:
            **kwargs: Additional keyword arguments to pass to the Label Studio client.

        Returns:
            The labeled data.

        Raises:
            ValueError: If the dataset name is not provided or if the dataset
                does not exist.
        """
        dataset_name = kwargs.get("dataset_name")
        if not dataset_name:
            raise ValueError("`dataset_name` keyword argument is required.")

        dataset_id = self.get_id_from_name(dataset_name)
        if not dataset_id:
            raise ValueError(
                f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
            )
        return self._get_client().get_project(dataset_id).get_labeled_tasks()

    def get_unlabeled_data(self, **kwargs: str) -> Any:
        """Gets the unlabeled data for the given dataset.

        Args:
            **kwargs: Additional keyword arguments to pass to the Label Studio client.

        Returns:
            The unlabeled data.

        Raises:
            ValueError: If the dataset name is not provided.
        """
        dataset_name = kwargs.get("dataset_name")
        if not dataset_name:
            raise ValueError("`dataset_name` keyword argument is required.")

        dataset_id = self.get_id_from_name(dataset_name)
        if not dataset_id:
            raise ValueError(
                f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
            )
        return self._get_client().get_project(dataset_id).get_unlabeled_tasks()

    def register_dataset_for_annotation(
        self,
        label_config: str,
        dataset_name: str,
    ) -> Any:
        """Registers a dataset for annotation.

        Args:
            label_config: The label config to use for the annotation interface.
            dataset_name: Name of the dataset to register.

        Returns:
            A Label Studio Project object.
        """
        project_id = self.get_id_from_name(dataset_name)
        if project_id:
            dataset = self._get_client().get_project(project_id)
        else:
            dataset = self.add_dataset(
                dataset_name=dataset_name,
                label_config=label_config,
            )

        return dataset

    def _get_azure_import_storage_sources(
        self, dataset_id: int
    ) -> List[Dict[str, Any]]:
        """Gets a list of all Azure import storage sources.

        Args:
            dataset_id: Id of the dataset.

        Returns:
            A list of Azure import storage sources.

        Raises:
            ConnectionError: If the connection to the Label Studio backend is unavailable.
        """
        # TODO: check if client actually is connected etc
        query_url = f"/api/storages/azure?project={dataset_id}"
        response = self._get_client().make_request(method="GET", url=query_url)
        if response.status_code == 200:
            return cast(List[Dict[str, Any]], response.json())
        else:
            raise ConnectionError(
                f"Unable to get list of import storage sources. Client raised HTTP error {response.status_code}."
            )

    def _get_gcs_import_storage_sources(
        self, dataset_id: int
    ) -> List[Dict[str, Any]]:
        """Gets a list of all Google Cloud Storage import storage sources.

        Args:
            dataset_id: Id of the dataset.

        Returns:
            A list of Google Cloud Storage import storage sources.

        Raises:
            ConnectionError: If the connection to the Label Studio backend is unavailable.
        """
        # TODO: check if client actually is connected etc
        query_url = f"/api/storages/gcs?project={dataset_id}"
        response = self._get_client().make_request(method="GET", url=query_url)
        if response.status_code == 200:
            return cast(List[Dict[str, Any]], response.json())
        else:
            raise ConnectionError(
                f"Unable to get list of import storage sources. Client raised HTTP error {response.status_code}."
            )

    def _get_s3_import_storage_sources(
        self, dataset_id: int
    ) -> List[Dict[str, Any]]:
        """Gets a list of all AWS S3 import storage sources.

        Args:
            dataset_id: Id of the dataset.

        Returns:
            A list of AWS S3 import storage sources.

        Raises:
            ConnectionError: If the connection to the Label Studio backend is unavailable.
        """
        # TODO: check if client actually is connected etc
        query_url = f"/api/storages/s3?project={dataset_id}"
        response = self._get_client().make_request(method="GET", url=query_url)
        if response.status_code == 200:
            return cast(List[Dict[str, Any]], response.json())
        else:
            raise ConnectionError(
                f"Unable to get list of import storage sources. Client raised HTTP error {response.status_code}."
            )

    def _storage_source_already_exists(
        self,
        uri: str,
        params: LabelStudioDatasetSyncParameters,
        dataset: Project,
    ) -> bool:
        """Returns whether a storage source already exists.

        Args:
            uri: URI of the storage source.
            params: Parameters for the dataset.
            dataset: Label Studio dataset.

        Returns:
            True if the storage source already exists, False otherwise.

        Raises:
            NotImplementedError: If the storage source type is not supported.
        """
        # TODO: check we are already connected
        dataset_id = int(dataset.get_params()["id"])
        if params.storage_type == "azure":
            storage_sources = self._get_azure_import_storage_sources(
                dataset_id
            )
        elif params.storage_type == "gcs":
            storage_sources = self._get_gcs_import_storage_sources(dataset_id)
        elif params.storage_type == "s3":
            storage_sources = self._get_s3_import_storage_sources(dataset_id)
        else:
            raise NotImplementedError(
                f"Storage type '{params.storage_type}' not implemented."
            )
        return any(
            (
                source.get("presign") == params.presign
                and source.get("bucket") == uri
                and source.get("regex_filter") == params.regex_filter
                and source.get("use_blob_urls") == params.use_blob_urls
                and source.get("title") == dataset.get_params()["title"]
                and source.get("description") == params.description
                and source.get("presign_ttl") == params.presign_ttl
                and source.get("project") == dataset_id
            )
            for source in storage_sources
        )

    def get_parsed_label_config(self, dataset_id: int) -> Dict[str, Any]:
        """Returns the parsed Label Studio label config for a dataset.

        Args:
            dataset_id: Id of the dataset.

        Returns:
            A dictionary containing the parsed label config.

        Raises:
            ValueError: If no dataset is found for the given id.
        """
        # TODO: check if client actually is connected etc
        dataset = self._get_client().get_project(dataset_id)
        if dataset:
            return cast(Dict[str, Any], dataset.parsed_label_config)
        raise ValueError("No dataset found for the given id.")

    def populate_artifact_store_parameters(
        self,
        params: LabelStudioDatasetSyncParameters,
        artifact_store: BaseArtifactStore,
    ) -> None:
        """Populate the dataset sync parameters with the artifact store credentials.

        Args:
            params: The dataset sync parameters.
            artifact_store: The active artifact store.

        Raises:
            RuntimeError: if the artifact store credentials cannot be fetched.
        """
        if artifact_store.flavor == "s3":
            from zenml.integrations.s3.artifact_stores import S3ArtifactStore

            assert isinstance(artifact_store, S3ArtifactStore)

            params.storage_type = "s3"

            (
                aws_access_key_id,
                aws_secret_access_key,
                aws_session_token,
            ) = artifact_store.get_credentials()

            if aws_access_key_id and aws_secret_access_key:
                # Convert the credentials into the format expected by Label
                # Studio
                params.aws_access_key_id = aws_access_key_id
                params.aws_secret_access_key = aws_secret_access_key
                params.aws_session_token = aws_session_token

                if artifact_store.config.client_kwargs:
                    if "endpoint_url" in artifact_store.config.client_kwargs:
                        params.s3_endpoint = (
                            artifact_store.config.client_kwargs["endpoint_url"]
                        )
                    if "region_name" in artifact_store.config.client_kwargs:
                        params.s3_region_name = str(
                            artifact_store.config.client_kwargs["region_name"]
                        )

                return

            raise RuntimeError(
                "No credentials are configured for the active S3 artifact "
                "store. The Label Studio annotator needs explicit credentials "
                "to be configured for your artifact store to sync data "
                "artifacts."
            )

        elif artifact_store.flavor == "gcp":
            from zenml.integrations.gcp.artifact_stores import GCPArtifactStore

            assert isinstance(artifact_store, GCPArtifactStore)

            params.storage_type = "gcs"

            gcp_credentials = artifact_store.get_credentials()

            if gcp_credentials:
                # Save the credentials to a file in secure location, because
                # Label Studio will need to read it from a file
                secret_folder = Path(
                    GlobalConfiguration().config_directory,
                    "label-studio",
                    str(self.id),
                )
                fileio.makedirs(str(secret_folder))
                file_path = Path(
                    secret_folder, "google_application_credentials.json"
                )
                with open(file_path, "w") as f:
                    f.write(json.dumps(gcp_credentials))
                    file_path.chmod(0o600)

                params.google_application_credentials = str(file_path)

                return

            raise RuntimeError(
                "No credentials are configured for the active GCS artifact "
                "store. The Label Studio annotator needs explicit credentials "
                "to be configured for your artifact store to sync data "
                "artifacts."
            )

        elif artifact_store.flavor == "azure":
            from zenml.integrations.azure.artifact_stores import (
                AzureArtifactStore,
            )

            assert isinstance(artifact_store, AzureArtifactStore)

            params.storage_type = "azure"

            azure_credentials = artifact_store.get_credentials()

            if azure_credentials:
                # Convert the credentials into the format expected by Label
                # Studio
                if azure_credentials.connection_string is not None:
                    try:
                        # We need to extract the account name and key from the
                        # connection string
                        tokens = azure_credentials.connection_string.split(";")
                        token_dict = dict(
                            [token.split("=", maxsplit=1) for token in tokens]
                        )
                        params.azure_account_name = token_dict["AccountName"]
                        params.azure_account_key = token_dict["AccountKey"]
                    except (KeyError, ValueError) as e:
                        raise RuntimeError(
                            "The Azure connection string configured for the "
                            "artifact store expected format."
                        ) from e

                    return

                if (
                    azure_credentials.account_name is not None
                    and azure_credentials.account_key is not None
                ):
                    params.azure_account_name = azure_credentials.account_name
                    params.azure_account_key = azure_credentials.account_key

                    return

                raise RuntimeError(
                    "The Label Studio annotator could not use the "
                    "credentials currently configured in the active Azure "
                    "artifact store because it only supports Azure storage "
                    "account credentials. "
                    "Please use Azure storage account credentials for your "
                    "artifact store."
                )

            raise RuntimeError(
                "No credentials are configured for the active Azure artifact "
                "store. The Label Studio annotator needs explicit credentials "
                "to be configured for your artifact store to sync data "
                "artifacts."
            )

        elif artifact_store.flavor == "local":
            from zenml.artifact_stores.local_artifact_store import (
                LocalArtifactStore,
            )

            assert isinstance(artifact_store, LocalArtifactStore)

            params.storage_type = "local"
            if params.prefix is None:
                params.prefix = artifact_store.path
            elif not params.prefix.startswith(artifact_store.path.lstrip("/")):
                raise RuntimeError(
                    "The prefix for the local storage must be a subdirectory "
                    "of the local artifact store path."
                )
            return

        raise RuntimeError(
            f"The active artifact store type '{artifact_store.flavor}' is not "
            "supported by ZenML's Label Studio integration. "
            "Please use one of the supported artifact stores (S3, GCP, "
            "Azure or local)."
        )

    def connect_and_sync_external_storage(
        self,
        uri: str,
        params: LabelStudioDatasetSyncParameters,
        dataset: Project,
    ) -> Optional[Dict[str, Any]]:
        """Syncs the external storage for the given project.

        Args:
            uri: URI of the storage source.
            params: Parameters for the dataset.
            dataset: Label Studio dataset.

        Returns:
            A dictionary containing the sync result.

        Raises:
            ValueError: If the storage type is not supported.
        """
        # TODO: check if proposed storage source has differing / new data
        # if self._storage_source_already_exists(uri, config, dataset):
        #     return None

        storage_connection_args = {
            "prefix": params.prefix,
            "regex_filter": params.regex_filter,
            "use_blob_urls": params.use_blob_urls,
            "presign": params.presign,
            "presign_ttl": params.presign_ttl,
            "title": dataset.get_params()["title"],
            "description": params.description,
        }
        if params.storage_type == "azure":
            if not params.azure_account_name or not params.azure_account_key:
                logger.warning(
                    "Authentication credentials for Azure aren't fully "
                    "provided. Please update the storage synchronization "
                    "settings in the Label Studio web UI as per your needs."
                )
            storage = dataset.connect_azure_import_storage(
                container=uri,
                account_name=params.azure_account_name,
                account_key=params.azure_account_key,
                **storage_connection_args,
            )
        elif params.storage_type == "gcs":
            if not params.google_application_credentials:
                logger.warning(
                    "Authentication credentials for Google Cloud Storage "
                    "aren't fully provided. Please update the storage "
                    "synchronization settings in the Label Studio web UI as "
                    "per your needs."
                )
            storage = dataset.connect_google_import_storage(
                bucket=uri,
                google_application_credentials=params.google_application_credentials,
                **storage_connection_args,
            )
        elif params.storage_type == "s3":
            if (
                not params.aws_access_key_id
                or not params.aws_secret_access_key
            ):
                logger.warning(
                    "Authentication credentials for S3 aren't fully provided."
                    "Please update the storage synchronization settings in the "
                    "Label Studio web UI as per your needs."
                )

            # temporary fix using client method until LS supports
            # recursive_scan in their SDK
            # (https://github.com/heartexlabs/label-studio-sdk/pull/130)
            ls_client = self._get_client()
            payload = {
                "bucket": uri,
                "prefix": params.prefix,
                "regex_filter": params.regex_filter,
                "use_blob_urls": params.use_blob_urls,
                "aws_access_key_id": params.aws_access_key_id,
                "aws_secret_access_key": params.aws_secret_access_key,
                "aws_session_token": params.aws_session_token,
                "region_name": params.s3_region_name,
                "s3_endpoint": params.s3_endpoint,
                "presign": params.presign,
                "presign_ttl": params.presign_ttl,
                "title": dataset.get_params()["title"],
                "description": params.description,
                "project": dataset.id,
                "recursive_scan": True,
            }
            response = ls_client.make_request(
                "POST", "/api/storages/s3", json=payload
            )
            storage = response.json()

        elif params.storage_type == "local":
            if not params.prefix:
                raise ValueError(
                    "The 'prefix' parameter is required for local storage "
                    "synchronization."
                )

            # Drop arguments that are not used by the local storage
            storage_connection_args.pop("presign")
            storage_connection_args.pop("presign_ttl")
            storage_connection_args.pop("prefix")

            prefix = params.prefix
            if not prefix.startswith("/"):
                prefix = f"/{prefix}"
            root_path = Path(prefix).parent

            # Set the environment variables required by Label Studio
            # to allow local file serving (see https://labelstud.io/guide/storage.html#Prerequisites-2)
            os.environ["LABEL_STUDIO_LOCAL_FILES_SERVING_ENABLED"] = "true"
            os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"] = str(
                root_path
            )

            storage = dataset.connect_local_import_storage(
                local_store_path=prefix,
                **storage_connection_args,
            )

            del os.environ["LABEL_STUDIO_LOCAL_FILES_SERVING_ENABLED"]
            del os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"]
        else:
            raise ValueError(
                f"Invalid storage type. '{params.storage_type}' is not "
                "supported by ZenML's Label Studio integration. Please choose "
                "between 'azure', 'gcs', 'aws' or 'local'."
            )

        synced_storage = self._get_client().sync_storage(
            storage_id=storage["id"], storage_type=storage["type"]
        )
        return cast(Dict[str, Any], synced_storage)
config: LabelStudioAnnotatorConfig property readonly

Returns the LabelStudioAnnotatorConfig config.

Returns:

Type Description
LabelStudioAnnotatorConfig

The configuration.

add_dataset(self, **kwargs)

Registers a dataset for annotation.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments to pass to the Label Studio client.

{}

Returns:

Type Description
Any

A Label Studio Project object.

Exceptions:

Type Description
ValueError

if 'dataset_name' and 'label_config' aren't provided.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def add_dataset(self, **kwargs: Any) -> Any:
    """Registers a dataset for annotation.

    Args:
        **kwargs: Additional keyword arguments to pass to the Label Studio client.

    Returns:
        A Label Studio Project object.

    Raises:
        ValueError: if 'dataset_name' and 'label_config' aren't provided.
    """
    dataset_name = kwargs.get("dataset_name")
    label_config = kwargs.get("label_config")
    if not dataset_name:
        raise ValueError("`dataset_name` keyword argument is required.")
    elif not label_config:
        raise ValueError("`label_config` keyword argument is required.")

    return self._get_client().start_project(
        title=dataset_name,
        label_config=label_config,
    )
connect_and_sync_external_storage(self, uri, params, dataset)

Syncs the external storage for the given project.

Parameters:

Name Type Description Default
uri str

URI of the storage source.

required
params LabelStudioDatasetSyncParameters

Parameters for the dataset.

required
dataset Project

Label Studio dataset.

required

Returns:

Type Description
Optional[Dict[str, Any]]

A dictionary containing the sync result.

Exceptions:

Type Description
ValueError

If the storage type is not supported.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def connect_and_sync_external_storage(
    self,
    uri: str,
    params: LabelStudioDatasetSyncParameters,
    dataset: Project,
) -> Optional[Dict[str, Any]]:
    """Syncs the external storage for the given project.

    Args:
        uri: URI of the storage source.
        params: Parameters for the dataset.
        dataset: Label Studio dataset.

    Returns:
        A dictionary containing the sync result.

    Raises:
        ValueError: If the storage type is not supported.
    """
    # TODO: check if proposed storage source has differing / new data
    # if self._storage_source_already_exists(uri, config, dataset):
    #     return None

    storage_connection_args = {
        "prefix": params.prefix,
        "regex_filter": params.regex_filter,
        "use_blob_urls": params.use_blob_urls,
        "presign": params.presign,
        "presign_ttl": params.presign_ttl,
        "title": dataset.get_params()["title"],
        "description": params.description,
    }
    if params.storage_type == "azure":
        if not params.azure_account_name or not params.azure_account_key:
            logger.warning(
                "Authentication credentials for Azure aren't fully "
                "provided. Please update the storage synchronization "
                "settings in the Label Studio web UI as per your needs."
            )
        storage = dataset.connect_azure_import_storage(
            container=uri,
            account_name=params.azure_account_name,
            account_key=params.azure_account_key,
            **storage_connection_args,
        )
    elif params.storage_type == "gcs":
        if not params.google_application_credentials:
            logger.warning(
                "Authentication credentials for Google Cloud Storage "
                "aren't fully provided. Please update the storage "
                "synchronization settings in the Label Studio web UI as "
                "per your needs."
            )
        storage = dataset.connect_google_import_storage(
            bucket=uri,
            google_application_credentials=params.google_application_credentials,
            **storage_connection_args,
        )
    elif params.storage_type == "s3":
        if (
            not params.aws_access_key_id
            or not params.aws_secret_access_key
        ):
            logger.warning(
                "Authentication credentials for S3 aren't fully provided."
                "Please update the storage synchronization settings in the "
                "Label Studio web UI as per your needs."
            )

        # temporary fix using client method until LS supports
        # recursive_scan in their SDK
        # (https://github.com/heartexlabs/label-studio-sdk/pull/130)
        ls_client = self._get_client()
        payload = {
            "bucket": uri,
            "prefix": params.prefix,
            "regex_filter": params.regex_filter,
            "use_blob_urls": params.use_blob_urls,
            "aws_access_key_id": params.aws_access_key_id,
            "aws_secret_access_key": params.aws_secret_access_key,
            "aws_session_token": params.aws_session_token,
            "region_name": params.s3_region_name,
            "s3_endpoint": params.s3_endpoint,
            "presign": params.presign,
            "presign_ttl": params.presign_ttl,
            "title": dataset.get_params()["title"],
            "description": params.description,
            "project": dataset.id,
            "recursive_scan": True,
        }
        response = ls_client.make_request(
            "POST", "/api/storages/s3", json=payload
        )
        storage = response.json()

    elif params.storage_type == "local":
        if not params.prefix:
            raise ValueError(
                "The 'prefix' parameter is required for local storage "
                "synchronization."
            )

        # Drop arguments that are not used by the local storage
        storage_connection_args.pop("presign")
        storage_connection_args.pop("presign_ttl")
        storage_connection_args.pop("prefix")

        prefix = params.prefix
        if not prefix.startswith("/"):
            prefix = f"/{prefix}"
        root_path = Path(prefix).parent

        # Set the environment variables required by Label Studio
        # to allow local file serving (see https://labelstud.io/guide/storage.html#Prerequisites-2)
        os.environ["LABEL_STUDIO_LOCAL_FILES_SERVING_ENABLED"] = "true"
        os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"] = str(
            root_path
        )

        storage = dataset.connect_local_import_storage(
            local_store_path=prefix,
            **storage_connection_args,
        )

        del os.environ["LABEL_STUDIO_LOCAL_FILES_SERVING_ENABLED"]
        del os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"]
    else:
        raise ValueError(
            f"Invalid storage type. '{params.storage_type}' is not "
            "supported by ZenML's Label Studio integration. Please choose "
            "between 'azure', 'gcs', 'aws' or 'local'."
        )

    synced_storage = self._get_client().sync_storage(
        storage_id=storage["id"], storage_type=storage["type"]
    )
    return cast(Dict[str, Any], synced_storage)
delete_dataset(self, **kwargs)

Deletes a dataset from the annotation interface.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments to pass to the Label Studio client.

{}

Exceptions:

Type Description
ValueError

If the dataset name is not provided or if the dataset does not exist.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def delete_dataset(self, **kwargs: Any) -> None:
    """Deletes a dataset from the annotation interface.

    Args:
        **kwargs: Additional keyword arguments to pass to the Label Studio
            client.

    Raises:
        ValueError: If the dataset name is not provided or if the dataset
            does not exist.
    """
    ls = self._get_client()
    dataset_name = kwargs.get("dataset_name")
    if not dataset_name:
        raise ValueError("`dataset_name` keyword argument is required.")

    dataset_id = self.get_id_from_name(dataset_name)
    if not dataset_id:
        raise ValueError(
            f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
        )
    ls.delete_project(dataset_id)
get_converted_dataset(self, dataset_name, output_format)

Extract annotated tasks in a specific converted format.

Parameters:

Name Type Description Default
dataset_name str

Id of the dataset.

required
output_format str

Output format.

required

Returns:

Type Description
Dict[Any, Any]

A dictionary containing the converted dataset.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_converted_dataset(
    self, dataset_name: str, output_format: str
) -> Dict[Any, Any]:
    """Extract annotated tasks in a specific converted format.

    Args:
        dataset_name: Id of the dataset.
        output_format: Output format.

    Returns:
        A dictionary containing the converted dataset.
    """
    project = self.get_dataset(dataset_name=dataset_name)
    return project.export_tasks(export_type=output_format)  # type: ignore[no-any-return]
get_dataset(self, **kwargs)

Gets the dataset with the given name.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments to pass to the Label Studio client.

{}

Returns:

Type Description
Any

The LabelStudio Dataset object (a 'Project') for the given name.

Exceptions:

Type Description
ValueError

If the dataset name is not provided or if the dataset does not exist.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_dataset(self, **kwargs: Any) -> Any:
    """Gets the dataset with the given name.

    Args:
        **kwargs: Additional keyword arguments to pass to the Label Studio client.

    Returns:
        The LabelStudio Dataset object (a 'Project') for the given name.

    Raises:
        ValueError: If the dataset name is not provided or if the dataset
            does not exist.
    """
    # TODO: check for and raise error if client unavailable
    dataset_name = kwargs.get("dataset_name")
    if not dataset_name:
        raise ValueError("`dataset_name` keyword argument is required.")

    dataset_id = self.get_id_from_name(dataset_name)
    if not dataset_id:
        raise ValueError(
            f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
        )
    return self._get_client().get_project(dataset_id)
get_dataset_names(self)

Gets the names of the datasets.

Returns:

Type Description
List[str]

A list of dataset names.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_dataset_names(self) -> List[str]:
    """Gets the names of the datasets.

    Returns:
        A list of dataset names.
    """
    return [
        dataset.get_params()["title"] for dataset in self.get_datasets()
    ]
get_dataset_stats(self, dataset_name)

Gets the statistics of the given dataset.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

required

Returns:

Type Description
Tuple[int, int]

A tuple containing (labeled_task_count, unlabeled_task_count) for the dataset.

Exceptions:

Type Description
IndexError

If the dataset does not exist.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_dataset_stats(self, dataset_name: str) -> Tuple[int, int]:
    """Gets the statistics of the given dataset.

    Args:
        dataset_name: The name of the dataset.

    Returns:
        A tuple containing (labeled_task_count, unlabeled_task_count) for
            the dataset.

    Raises:
        IndexError: If the dataset does not exist.
    """
    for project in self.get_datasets():
        if dataset_name in project.get_params()["title"]:
            labeled_task_count = len(project.get_labeled_tasks())
            unlabeled_task_count = len(project.get_unlabeled_tasks())
            return (labeled_task_count, unlabeled_task_count)
    raise IndexError(
        f"Dataset {dataset_name} not found. Please use "
        f"`zenml annotator dataset list` to list all available datasets."
    )
get_datasets(self)

Gets the datasets currently available for annotation.

Returns:

Type Description
List[Any]

A list of datasets.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_datasets(self) -> List[Any]:
    """Gets the datasets currently available for annotation.

    Returns:
        A list of datasets.
    """
    datasets = self._get_client().get_projects()
    return cast(List[Any], datasets)
get_id_from_name(self, dataset_name)

Gets the ID of the given dataset.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

required

Returns:

Type Description
Optional[int]

The ID of the dataset.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_id_from_name(self, dataset_name: str) -> Optional[int]:
    """Gets the ID of the given dataset.

    Args:
        dataset_name: The name of the dataset.

    Returns:
        The ID of the dataset.
    """
    projects = self.get_datasets()
    for project in projects:
        if project.get_params()["title"] == dataset_name:
            return cast(int, project.get_params()["id"])
    return None
get_labeled_data(self, **kwargs)

Gets the labeled data for the given dataset.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments to pass to the Label Studio client.

{}

Returns:

Type Description
Any

The labeled data.

Exceptions:

Type Description
ValueError

If the dataset name is not provided or if the dataset does not exist.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_labeled_data(self, **kwargs: Any) -> Any:
    """Gets the labeled data for the given dataset.

    Args:
        **kwargs: Additional keyword arguments to pass to the Label Studio client.

    Returns:
        The labeled data.

    Raises:
        ValueError: If the dataset name is not provided or if the dataset
            does not exist.
    """
    dataset_name = kwargs.get("dataset_name")
    if not dataset_name:
        raise ValueError("`dataset_name` keyword argument is required.")

    dataset_id = self.get_id_from_name(dataset_name)
    if not dataset_id:
        raise ValueError(
            f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
        )
    return self._get_client().get_project(dataset_id).get_labeled_tasks()
get_parsed_label_config(self, dataset_id)

Returns the parsed Label Studio label config for a dataset.

Parameters:

Name Type Description Default
dataset_id int

Id of the dataset.

required

Returns:

Type Description
Dict[str, Any]

A dictionary containing the parsed label config.

Exceptions:

Type Description
ValueError

If no dataset is found for the given id.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_parsed_label_config(self, dataset_id: int) -> Dict[str, Any]:
    """Returns the parsed Label Studio label config for a dataset.

    Args:
        dataset_id: Id of the dataset.

    Returns:
        A dictionary containing the parsed label config.

    Raises:
        ValueError: If no dataset is found for the given id.
    """
    # TODO: check if client actually is connected etc
    dataset = self._get_client().get_project(dataset_id)
    if dataset:
        return cast(Dict[str, Any], dataset.parsed_label_config)
    raise ValueError("No dataset found for the given id.")
get_unlabeled_data(self, **kwargs)

Gets the unlabeled data for the given dataset.

Parameters:

Name Type Description Default
**kwargs str

Additional keyword arguments to pass to the Label Studio client.

{}

Returns:

Type Description
Any

The unlabeled data.

Exceptions:

Type Description
ValueError

If the dataset name is not provided.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_unlabeled_data(self, **kwargs: str) -> Any:
    """Gets the unlabeled data for the given dataset.

    Args:
        **kwargs: Additional keyword arguments to pass to the Label Studio client.

    Returns:
        The unlabeled data.

    Raises:
        ValueError: If the dataset name is not provided.
    """
    dataset_name = kwargs.get("dataset_name")
    if not dataset_name:
        raise ValueError("`dataset_name` keyword argument is required.")

    dataset_id = self.get_id_from_name(dataset_name)
    if not dataset_id:
        raise ValueError(
            f"Dataset name '{dataset_name}' has no corresponding `dataset_id` in Label Studio."
        )
    return self._get_client().get_project(dataset_id).get_unlabeled_tasks()
get_url(self)

Gets the top-level URL of the annotation interface.

Returns:

Type Description
str

The URL of the annotation interface.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_url(self) -> str:
    """Gets the top-level URL of the annotation interface.

    Returns:
        The URL of the annotation interface.
    """
    return f"{self.config.instance_url}:{self.config.port}"
get_url_for_dataset(self, dataset_name)

Gets the URL of the annotation interface for the given dataset.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

required

Returns:

Type Description
str

The URL of the annotation interface.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def get_url_for_dataset(self, dataset_name: str) -> str:
    """Gets the URL of the annotation interface for the given dataset.

    Args:
        dataset_name: The name of the dataset.

    Returns:
        The URL of the annotation interface.
    """
    project_id = self.get_id_from_name(dataset_name)
    return f"{self.get_url()}/projects/{project_id}/"
launch(self, url)

Launches the annotation interface.

Parameters:

Name Type Description Default
url Optional[str]

The URL of the annotation interface.

required
Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def launch(self, url: Optional[str]) -> None:
    """Launches the annotation interface.

    Args:
        url: The URL of the annotation interface.
    """
    if not url:
        url = self.get_url()
    if self._connection_available():
        webbrowser.open(url, new=1, autoraise=True)
    else:
        logger.warning(
            "Could not launch annotation interface"
            "because the connection could not be established."
        )
populate_artifact_store_parameters(self, params, artifact_store)

Populate the dataset sync parameters with the artifact store credentials.

Parameters:

Name Type Description Default
params LabelStudioDatasetSyncParameters

The dataset sync parameters.

required
artifact_store BaseArtifactStore

The active artifact store.

required

Exceptions:

Type Description
RuntimeError

if the artifact store credentials cannot be fetched.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def populate_artifact_store_parameters(
    self,
    params: LabelStudioDatasetSyncParameters,
    artifact_store: BaseArtifactStore,
) -> None:
    """Populate the dataset sync parameters with the artifact store credentials.

    Args:
        params: The dataset sync parameters.
        artifact_store: The active artifact store.

    Raises:
        RuntimeError: if the artifact store credentials cannot be fetched.
    """
    if artifact_store.flavor == "s3":
        from zenml.integrations.s3.artifact_stores import S3ArtifactStore

        assert isinstance(artifact_store, S3ArtifactStore)

        params.storage_type = "s3"

        (
            aws_access_key_id,
            aws_secret_access_key,
            aws_session_token,
        ) = artifact_store.get_credentials()

        if aws_access_key_id and aws_secret_access_key:
            # Convert the credentials into the format expected by Label
            # Studio
            params.aws_access_key_id = aws_access_key_id
            params.aws_secret_access_key = aws_secret_access_key
            params.aws_session_token = aws_session_token

            if artifact_store.config.client_kwargs:
                if "endpoint_url" in artifact_store.config.client_kwargs:
                    params.s3_endpoint = (
                        artifact_store.config.client_kwargs["endpoint_url"]
                    )
                if "region_name" in artifact_store.config.client_kwargs:
                    params.s3_region_name = str(
                        artifact_store.config.client_kwargs["region_name"]
                    )

            return

        raise RuntimeError(
            "No credentials are configured for the active S3 artifact "
            "store. The Label Studio annotator needs explicit credentials "
            "to be configured for your artifact store to sync data "
            "artifacts."
        )

    elif artifact_store.flavor == "gcp":
        from zenml.integrations.gcp.artifact_stores import GCPArtifactStore

        assert isinstance(artifact_store, GCPArtifactStore)

        params.storage_type = "gcs"

        gcp_credentials = artifact_store.get_credentials()

        if gcp_credentials:
            # Save the credentials to a file in secure location, because
            # Label Studio will need to read it from a file
            secret_folder = Path(
                GlobalConfiguration().config_directory,
                "label-studio",
                str(self.id),
            )
            fileio.makedirs(str(secret_folder))
            file_path = Path(
                secret_folder, "google_application_credentials.json"
            )
            with open(file_path, "w") as f:
                f.write(json.dumps(gcp_credentials))
                file_path.chmod(0o600)

            params.google_application_credentials = str(file_path)

            return

        raise RuntimeError(
            "No credentials are configured for the active GCS artifact "
            "store. The Label Studio annotator needs explicit credentials "
            "to be configured for your artifact store to sync data "
            "artifacts."
        )

    elif artifact_store.flavor == "azure":
        from zenml.integrations.azure.artifact_stores import (
            AzureArtifactStore,
        )

        assert isinstance(artifact_store, AzureArtifactStore)

        params.storage_type = "azure"

        azure_credentials = artifact_store.get_credentials()

        if azure_credentials:
            # Convert the credentials into the format expected by Label
            # Studio
            if azure_credentials.connection_string is not None:
                try:
                    # We need to extract the account name and key from the
                    # connection string
                    tokens = azure_credentials.connection_string.split(";")
                    token_dict = dict(
                        [token.split("=", maxsplit=1) for token in tokens]
                    )
                    params.azure_account_name = token_dict["AccountName"]
                    params.azure_account_key = token_dict["AccountKey"]
                except (KeyError, ValueError) as e:
                    raise RuntimeError(
                        "The Azure connection string configured for the "
                        "artifact store expected format."
                    ) from e

                return

            if (
                azure_credentials.account_name is not None
                and azure_credentials.account_key is not None
            ):
                params.azure_account_name = azure_credentials.account_name
                params.azure_account_key = azure_credentials.account_key

                return

            raise RuntimeError(
                "The Label Studio annotator could not use the "
                "credentials currently configured in the active Azure "
                "artifact store because it only supports Azure storage "
                "account credentials. "
                "Please use Azure storage account credentials for your "
                "artifact store."
            )

        raise RuntimeError(
            "No credentials are configured for the active Azure artifact "
            "store. The Label Studio annotator needs explicit credentials "
            "to be configured for your artifact store to sync data "
            "artifacts."
        )

    elif artifact_store.flavor == "local":
        from zenml.artifact_stores.local_artifact_store import (
            LocalArtifactStore,
        )

        assert isinstance(artifact_store, LocalArtifactStore)

        params.storage_type = "local"
        if params.prefix is None:
            params.prefix = artifact_store.path
        elif not params.prefix.startswith(artifact_store.path.lstrip("/")):
            raise RuntimeError(
                "The prefix for the local storage must be a subdirectory "
                "of the local artifact store path."
            )
        return

    raise RuntimeError(
        f"The active artifact store type '{artifact_store.flavor}' is not "
        "supported by ZenML's Label Studio integration. "
        "Please use one of the supported artifact stores (S3, GCP, "
        "Azure or local)."
    )
register_dataset_for_annotation(self, label_config, dataset_name)

Registers a dataset for annotation.

Parameters:

Name Type Description Default
label_config str

The label config to use for the annotation interface.

required
dataset_name str

Name of the dataset to register.

required

Returns:

Type Description
Any

A Label Studio Project object.

Source code in zenml/integrations/label_studio/annotators/label_studio_annotator.py
def register_dataset_for_annotation(
    self,
    label_config: str,
    dataset_name: str,
) -> Any:
    """Registers a dataset for annotation.

    Args:
        label_config: The label config to use for the annotation interface.
        dataset_name: Name of the dataset to register.

    Returns:
        A Label Studio Project object.
    """
    project_id = self.get_id_from_name(dataset_name)
    if project_id:
        dataset = self._get_client().get_project(project_id)
    else:
        dataset = self.add_dataset(
            dataset_name=dataset_name,
            label_config=label_config,
        )

    return dataset

flavors special

Label Studio integration flavors.

label_studio_annotator_flavor

Label Studio annotator flavor.

LabelStudioAnnotatorConfig (BaseAnnotatorConfig, AuthenticationConfigMixin) pydantic-model

Config for the Label Studio annotator.

Attributes:

Name Type Description
instance_url str

URL of the Label Studio instance.

port int

The port to use for the annotation interface.

Source code in zenml/integrations/label_studio/flavors/label_studio_annotator_flavor.py
class LabelStudioAnnotatorConfig(
    BaseAnnotatorConfig, AuthenticationConfigMixin
):
    """Config for the Label Studio annotator.

    Attributes:
        instance_url: URL of the Label Studio instance.
        port: The port to use for the annotation interface.
    """

    instance_url: str = DEFAULT_LOCAL_INSTANCE_URL
    port: int = DEFAULT_LOCAL_LABEL_STUDIO_PORT
LabelStudioAnnotatorFlavor (BaseAnnotatorFlavor)

Label Studio annotator flavor.

Source code in zenml/integrations/label_studio/flavors/label_studio_annotator_flavor.py
class LabelStudioAnnotatorFlavor(BaseAnnotatorFlavor):
    """Label Studio annotator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return LABEL_STUDIO_ANNOTATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/annotator/label_studio.png"

    @property
    def config_class(self) -> Type[LabelStudioAnnotatorConfig]:
        """Returns `LabelStudioAnnotatorConfig` config class.

        Returns:
                The config class.
        """
        return LabelStudioAnnotatorConfig

    @property
    def implementation_class(self) -> Type["LabelStudioAnnotator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.label_studio.annotators import (
            LabelStudioAnnotator,
        )

        return LabelStudioAnnotator
config_class: Type[zenml.integrations.label_studio.flavors.label_studio_annotator_flavor.LabelStudioAnnotatorConfig] property readonly

Returns LabelStudioAnnotatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.label_studio.flavors.label_studio_annotator_flavor.LabelStudioAnnotatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[LabelStudioAnnotator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[LabelStudioAnnotator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

label_config_generators special

Initialization of the Label Studio config generators submodule.

label_config_generators

Implementation of label config generators for Label Studio.

generate_basic_object_detection_bounding_boxes_label_config(labels)

Generates a Label Studio config for object detection with bounding boxes.

This is based on the basic config example shown at https://labelstud.io/templates/image_bbox.html.

Parameters:

Name Type Description Default
labels List[str]

A list of labels to be used in the label config.

required

Returns:

Type Description
Tuple[str, str]

A tuple of the generated label config and the label config type.

Exceptions:

Type Description
ValueError

If no labels are provided.

Source code in zenml/integrations/label_studio/label_config_generators/label_config_generators.py
def generate_basic_object_detection_bounding_boxes_label_config(
    labels: List[str],
) -> Tuple[str, str]:
    """Generates a Label Studio config for object detection with bounding boxes.

    This is based on the basic config example shown at
    https://labelstud.io/templates/image_bbox.html.

    Args:
        labels: A list of labels to be used in the label config.

    Returns:
        A tuple of the generated label config and the label config type.

    Raises:
        ValueError: If no labels are provided.
    """
    if not labels:
        raise ValueError("No labels provided")

    label_config_type = AnnotationTasks.OBJECT_DETECTION_BOUNDING_BOXES

    label_config_start = """<View>
    <Image name="image" value="$image"/>
    <RectangleLabels name="label" toName="image">
    """
    label_config_choices = "".join(
        f"<Label value='{label}' />\n" for label in labels
    )
    label_config_end = "</RectangleLabels>\n</View>"
    label_config = label_config_start + label_config_choices + label_config_end

    return (
        label_config,
        label_config_type,
    )
generate_basic_ocr_label_config(labels)

Generates a Label Studio config for optical character recognition (OCR) labeling task.

This is based on the basic config example shown at https://labelstud.io/templates/optical_character_recognition.html

Parameters:

Name Type Description Default
labels List[str]

A list of labels to be used in the label config.

required

Returns:

Type Description
Tuple[str, str]

A tuple of the generated label config and the label config type.

Exceptions:

Type Description
ValueError

If no labels are provided.

Source code in zenml/integrations/label_studio/label_config_generators/label_config_generators.py
def generate_basic_ocr_label_config(
    labels: List[str],
) -> Tuple[str, str]:
    """Generates a Label Studio config for optical character recognition (OCR) labeling task.

    This is based on the basic config example shown at
    https://labelstud.io/templates/optical_character_recognition.html

    Args:
        labels: A list of labels to be used in the label config.

    Returns:
        A tuple of the generated label config and the label config type.

    Raises:
        ValueError: If no labels are provided.
    """
    if not labels:
        raise ValueError("No labels provided")

    label_config_type = AnnotationTasks.OCR

    label_config_start = """
    <View>
    <Image name="image" value="$ocr" zoom="true" zoomControl="true" rotateControl="true"/>
    <View>
    <Filter toName="label" minlength="0" name="filter"/>
    <Labels name="label" toName="image">
    """
    label_config_choices = "".join(
        f"<Label value='{label}' />\n" for label in labels
    )

    label_config_end = """
    </Labels>
    </View>
    <Rectangle name="bbox" toName="image" strokeWidth="3"/>
    <Polygon name="poly" toName="image" strokeWidth="3"/>
    <TextArea name="transcription" toName="image" editable="true" perRegion="true" required="true" maxSubmissions="1" rows="5" placeholder="Recognized Text" displayMode="region-list"/>
    </View>
    """
    label_config = label_config_start + label_config_choices + label_config_end

    return (
        label_config,
        label_config_type,
    )
generate_image_classification_label_config(labels)

Generates a Label Studio label config for image classification.

This is based on the basic config example shown at https://labelstud.io/templates/image_classification.html.

Parameters:

Name Type Description Default
labels List[str]

A list of labels to be used in the label config.

required

Returns:

Type Description
Tuple[str, str]

A tuple of the generated label config and the label config type.

Exceptions:

Type Description
ValueError

If no labels are provided.

Source code in zenml/integrations/label_studio/label_config_generators/label_config_generators.py
def generate_image_classification_label_config(
    labels: List[str],
) -> Tuple[str, str]:
    """Generates a Label Studio label config for image classification.

    This is based on the basic config example shown at
    https://labelstud.io/templates/image_classification.html.

    Args:
        labels: A list of labels to be used in the label config.

    Returns:
        A tuple of the generated label config and the label config type.

    Raises:
        ValueError: If no labels are provided.
    """
    if not labels:
        raise ValueError("No labels provided")

    label_config_type = AnnotationTasks.IMAGE_CLASSIFICATION

    label_config_start = """<View>
    <Image name="image" value="$image"/>
    <Choices name="choice" toName="image">
    """
    label_config_choices = "".join(
        f"<Choice value='{label}' />\n" for label in labels
    )
    label_config_end = "</Choices>\n</View>"

    label_config = label_config_start + label_config_choices + label_config_end
    return (
        label_config,
        label_config_type,
    )
generate_text_classification_label_config(labels)

Generates a Label Studio label config for text classification.

This is based on the basic config example shown at https://labelstud.io/templates/sentiment_analysis.html.

Parameters:

Name Type Description Default
labels List[str]

A list of labels to be used in the label config.

required

Returns:

Type Description
Tuple[str, str]

A tuple of the generated label config and the label config type.

Exceptions:

Type Description
ValueError

If no labels are provided.

Source code in zenml/integrations/label_studio/label_config_generators/label_config_generators.py
def generate_text_classification_label_config(
    labels: List[str],
) -> Tuple[str, str]:
    """Generates a Label Studio label config for text classification.

    This is based on the basic config example shown at
    https://labelstud.io/templates/sentiment_analysis.html.

    Args:
        labels: A list of labels to be used in the label config.

    Returns:
        A tuple of the generated label config and the label config type.

    Raises:
        ValueError: If no labels are provided.
    """
    if not labels:
        raise ValueError("No labels provided")

    label_config_type = AnnotationTasks.TEXT_CLASSIFICATION

    label_config_start = """<View>
    <Header value="Choose text class:"/>
    <Text name="text" value="$text"/>
    <Choices name="class" toName="text" choice="single" showInline="true">
    """
    label_config_choices = "".join(
        f"<Choice value='{label}' />\n" for label in labels
    )
    label_config_end = "</Choices>\n</View>"

    label_config = label_config_start + label_config_choices + label_config_end
    return (
        label_config,
        label_config_type,
    )

label_studio_utils

Utility functions for the Label Studio annotator integration.

clean_url(url)

Remove extraneous parts of the URL prior to mapping.

Removes the query and netloc parts of the URL, and strips the leading slash from the path. For example, a string like 'gs%3A//label-studio/load_image_data/images/fdbcd451-0c80-495c-a9c5-6b51776f5019/1/0/image_file.JPEG' would become label-studio/load_image_data/images/fdbcd451-0c80-495c-a9c5-6b51776f5019/1/0/image_file.JPEG.

Parameters:

Name Type Description Default
url str

A URL string.

required

Returns:

Type Description
str

A cleaned URL string.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def clean_url(url: str) -> str:
    """Remove extraneous parts of the URL prior to mapping.

    Removes the query and netloc parts of the URL, and strips the leading slash
    from the path. For example, a string like
    `'gs%3A//label-studio/load_image_data/images/fdbcd451-0c80-495c-a9c5-6b51776f5019/1/0/image_file.JPEG'`
    would become
    `label-studio/load_image_data/images/fdbcd451-0c80-495c-a9c5-6b51776f5019/1/0/image_file.JPEG`.

    Args:
        url: A URL string.

    Returns:
        A cleaned URL string.
    """
    parsed = urlparse(url)
    parsed = parsed._replace(netloc="", query="")
    return parsed.path.lstrip("/")

convert_pred_filenames_to_task_ids(preds, tasks)

Converts a list of predictions from local file references to task id.

Parameters:

Name Type Description Default
preds List[Dict[str, Any]]

List of predictions.

required
tasks List[Dict[str, Any]]

List of tasks.

required

Returns:

Type Description
List[Dict[str, Any]]

List of predictions using task ids as reference.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def convert_pred_filenames_to_task_ids(
    preds: List[Dict[str, Any]],
    tasks: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Converts a list of predictions from local file references to task id.

    Args:
        preds: List of predictions.
        tasks: List of tasks.

    Returns:
        List of predictions using task ids as reference.
    """
    preds = [
        {
            "filename": quote(pred["filename"]).split("//")[1],
            "result": pred["result"],
        }
        for pred in preds
    ]
    filename_id_mapping = {
        clean_url(task["storage_filename"]): task["id"] for task in tasks
    }
    return [
        {
            "task": int(
                filename_id_mapping["/".join(pred["filename"].split("/")[1:])]
            ),
            "result": pred["result"],
        }
        for pred in preds
    ]

get_file_extension(path_str)

Return the file extension of the given filename.

Parameters:

Name Type Description Default
path_str str

Path to the file.

required

Returns:

Type Description
str

File extension.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def get_file_extension(path_str: str) -> str:
    """Return the file extension of the given filename.

    Args:
        path_str: Path to the file.

    Returns:
        File extension.
    """
    return os.path.splitext(urlparse(path_str).path)[1]

is_azure_url(url)

Return whether the given URL is an Azure URL.

Parameters:

Name Type Description Default
url str

URL to check.

required

Returns:

Type Description
bool

True if the URL is an Azure URL, False otherwise.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def is_azure_url(url: str) -> bool:
    """Return whether the given URL is an Azure URL.

    Args:
        url: URL to check.

    Returns:
        True if the URL is an Azure URL, False otherwise.
    """
    return "blob.core.windows.net" in urlparse(url).netloc

is_gcs_url(url)

Return whether the given URL is an GCS URL.

Parameters:

Name Type Description Default
url str

URL to check.

required

Returns:

Type Description
bool

True if the URL is an GCS URL, False otherwise.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def is_gcs_url(url: str) -> bool:
    """Return whether the given URL is an GCS URL.

    Args:
        url: URL to check.

    Returns:
        True if the URL is an GCS URL, False otherwise.
    """
    return "storage.googleapis.com" in urlparse(url).netloc

is_s3_url(url)

Return whether the given URL is an S3 URL.

Parameters:

Name Type Description Default
url str

URL to check.

required

Returns:

Type Description
bool

True if the URL is an S3 URL, False otherwise.

Source code in zenml/integrations/label_studio/label_studio_utils.py
def is_s3_url(url: str) -> bool:
    """Return whether the given URL is an S3 URL.

    Args:
        url: URL to check.

    Returns:
        True if the URL is an S3 URL, False otherwise.
    """
    return "s3.amazonaws" in urlparse(url).netloc

steps special

Standard steps to be used with the Label Studio annotator integration.

label_studio_standard_steps

Implementation of standard steps for the Label Studio annotator integration.

LabelStudioDatasetSyncParameters (BaseModel) pydantic-model

Step parameters when syncing data to Label Studio.

Attributes:

Name Type Description
storage_type str

The type of storage to sync to. Can be one of ["gcs", "s3", "azure", "local"]. Defaults to "local".

label_config_type str

The type of label config to use.

prefix Optional[str]

Specify the prefix within the cloud store to import your data from. For local storage, this is the full absolute path to the directory containing your data.

regex_filter Optional[str]

Specify a regex filter to filter the files to import.

use_blob_urls Optional[bool]

Specify whether your data is raw image or video data, or JSON tasks.

presign Optional[bool]

Specify whether or not to create presigned URLs.

presign_ttl Optional[int]

Specify how long to keep presigned URLs active.

description Optional[str]

Specify a description for the dataset.

azure_account_name Optional[str]

Specify the Azure account name to use for the storage.

azure_account_key Optional[str]

Specify the Azure account key to use for the storage.

google_application_credentials Optional[str]

Specify the file with Google application credentials to use for the storage.

aws_access_key_id Optional[str]

Specify the AWS access key ID to use for the storage.

aws_secret_access_key Optional[str]

Specify the AWS secret access key to use for the storage.

aws_session_token Optional[str]

Specify the AWS session token to use for the storage.

s3_region_name Optional[str]

Specify the S3 region name to use for the storage.

s3_endpoint Optional[str]

Specify the S3 endpoint to use for the storage.

Source code in zenml/integrations/label_studio/steps/label_studio_standard_steps.py
class LabelStudioDatasetSyncParameters(BaseModel):
    """Step parameters when syncing data to Label Studio.

    Attributes:
        storage_type: The type of storage to sync to. Can be one of
            ["gcs", "s3", "azure", "local"]. Defaults to "local".
        label_config_type: The type of label config to use.
        prefix: Specify the prefix within the cloud store to import your data
            from. For local storage, this is the full absolute path to the
            directory containing your data.
        regex_filter: Specify a regex filter to filter the files to import.
        use_blob_urls: Specify whether your data is raw image or video data, or
            JSON tasks.
        presign: Specify whether or not to create presigned URLs.
        presign_ttl: Specify how long to keep presigned URLs active.
        description: Specify a description for the dataset.

        azure_account_name: Specify the Azure account name to use for the
            storage.
        azure_account_key: Specify the Azure account key to use for the
            storage.
        google_application_credentials: Specify the file with Google application
            credentials to use for the storage.
        aws_access_key_id: Specify the AWS access key ID to use for the
            storage.
        aws_secret_access_key: Specify the AWS secret access key to use for the
            storage.
        aws_session_token: Specify the AWS session token to use for the
            storage.
        s3_region_name: Specify the S3 region name to use for the storage.
        s3_endpoint: Specify the S3 endpoint to use for the storage.
    """

    storage_type: str = "local"
    label_config_type: str

    prefix: Optional[str] = None
    regex_filter: Optional[str] = ".*"
    use_blob_urls: Optional[bool] = True
    presign: Optional[bool] = True
    presign_ttl: Optional[int] = 1
    description: Optional[str] = ""

    # credentials specific to the main cloud providers
    azure_account_name: Optional[str] = None
    azure_account_key: Optional[str] = None
    google_application_credentials: Optional[str] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    s3_region_name: Optional[str] = None
    s3_endpoint: Optional[str] = None