Skip to content

Service Connectors

zenml.service_connectors

ZenML Service Connectors.

Modules

docker_service_connector

Docker Service Connector.

The Docker Service Connector is responsible for authenticating with a Docker (or compatible) registry.

Classes
DockerAuthenticationMethods

Bases: StrEnum

Docker Authentication methods.

DockerConfiguration

Bases: DockerCredentials

Docker client configuration.

DockerCredentials

Bases: AuthenticationConfig

Docker client authentication credentials.

DockerServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Docker service connector.

Source code in src/zenml/service_connectors/service_connector.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Functions
Modules

service_connector

Base ZenML Service Connector class.

Classes
AuthenticationConfig

Bases: BaseModel

Base authentication configuration.

Attributes
all_values: Dict[str, Any] property

Get all values as a dictionary.

Returns:

Type Description
Dict[str, Any]

A dictionary of all values in the configuration.

non_secret_values: Dict[str, str] property

Get the non-secret values as a dictionary.

Returns:

Type Description
Dict[str, str]

A dictionary of all non-secret values in the configuration.

secret_values: Dict[str, SecretStr] property

Get the secret values as a dictionary.

Returns:

Type Description
Dict[str, SecretStr]

A dictionary of all secret values in the configuration.

ServiceConnector(**kwargs: Any)

Bases: BaseModel

Base service connector class.

Service connectors are standalone components that can be used to link ZenML to external resources. They are responsible for validating and storing authentication configuration and sensitive credentials and for providing authentication services to other ZenML components. Service connectors are built on top of the (otherwise opaque) ZenML secrets and secrets store mechanisms and add secret auto-configuration, secret discovery and secret schema validation capabilities.

The implementation independent service connector abstraction is made possible through the use of generic "resource types" and "resource IDs". These constitute the "contract" between connectors and the consumers of the authentication services that they provide. In a nutshell, a connector instance advertises what resource(s) it can be used to gain access to, whereas a consumer may run a query to search for compatible connectors by specifying the resource(s) that they need to access and then use a matching connector instance to connect to said resource(s).

The resource types and authentication methods supported by a connector are declared in the connector type specification. The role of this specification is two-fold:

  • it declares a schema for the configuration that needs to be provided to configure the connector. This can be used to validate the configuration without having to instantiate the connector itself (e.g. in the CLI and dashboard), which also makes it possible to configure connectors and store their configuration without having to instantiate them.
  • it provides a way for ZenML to keep a registry of available connector implementations and configured connector instances. Users who want to connect ZenML to external resources via connectors can use this registry to discover what types of connectors are available and what types of resources they can be configured to access. Consumers can also use the registry to find connector instances that are compatible with the types of resources that they need to access.

Initialize a new service connector instance.

Parameters:

Name Type Description Default
kwargs Any

Additional keyword arguments to pass to the base class constructor.

{}
Source code in src/zenml/service_connectors/service_connector.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Attributes
supported_resource_types: List[str] property

The resource types supported by this connector instance.

Returns:

Type Description
List[str]

A list with the resource types supported by this connector instance.

type: ServiceConnectorTypeModel property

Get the connector type specification.

Returns:

Type Description
ServiceConnectorTypeModel

The connector type specification.

Functions
auto_configure(auth_method: Optional[str] = None, resource_type: Optional[str] = None, resource_id: Optional[str] = None, **kwargs: Any) -> Optional[ServiceConnector] classmethod

Auto-configure a connector instance.

Instantiate a connector with a configuration extracted from the authentication configuration available in the environment (e.g. environment variables or local client/SDK configuration files).

Parameters:

Name Type Description Default
auth_method Optional[str]

The particular authentication method to use. If omitted and if the connector implementation cannot decide which authentication method to use, it may raise an exception.

None
resource_type Optional[str]

The type of resource to configure. If not specified, the method returns a connector instance configured to access any of the supported resource types (multi-type connector) or configured to use a default resource type. If the connector doesn't support multi-type configurations or if it cannot decide which resource type to use, it may raise an exception.

None
resource_id Optional[str]

The ID of the resource instance to configure. The connector implementation may choose to either require or ignore this parameter if it does not support or detect a resource type that supports multiple instances.

None
kwargs Any

Additional implementation specific keyword arguments to use.

{}

Returns:

Type Description
Optional[ServiceConnector]

A connector instance configured with authentication credentials

Optional[ServiceConnector]

automatically extracted from the environment or None if

Optional[ServiceConnector]

auto-configuration is not supported.

Raises:

Type Description
ValueError

If the connector does not support the requested authentication method or resource type.

AuthorizationException

If the connector's authentication credentials have expired.

Source code in src/zenml/service_connectors/service_connector.py
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
@classmethod
def auto_configure(
    cls,
    auth_method: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional["ServiceConnector"]:
    """Auto-configure a connector instance.

    Instantiate a connector with a configuration extracted from the
    authentication configuration available in the environment (e.g.
    environment variables or local client/SDK configuration files).

    Args:
        auth_method: The particular authentication method to use. If
            omitted and if the connector implementation cannot decide which
            authentication method to use, it may raise an exception.
        resource_type: The type of resource to configure. If not specified,
            the method returns a connector instance configured to access any
            of the supported resource types (multi-type connector) or
            configured to use a default resource type. If the connector
            doesn't support multi-type configurations or if it cannot decide
            which resource type to use, it may raise an exception.
        resource_id: The ID of the resource instance to configure. The
            connector implementation may choose to either require or ignore
            this parameter if it does not support or detect a resource type
            that supports multiple instances.
        kwargs: Additional implementation specific keyword arguments to use.

    Returns:
        A connector instance configured with authentication credentials
        automatically extracted from the environment or None if
        auto-configuration is not supported.

    Raises:
        ValueError: If the connector does not support the requested
            authentication method or resource type.
        AuthorizationException: If the connector's authentication
            credentials have expired.
    """
    spec = cls.get_type()

    if not spec.supports_auto_configuration:
        return None

    if auth_method and auth_method not in spec.auth_method_dict:
        raise ValueError(
            f"connector type {spec.name} does not support authentication "
            f"method: '{auth_method}'"
        )

    if resource_type and resource_type not in spec.resource_type_dict:
        raise ValueError(
            f"connector type {spec.name} does not support resource type: "
            f"'{resource_type}'"
        )

    connector = cls._auto_configure(
        auth_method=auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
        **kwargs,
    )

    if connector.has_expired():
        raise AuthorizationException(
            "the connector's auto-configured authentication credentials "
            "have expired."
        )

    connector._verify(
        resource_type=connector.resource_type,
        resource_id=connector.resource_id,
    )
    return connector
configure_local_client(**kwargs: Any) -> None

Configure a local client to authenticate and connect to a resource.

This method uses the connector's configuration to configure a local client or SDK installed on the localhost so that it can authenticate and connect to the resource that the connector is configured to access.

The connector has to be fully configured for this method to succeed (i.e. the connector's configuration must be valid, a resource type must be specified and the resource ID must be specified if the resource type supports multiple instances). This method should only be called on a connector client retrieved by calling get_connector_client on the main service connector.

Parameters:

Name Type Description Default
kwargs Any

Additional implementation specific keyword arguments to use to configure the client.

{}

Raises:

Type Description
AuthorizationException

If the connector's authentication credentials have expired.

Source code in src/zenml/service_connectors/service_connector.py
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
def configure_local_client(
    self,
    **kwargs: Any,
) -> None:
    """Configure a local client to authenticate and connect to a resource.

    This method uses the connector's configuration to configure a local
    client or SDK installed on the localhost so that it can authenticate
    and connect to the resource that the connector is configured to access.

    The connector has to be fully configured for this method to succeed
    (i.e. the connector's configuration must be valid, a resource type must
    be specified and the resource ID must be specified if the resource type
    supports multiple instances). This method should only be called on a
    connector client retrieved by calling `get_connector_client` on the
    main service connector.

    Args:
        kwargs: Additional implementation specific keyword arguments to use
            to configure the client.

    Raises:
        AuthorizationException: If the connector's authentication
            credentials have expired.
    """
    resource_type, resource_id = self.validate_runtime_args(
        resource_type=self.resource_type,
        resource_id=self.resource_id,
        require_resource_type=True,
        require_resource_id=True,
    )

    if self.has_expired():
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    self._verify(
        resource_type=resource_type,
        resource_id=resource_id,
    )

    self._configure_local_client(
        **kwargs,
    )
connect(verify: bool = True, **kwargs: Any) -> Any

Authenticate and connect to a resource.

Initialize and return an implementation specific object representing an authenticated service client, connection or session that can be used to access the resource that the connector is configured to access.

The connector has to be fully configured for this method to succeed (i.e. the connector's configuration must be valid, a resource type and a resource ID must be configured). This method should only be called on a connector client retrieved by calling get_connector_client on the main service connector.

Parameters:

Name Type Description Default
verify bool

Whether to verify that the connector can access the configured resource before connecting to it.

True
kwargs Any

Additional implementation specific keyword arguments to use to configure the client.

{}

Returns:

Type Description
Any

An implementation specific object representing the authenticated

Any

service client, connection or session.

Raises:

Type Description
AuthorizationException

If the connector's authentication credentials have expired.

Source code in src/zenml/service_connectors/service_connector.py
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
def connect(
    self,
    verify: bool = True,
    **kwargs: Any,
) -> Any:
    """Authenticate and connect to a resource.

    Initialize and return an implementation specific object representing an
    authenticated service client, connection or session that can be used
    to access the resource that the connector is configured to access.

    The connector has to be fully configured for this method to succeed
    (i.e. the connector's configuration must be valid, a resource type and
    a resource ID must be configured). This method should only be called on
    a connector client retrieved by calling `get_connector_client` on the
    main service connector.

    Args:
        verify: Whether to verify that the connector can access the
            configured resource before connecting to it.
        kwargs: Additional implementation specific keyword arguments to use
            to configure the client.

    Returns:
        An implementation specific object representing the authenticated
        service client, connection or session.

    Raises:
        AuthorizationException: If the connector's authentication
            credentials have expired.
    """
    if verify:
        resource_type, resource_id = self.validate_runtime_args(
            resource_type=self.resource_type,
            resource_id=self.resource_id,
            require_resource_type=True,
            require_resource_id=True,
        )

        if self.has_expired():
            raise AuthorizationException(
                "the connector's authentication credentials have expired."
            )

        self._verify(
            resource_type=resource_type,
            resource_id=resource_id,
        )

    return self._connect_to_resource(
        **kwargs,
    )
from_model(model: Union[ServiceConnectorRequest, ServiceConnectorResponse]) -> ServiceConnector classmethod

Creates a service connector instance from a service connector model.

Parameters:

Name Type Description Default
model Union[ServiceConnectorRequest, ServiceConnectorResponse]

The service connector model.

required

Returns:

Type Description
ServiceConnector

The created service connector instance.

Raises:

Type Description
ValueError

If the connector configuration is invalid.

Source code in src/zenml/service_connectors/service_connector.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
@classmethod
def from_model(
    cls,
    model: Union["ServiceConnectorRequest", "ServiceConnectorResponse"],
) -> "ServiceConnector":
    """Creates a service connector instance from a service connector model.

    Args:
        model: The service connector model.

    Returns:
        The created service connector instance.

    Raises:
        ValueError: If the connector configuration is invalid.
    """
    # Validate the connector configuration model
    spec = cls.get_type()

    # Multiple resource types in the model means that the connector
    # instance is configured to access any of the supported resource
    # types (a multi-type connector). We represent that here by setting the
    # resource type to None.
    resource_type: Optional[str] = None
    if len(model.resource_types) == 1:
        resource_type = model.resource_types[0]

    expiration_seconds: Optional[int] = None
    try:
        method_spec, _ = spec.find_resource_specifications(
            model.auth_method,
            resource_type,
        )
        expiration_seconds = method_spec.validate_expiration(
            model.expiration_seconds
        )
    except (KeyError, ValueError) as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    # Unpack the authentication configuration
    config = model.configuration.copy()
    if isinstance(model, ServiceConnectorResponse) and model.secret_id:
        try:
            secret = Client().get_secret(model.secret_id)
        except KeyError as e:
            raise ValueError(
                f"could not fetch secret with ID '{model.secret_id}' "
                f"referenced in the connector configuration: {e}"
            ) from e

        if secret.has_missing_values:
            raise ValueError(
                f"secret with ID '{model.secret_id}' referenced in the "
                "connector configuration has missing values. This can "
                "happen for example if your user lacks the permissions "
                "required to access the secret."
            )

        config.update(secret.secret_values)

    if model.secrets:
        config.update(
            {
                k: v.get_secret_value()
                for k, v in model.secrets.items()
                if v
            }
        )

    if method_spec.config_class is None:
        raise ValueError(
            f"the implementation of the {model.name} connector type is "
            "not available in the environment. Please check that you "
            "have installed the required dependencies."
        )

    # Validate the authentication configuration
    try:
        auth_config = method_spec.config_class(**config)
    except ValidationError as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    assert isinstance(auth_config, AuthenticationConfig)

    connector = cls(
        auth_method=model.auth_method,
        resource_type=resource_type,
        resource_id=model.resource_id,
        config=auth_config,
        expires_at=model.expires_at,
        expires_skew_tolerance=model.expires_skew_tolerance,
        expiration_seconds=expiration_seconds,
    )
    if isinstance(model, ServiceConnectorResponse):
        connector.id = model.id
        connector.name = model.name

    return connector
get_connector_client(resource_type: Optional[str] = None, resource_id: Optional[str] = None) -> ServiceConnector

Get a connector client that can be used to connect to a resource.

The connector client can be used by consumers to connect to a resource (i.e. make calls to connect and configure_local_client).

The returned connector may be the same as the original connector or it may a different instance configured with different credentials or even of a different connector type.

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource to connect to.

None
resource_id Optional[str]

The ID of a particular resource to connect to.

None

Returns:

Type Description
ServiceConnector

A service connector client that can be used to connect to the

ServiceConnector

resource.

Raises:

Type Description
AuthorizationException

If authentication failed.

Source code in src/zenml/service_connectors/service_connector.py
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
def get_connector_client(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> "ServiceConnector":
    """Get a connector client that can be used to connect to a resource.

    The connector client can be used by consumers to connect to a resource
    (i.e. make calls to `connect` and `configure_local_client`).

    The returned connector may be the same as the original connector
    or it may a different instance configured with different credentials or
    even of a different connector type.

    Args:
        resource_type: The type of the resource to connect to.
        resource_id: The ID of a particular resource to connect to.

    Returns:
        A service connector client that can be used to connect to the
        resource.

    Raises:
        AuthorizationException: If authentication failed.
    """
    resource_type, resource_id = self.validate_runtime_args(
        resource_type=resource_type,
        resource_id=resource_id,
        require_resource_type=True,
        require_resource_id=True,
    )

    if self.has_expired():
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    # Verify if the connector allows access to the requested resource type
    # and instance.
    self._verify(
        resource_type=resource_type,
        resource_id=resource_id,
    )

    assert resource_type is not None
    assert resource_id is not None

    connector_client = self._get_connector_client(
        resource_type=resource_type,
        resource_id=resource_id,
    )
    # Transfer the expiration skew tolerance to the connector client
    # if an expiration time is set for the connector client credentials.
    if connector_client.expires_at is not None:
        connector_client.expires_skew_tolerance = (
            self.expires_skew_tolerance
        )

    if connector_client.has_expired():
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    connector_client._verify(
        resource_type=resource_type,
        resource_id=connector_client.resource_id,
    )

    return connector_client
get_type() -> ServiceConnectorTypeModel classmethod

Get the connector type specification.

Returns:

Type Description
ServiceConnectorTypeModel

The connector type specification.

Source code in src/zenml/service_connectors/service_connector.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
@classmethod
def get_type(cls) -> ServiceConnectorTypeModel:
    """Get the connector type specification.

    Returns:
        The connector type specification.
    """
    if cls._TYPE is not None:
        return cls._TYPE

    connector_type = cls._get_connector_type()
    connector_type.set_connector_class(cls)
    cls._TYPE = connector_type
    return cls._TYPE
has_expired() -> bool

Check if the connector authentication credentials have expired.

Verify that the authentication credentials associated with the connector have not expired by checking the expiration time against the current time.

Returns:

Type Description
bool

True if the connector has expired, False otherwise.

Source code in src/zenml/service_connectors/service_connector.py
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
def has_expired(self) -> bool:
    """Check if the connector authentication credentials have expired.

    Verify that the authentication credentials associated with the connector
    have not expired by checking the expiration time against the current
    time.

    Returns:
        True if the connector has expired, False otherwise.
    """
    if not self.expires_at:
        return False

    expires_at = self.expires_at.replace(tzinfo=timezone.utc)
    # Subtract some time to account for clock skew or other delays.
    expires_at = expires_at - timedelta(
        seconds=self.expires_skew_tolerance
        if self.expires_skew_tolerance is not None
        else SERVICE_CONNECTOR_SKEW_TOLERANCE_SECONDS
    )
    now = utc_now(tz_aware=expires_at)
    delta = expires_at - now
    result = delta < timedelta(seconds=0)

    logger.debug(
        f"Checking if connector {self.name} has expired.\n"
        f"Expires at: {self.expires_at}\n"
        f"Expires at (+skew): {expires_at}\n"
        f"Current UTC time: {now}\n"
        f"Delta: {delta}\n"
        f"Result: {result}\n"
    )

    return result
to_model(user: UUID, workspace: UUID, name: Optional[str] = None, description: str = '', labels: Optional[Dict[str, str]] = None) -> ServiceConnectorRequest

Convert the connector instance to a service connector model.

Parameters:

Name Type Description Default
name Optional[str]

The name of the connector.

None
user UUID

The ID of the user that created the connector.

required
workspace UUID

The ID of the workspace that the connector belongs to.

required
description str

The description of the connector.

''
labels Optional[Dict[str, str]]

The labels of the connector.

None

Returns:

Type Description
ServiceConnectorRequest

The service connector model corresponding to the connector

ServiceConnectorRequest

instance.

Raises:

Type Description
ValueError

If the connector configuration is not valid.

Source code in src/zenml/service_connectors/service_connector.py
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
def to_model(
    self,
    user: UUID,
    workspace: UUID,
    name: Optional[str] = None,
    description: str = "",
    labels: Optional[Dict[str, str]] = None,
) -> "ServiceConnectorRequest":
    """Convert the connector instance to a service connector model.

    Args:
        name: The name of the connector.
        user: The ID of the user that created the connector.
        workspace: The ID of the workspace that the connector belongs to.
        description: The description of the connector.
        labels: The labels of the connector.

    Returns:
        The service connector model corresponding to the connector
        instance.

    Raises:
        ValueError: If the connector configuration is not valid.
    """
    spec = self.get_type()

    name = name or self.name
    if name is None:
        raise ValueError(
            "connector configuration is not valid: name must be set"
        )

    model = ServiceConnectorRequest(
        connector_type=spec.connector_type,
        name=name,
        description=description,
        user=user,
        workspace=workspace,
        auth_method=self.auth_method,
        expires_at=self.expires_at,
        expires_skew_tolerance=self.expires_skew_tolerance,
        expiration_seconds=self.expiration_seconds,
        labels=labels or {},
    )

    # Validate the connector configuration.
    model.validate_and_configure_resources(
        connector_type=spec,
        resource_types=self.resource_type,
        resource_id=self.resource_id,
        configuration=self.config.non_secret_values,
        secrets=self.config.secret_values,  # type: ignore[arg-type]
    )

    return model
to_response_model(workspace: WorkspaceResponse, user: Optional[UserResponse] = None, name: Optional[str] = None, id: Optional[UUID] = None, description: str = '', labels: Optional[Dict[str, str]] = None) -> ServiceConnectorResponse

Convert the connector instance to a service connector response model.

Parameters:

Name Type Description Default
workspace WorkspaceResponse

The workspace that the connector belongs to.

required
user Optional[UserResponse]

The user that created the connector.

None
name Optional[str]

The name of the connector.

None
id Optional[UUID]

The ID of the connector.

None
description str

The description of the connector.

''
labels Optional[Dict[str, str]]

The labels of the connector.

None

Returns:

Type Description
ServiceConnectorResponse

The service connector response model corresponding to the connector

ServiceConnectorResponse

instance.

Raises:

Type Description
ValueError

If the connector configuration is not valid.

Source code in src/zenml/service_connectors/service_connector.py
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
def to_response_model(
    self,
    workspace: WorkspaceResponse,
    user: Optional[UserResponse] = None,
    name: Optional[str] = None,
    id: Optional[UUID] = None,
    description: str = "",
    labels: Optional[Dict[str, str]] = None,
) -> "ServiceConnectorResponse":
    """Convert the connector instance to a service connector response model.

    Args:
        workspace: The workspace that the connector belongs to.
        user: The user that created the connector.
        name: The name of the connector.
        id: The ID of the connector.
        description: The description of the connector.
        labels: The labels of the connector.

    Returns:
        The service connector response model corresponding to the connector
        instance.

    Raises:
        ValueError: If the connector configuration is not valid.
    """
    spec = self.get_type()

    name = name or self.name
    id = id or self.id
    if name is None or id is None:
        raise ValueError(
            "connector configuration is not valid: name and ID must be set"
        )

    now = utc_now()
    model = ServiceConnectorResponse(
        id=id,
        name=name,
        body=ServiceConnectorResponseBody(
            user=user,
            created=now,
            updated=now,
            description=description,
            connector_type=self.get_type(),
            auth_method=self.auth_method,
            expires_at=self.expires_at,
            expires_skew_tolerance=self.expires_skew_tolerance,
        ),
        metadata=ServiceConnectorResponseMetadata(
            workspace=workspace,
            expiration_seconds=self.expiration_seconds,
            labels=labels or {},
        ),
    )

    # Validate the connector configuration.
    model.validate_and_configure_resources(
        connector_type=spec,
        resource_types=self.resource_type,
        resource_id=self.resource_id,
        configuration=self.config.non_secret_values,
        secrets=self.config.secret_values,  # type: ignore[arg-type]
    )

    return model
validate_runtime_args(resource_type: Optional[str], resource_id: Optional[str] = None, require_resource_type: bool = False, require_resource_id: bool = False, **kwargs: Any) -> Tuple[Optional[str], Optional[str]]

Validate the runtime arguments against the connector configuration.

Validate that the supplied runtime arguments are compatible with the connector configuration and its specification. This includes validating that the resource type and resource ID are compatible with the connector configuration and its capabilities.

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource supplied at runtime by the connector's consumer. Must be the same as the resource type that the connector is configured to access, unless the connector is configured to access any resource type.

required
resource_id Optional[str]

The ID of the resource requested by the connector's consumer. Can be different than the resource ID that the connector is configured to access, e.g. if it is not in the canonical form.

None
require_resource_type bool

Whether the resource type is required.

False
require_resource_id bool

Whether the resource ID is required.

False
kwargs Any

Additional runtime arguments.

{}

Returns:

Type Description
Tuple[Optional[str], Optional[str]]

The validated resource type and resource ID.

Raises:

Type Description
ValueError

If the runtime arguments are not valid.

Source code in src/zenml/service_connectors/service_connector.py
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
def validate_runtime_args(
    self,
    resource_type: Optional[str],
    resource_id: Optional[str] = None,
    require_resource_type: bool = False,
    require_resource_id: bool = False,
    **kwargs: Any,
) -> Tuple[Optional[str], Optional[str]]:
    """Validate the runtime arguments against the connector configuration.

    Validate that the supplied runtime arguments are compatible with the
    connector configuration and its specification. This includes validating
    that the resource type and resource ID are compatible with the connector
    configuration and its capabilities.

    Args:
        resource_type: The type of the resource supplied at runtime by the
            connector's consumer. Must be the same as the resource type that
            the connector is configured to access, unless the connector is
            configured to access any resource type.
        resource_id: The ID of the resource requested by the connector's
            consumer. Can be different than the resource ID that the
            connector is configured to access, e.g. if it is not in the
            canonical form.
        require_resource_type: Whether the resource type is required.
        require_resource_id: Whether the resource ID is required.
        kwargs: Additional runtime arguments.

    Returns:
        The validated resource type and resource ID.

    Raises:
        ValueError: If the runtime arguments are not valid.
    """
    if (
        self.resource_type
        and resource_type
        and (self.resource_type != resource_type)
    ):
        raise ValueError(
            f"the connector is configured to provide access to a "
            f"'{self.resource_type}' resource type, but a different "
            f"resource type was requested: '{resource_type}'."
        )

    resource_type = resource_type or self.resource_type
    resource_id = resource_id or self.resource_id

    if require_resource_type and not resource_type:
        raise ValueError(
            "the connector is configured to provide access to multiple "
            "resource types. A resource type must be specified when "
            "requesting access to a resource."
        )

    spec = self.get_type()

    try:
        # Get the resource specification corresponding to the
        # connector configuration.
        _, resource_spec = spec.find_resource_specifications(
            self.auth_method,
            resource_type,
        )
    except (KeyError, ValueError) as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    if not resource_type or not resource_spec:
        if resource_id:
            raise ValueError(
                "the connector is configured to provide access to multiple "
                "resource types, but only a resource name was specified. A "
                "resource type must also be specified when "
                "requesting access to a resource."
            )

        return resource_type, resource_id

    # Validate and convert the resource ID to its canonical form.
    # A default resource ID is returned for resource types that do not
    # support instances, if no resource ID is specified.
    resource_id = self._validate_resource_id(
        resource_type=resource_type,
        resource_id=resource_id,
    )

    if resource_id:
        if self.resource_id and self.resource_id != resource_id:
            raise ValueError(
                f"the connector is configured to provide access to a "
                f"single {resource_spec.name} resource with a "
                f"resource name of '{self.resource_id}', but a "
                f"different resource name was requested: "
                f"'{resource_id}'."
            )

    else:
        if not self.resource_id and require_resource_id:
            raise ValueError(
                f"the connector is configured to provide access to "
                f"multiple {resource_spec.name} resources. A resource name "
                "must be specified when requesting access to a resource."
            )

    return resource_type, resource_id
verify(resource_type: Optional[str] = None, resource_id: Optional[str] = None, list_resources: bool = True) -> ServiceConnectorResourcesModel

Verify and optionally list all the resources that the connector can access.

This method uses the connector's configuration to verify that it can authenticate and access the indicated resource(s).

If list_resources is set, the list of resources that the connector can access, scoped to the supplied resource type and resource ID is included in the response, otherwise the connector only verifies that it can globally authenticate and doesn't verify or return resource information (i.e. the resource_ids fields in the response are empty).

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource to verify. If the connector instance is already configured with a resource type, this argument must be the same as the one configured if supplied.

None
resource_id Optional[str]

The ID of a particular resource instance to check whether the connector can access. If the connector instance is already configured with a resource ID that is not the same or equivalent to the one requested, a ValueError exception is raised.

None
list_resources bool

Whether to list the resources that the connector can access.

True

Returns:

Type Description
ServiceConnectorResourcesModel

A list of resources that the connector can access.

Raises:

Type Description
ValueError

If the arguments or the connector configuration are not valid.

Source code in src/zenml/service_connectors/service_connector.py
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
def verify(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    list_resources: bool = True,
) -> ServiceConnectorResourcesModel:
    """Verify and optionally list all the resources that the connector can access.

    This method uses the connector's configuration to verify that it can
    authenticate and access the indicated resource(s).

    If `list_resources` is set, the list of resources that the connector can
    access, scoped to the supplied resource type and resource ID is included
    in the response, otherwise the connector only verifies that it can
    globally authenticate and doesn't verify or return resource information
    (i.e. the `resource_ids` fields in the response are empty).

    Args:
        resource_type: The type of the resource to verify. If the connector
            instance is already configured with a resource type, this
            argument must be the same as the one configured if supplied.
        resource_id: The ID of a particular resource instance to check
            whether the connector can access. If the connector instance is
            already configured with a resource ID that is not the same or
            equivalent to the one requested, a `ValueError` exception is
            raised.
        list_resources: Whether to list the resources that the connector can
            access.

    Returns:
        A list of resources that the connector can access.

    Raises:
        ValueError: If the arguments or the connector configuration are
            not valid.
    """
    spec = self.get_type()

    resources = ServiceConnectorResourcesModel(
        connector_type=spec,
        id=self.id,
        name=self.name,
    )

    name_msg = f" '{self.name}'" if self.name else ""

    resource_types = self.supported_resource_types
    if resource_type:
        if resource_type not in resource_types:
            raise ValueError(
                f"connector{name_msg} does not support resource type: "
                f"'{resource_type}'. Supported resource types are: "
                f"{', '.join(resource_types)}"
            )
        resource_types = [resource_type]

    # Pre-populate the list of resources with entries corresponding to the
    # supported resource types and scoped to the supplied resource type
    resources.resources = [
        ServiceConnectorTypedResourcesModel(
            resource_type=rt,
        )
        for rt in resource_types
    ]

    if self.has_expired():
        error = "the connector's authentication credentials have expired."
        # Log the error in the resources object
        resources.set_error(error)
        return resources

    verify_resource_types: List[Optional[str]] = []
    verify_resource_id = None
    if not list_resources and not resource_id:
        # If we're not listing resources, we're only verifying that the
        # connector can authenticate globally (i.e. without supplying a
        # resource type or resource ID). This is the same as if no resource
        # type or resource ID was supplied. The only exception is when the
        # verification scope is already set to a single resource ID, in
        # which case we still perform the verification on that resource ID.
        verify_resource_types = [None]
        verify_resource_id = None
    else:
        if len(resource_types) > 1:
            # This forces the connector to verify that it can authenticate
            # globally (i.e. without supplying a resource type or resource
            # ID) before listing individual resources in the case of
            # multi-type service connectors.
            verify_resource_types = [None]

        verify_resource_types.extend(resource_types)
        if len(verify_resource_types) == 1:
            verify_resource_id = resource_id

    # Here we go through each resource type and attempt to verify and list
    # the resources that the connector can access for that resource type.
    # This list may start with a `None` resource type, which indicates that
    # the connector should verify that it can authenticate globally.
    for resource_type in verify_resource_types:
        try:
            resource_type, resource_id = self.validate_runtime_args(
                resource_type=resource_type,
                resource_id=verify_resource_id,
                require_resource_type=False,
                require_resource_id=False,
            )

            resource_ids = self._verify(
                resource_type=resource_type,
                resource_id=resource_id,
            )
        except ValueError as exc:
            raise ValueError(
                f"The connector configuration is incomplete or invalid: "
                f"{exc}",
            )
        except AuthorizationException as exc:
            error = f"connector{name_msg} authorization failure: {exc}"
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            # Log the error in the resources object
            resources.set_error(error, resource_type=resource_type)
            if resource_type:
                continue
            else:
                # We stop on a global failure
                break
        except Exception as exc:
            error = (
                f"connector{name_msg} verification failed with "
                f"unexpected error: {exc}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)
            error = (
                "an unexpected error occurred while verifying the "
                "connector."
            )
            # Log the error in the resources object
            resources.set_error(error, resource_type=resource_type)
            if resource_type:
                continue
            else:
                # We stop on a global failure
                break

        if not resource_type:
            # If a resource type is not provided as argument, we don't
            # expect any resources to be listed
            continue

        resource_type_spec = spec.resource_type_dict[resource_type]

        if resource_id:
            # A single resource was requested, so we expect a single
            # resource to be listed
            if [resource_id] != resource_ids:
                logger.error(
                    f"a different resource ID '{resource_ids}' was "
                    f"returned than the one requested: {resource_ids}. "
                    f"This is likely a bug in the {self.__class__} "
                    "connector implementation."
                )
            resources.set_resource_ids(resource_type, [resource_id])
        elif not resource_ids:
            # If no resources were listed, signal this as an error that the
            # connector cannot access any resources.
            error = (
                f"connector{name_msg} didn't list any "
                f"{resource_type_spec.name} resources. This is likely "
                "caused by the connector credentials not being valid or "
                "not having sufficient permissions to list or access "
                "resources of this type. Please check the connector "
                "configuration and its credentials and try again."
            )
            logger.debug(error)
            resources.set_error(error, resource_type=resource_type)
        else:
            resources.set_resource_ids(resource_type, resource_ids)

    return resources
ServiceConnectorMeta

Bases: ModelMetaclass

Metaclass responsible for automatically registering ServiceConnector classes.

Functions
__new__(mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]) -> ServiceConnectorMeta

Creates a new ServiceConnector class and registers it.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The dictionary of the class.

required

Returns:

Type Description
ServiceConnectorMeta

The ServiceConnectorMeta class.

Source code in src/zenml/service_connectors/service_connector.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "ServiceConnectorMeta":
    """Creates a new ServiceConnector class and registers it.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The ServiceConnectorMeta class.
    """
    cls = cast(
        Type["ServiceConnector"], super().__new__(mcs, name, bases, dct)
    )

    # Skip the following validation and registration for the base class.
    if name == "ServiceConnector":
        return cls

    else:
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        # Register the service connector.
        service_connector_registry.register_service_connector_type(
            cls.get_type()
        )

    return cls
Functions

service_connector_registry

Implementation of a service connector registry.

Classes
ServiceConnectorRegistry()

Service connector registry.

Initialize the service connector registry.

Source code in src/zenml/service_connectors/service_connector_registry.py
34
35
36
37
38
def __init__(self) -> None:
    """Initialize the service connector registry."""
    self.service_connector_types: Dict[str, ServiceConnectorTypeModel] = {}
    self.initialized = False
    self.lock = threading.RLock()
Functions
get_service_connector_type(connector_type: str) -> ServiceConnectorTypeModel

Get a service connector type by its connector type identifier.

Parameters:

Name Type Description Default
connector_type str

The service connector type identifier.

required

Returns:

Type Description
ServiceConnectorTypeModel

A service connector type that was registered for the given

ServiceConnectorTypeModel

connector type identifier.

Raises:

Type Description
KeyError

If no service connector was registered for the given type identifier.

Source code in src/zenml/service_connectors/service_connector_registry.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_service_connector_type(
    self,
    connector_type: str,
) -> ServiceConnectorTypeModel:
    """Get a service connector type by its connector type identifier.

    Args:
        connector_type: The service connector type identifier.

    Returns:
        A service connector type that was registered for the given
        connector type identifier.

    Raises:
        KeyError: If no service connector was registered for the given type
            identifier.
    """
    self.register_builtin_service_connectors()

    if connector_type not in self.service_connector_types:
        raise KeyError(
            f"Service connector type {connector_type} is not available. "
            f"Please make sure the corresponding packages and/or ZenML "
            f"integration are installed and try again."
        )
    return self.service_connector_types[connector_type].model_copy()
instantiate_connector(model: Union[ServiceConnectorRequest, ServiceConnectorResponse]) -> ServiceConnector

Validate a service connector model and create an instance from it.

Parameters:

Name Type Description Default
model Union[ServiceConnectorRequest, ServiceConnectorResponse]

The service connector model to validate and instantiate.

required

Returns:

Type Description
ServiceConnector

A service connector instance.

Raises:

Type Description
NotImplementedError

If no service connector is registered for the given type identifier.

ValueError

If the service connector model is not valid.

Source code in src/zenml/service_connectors/service_connector_registry.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def instantiate_connector(
    self,
    model: Union[
        "ServiceConnectorRequest",
        "ServiceConnectorResponse",
    ],
) -> "ServiceConnector":
    """Validate a service connector model and create an instance from it.

    Args:
        model: The service connector model to validate and instantiate.

    Returns:
        A service connector instance.

    Raises:
        NotImplementedError: If no service connector is registered for the
            given type identifier.
        ValueError: If the service connector model is not valid.
    """
    try:
        service_connector_type = self.get_service_connector_type(
            model.type
        )
    except KeyError:
        raise NotImplementedError(
            f"Service connector type {model.type} is not available "
            "locally. Please make sure the corresponding packages and/or "
            "ZenML integration are installed and try again."
        )

    assert service_connector_type.connector_class is not None

    try:
        return service_connector_type.connector_class.from_model(model)
    except ValueError as e:
        raise ValueError(
            f"The service connector configuration is not valid: {e}"
        )
is_registered(connector_type: str) -> bool

Returns if a service connector is registered for the given type identifier.

Parameters:

Name Type Description Default
connector_type str

The service connector type identifier.

required

Returns:

Type Description
bool

True if a service connector is registered for the given type

bool

identifier, False otherwise.

Source code in src/zenml/service_connectors/service_connector_registry.py
110
111
112
113
114
115
116
117
118
119
120
121
def is_registered(self, connector_type: str) -> bool:
    """Returns if a service connector is registered for the given type identifier.

    Args:
        connector_type: The service connector type identifier.

    Returns:
        True if a service connector is registered for the given type
        identifier, False otherwise.
    """
    self.register_builtin_service_connectors()
    return connector_type in self.service_connector_types
list_service_connector_types(connector_type: Optional[str] = None, resource_type: Optional[str] = None, auth_method: Optional[str] = None) -> List[ServiceConnectorTypeModel]

Find one or more service connector types that match the given criteria.

Parameters:

Name Type Description Default
connector_type Optional[str]

Filter by service connector type identifier.

None
resource_type Optional[str]

Filter by a resource type that the connector can be used to give access to.

None
auth_method Optional[str]

Filter by an authentication method that the connector uses to authenticate with the resource provider.

None

Returns:

Type Description
List[ServiceConnectorTypeModel]

A list of service connector type models that match the given

List[ServiceConnectorTypeModel]

criteria.

Source code in src/zenml/service_connectors/service_connector_registry.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def list_service_connector_types(
    self,
    connector_type: Optional[str] = None,
    resource_type: Optional[str] = None,
    auth_method: Optional[str] = None,
) -> List[ServiceConnectorTypeModel]:
    """Find one or more service connector types that match the given criteria.

    Args:
        connector_type: Filter by service connector type identifier.
        resource_type: Filter by a resource type that the connector can
            be used to give access to.
        auth_method: Filter by an authentication method that the connector
            uses to authenticate with the resource provider.

    Returns:
        A list of service connector type models that match the given
        criteria.
    """
    self.register_builtin_service_connectors()

    matches: List[ServiceConnectorTypeModel] = []
    for service_connector_type in self.service_connector_types.values():
        if (
            (
                connector_type is None
                or connector_type == service_connector_type.connector_type
            )
            and (
                resource_type is None
                or resource_type
                in service_connector_type.resource_type_dict
            )
            and (
                auth_method is None
                or auth_method in service_connector_type.auth_method_dict
            )
        ):
            matches.append(service_connector_type.model_copy())

    return matches
register_builtin_service_connectors() -> None

Registers the default built-in service connectors.

Source code in src/zenml/service_connectors/service_connector_registry.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def register_builtin_service_connectors(self) -> None:
    """Registers the default built-in service connectors."""
    # Only register built-in service connectors once
    with self.lock:
        if self.initialized:
            return

        self.initialized = True

        try:
            from zenml.integrations.aws.service_connectors.aws_service_connector import (  # noqa
                AWSServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import AWS service connector: {e}.")

        try:
            from zenml.integrations.gcp.service_connectors.gcp_service_connector import (  # noqa
                GCPServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import GCP service connector: {e}.")

        try:
            from zenml.integrations.azure.service_connectors.azure_service_connector import (  # noqa
                AzureServiceConnector,
            )
        except ImportError as e:
            logger.warning(
                f"Could not import Azure service connector: {e}."
            )

        try:
            from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (  # noqa
                KubernetesServiceConnector,
            )
        except ImportError as e:
            logger.warning(
                f"Could not import Kubernetes service connector: {e}."
            )

        try:
            from zenml.service_connectors.docker_service_connector import (  # noqa
                DockerServiceConnector,
            )
        except ImportError as e:
            logger.warning(
                f"Could not import Docker service connector: {e}."
            )

        try:
            from zenml.integrations.hyperai.service_connectors.hyperai_service_connector import (  # noqa
                HyperAIServiceConnector,
            )
        except ImportError as e:
            logger.warning(
                f"Could not import HyperAI service connector: {e}."
            )
register_service_connector_type(service_connector_type: ServiceConnectorTypeModel, overwrite: bool = False) -> None

Registers a service connector type.

Parameters:

Name Type Description Default
service_connector_type ServiceConnectorTypeModel

Service connector type.

required
overwrite bool

Whether to overwrite an existing service connector type.

False
Source code in src/zenml/service_connectors/service_connector_registry.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def register_service_connector_type(
    self,
    service_connector_type: ServiceConnectorTypeModel,
    overwrite: bool = False,
) -> None:
    """Registers a service connector type.

    Args:
        service_connector_type: Service connector type.
        overwrite: Whether to overwrite an existing service connector type.
    """
    with self.lock:
        if (
            service_connector_type.connector_type
            not in self.service_connector_types
            or overwrite
        ):
            self.service_connector_types[
                service_connector_type.connector_type
            ] = service_connector_type
            logger.debug(
                "Registered service connector type "
                f"{service_connector_type.connector_type}."
            )
        else:
            logger.debug(
                f"Found existing service connector for type "
                f"{service_connector_type.connector_type}: Skipping "
                "registration."
            )
Functions

service_connector_utils

Utility methods for Service Connectors.

Classes
Functions
get_resources_options_from_resource_model_for_full_stack(connector_details: Union[UUID, ServiceConnectorInfo]) -> ServiceConnectorResourcesInfo

Get the resource options from the resource model for the full stack.

Parameters:

Name Type Description Default
connector_details Union[UUID, ServiceConnectorInfo]

The service connector details (UUID or Info).

required

Returns:

Type Description
ServiceConnectorResourcesInfo

All available service connector resource options.

Source code in src/zenml/service_connectors/service_connector_utils.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def get_resources_options_from_resource_model_for_full_stack(
    connector_details: Union[UUID, ServiceConnectorInfo],
) -> ServiceConnectorResourcesInfo:
    """Get the resource options from the resource model for the full stack.

    Args:
        connector_details: The service connector details (UUID or Info).

    Returns:
        All available service connector resource options.
    """
    client = Client()
    zen_store = client.zen_store

    if isinstance(connector_details, UUID):
        resource_model = zen_store.verify_service_connector(
            connector_details,
            list_resources=True,
        )
    else:
        resource_model = zen_store.verify_service_connector_config(
            service_connector=ServiceConnectorRequest(
                user=client.active_user.id,
                workspace=client.active_workspace.id,
                name="fake",
                connector_type=connector_details.type,
                auth_method=connector_details.auth_method,
                configuration=connector_details.configuration,
                secrets={},
                labels={},
            ),
            list_resources=True,
        )

    resources = resource_model.resources

    if isinstance(
        resource_model.connector_type,
        str,
    ):
        connector_type = resource_model.connector_type
    else:
        connector_type = resource_model.connector_type.connector_type

    artifact_stores: List[ResourcesInfo] = []
    orchestrators: List[ResourcesInfo] = []
    container_registries: List[ResourcesInfo] = []

    if connector_type == "aws":
        for each in resources:
            if each.resource_ids:
                if each.resource_type == "s3-bucket":
                    artifact_stores.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ARTIFACT_STORE,
                            flavor="s3",
                            required_configuration={"path": "Path"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="S3 Bucket",
                        )
                    )
                if each.resource_type == "aws-generic":
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="sagemaker",
                            required_configuration={
                                "execution_role": "execution role ARN"
                            },
                            flavor_display_name="AWS Sagemaker",
                        )
                    )
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="vm_aws",
                            required_configuration={"region": "region"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="Skypilot (EC2)",
                        )
                    )

                if each.resource_type == "kubernetes-cluster":
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="kubernetes",
                            required_configuration={},
                            flavor_display_name="Kubernetes",
                        )
                    )
                if each.resource_type == "docker-registry":
                    container_registries.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.CONTAINER_REGISTRY,
                            flavor="aws",
                            required_configuration={"uri": "URI"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="ECR",
                        )
                    )

    elif connector_type == "gcp":
        for each in resources:
            if each.resource_ids:
                if each.resource_type == "gcs-bucket":
                    artifact_stores.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ARTIFACT_STORE,
                            flavor="gcp",
                            required_configuration={"path": "Path"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="GCS Bucket",
                        )
                    )
                if each.resource_type == "gcp-generic":
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="vertex",
                            required_configuration={"location": "region name"},
                            flavor_display_name="Vertex AI",
                        )
                    )
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="vm_gcp",
                            required_configuration={"region": "region name"},
                            flavor_display_name="Skypilot (Compute)",
                        )
                    )

                if each.resource_type == "kubernetes-cluster":
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="kubernetes",
                            required_configuration={},
                            flavor_display_name="Kubernetes",
                        )
                    )
                if each.resource_type == "docker-registry":
                    container_registries.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.CONTAINER_REGISTRY,
                            flavor="gcp",
                            required_configuration={"uri": "URI"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="GCR",
                        )
                    )

    elif connector_type == "azure":
        for each in resources:
            if each.resource_ids:
                if each.resource_type == "blob-container":
                    artifact_stores.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ARTIFACT_STORE,
                            flavor="azure",
                            required_configuration={"path": "Path"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="Blob container",
                        )
                    )
                if each.resource_type == "azure-generic":
                    # No native orchestrator ATM
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="vm_azure",
                            required_configuration={"region": "region name"},
                            flavor_display_name="Skypilot (VM)",
                        )
                    )
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="azureml",
                            required_configuration={
                                "subscription_id": "subscription ID",
                                "resource_group": "resource group",
                                "workspace": "workspace",
                            },
                            flavor_display_name="AzureML",
                        )
                    )

                if each.resource_type == "kubernetes-cluster":
                    orchestrators.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.ORCHESTRATOR,
                            flavor="kubernetes",
                            required_configuration={},
                            flavor_display_name="Kubernetes",
                        )
                    )
                if each.resource_type == "docker-registry":
                    container_registries.append(
                        _prepare_resource_info(
                            connector_details=connector_details,
                            resource_ids=each.resource_ids,
                            stack_component_type=StackComponentType.CONTAINER_REGISTRY,
                            flavor="azure",
                            required_configuration={"uri": "URI"},
                            use_resource_value_as_fixed_config=True,
                            flavor_display_name="ACR",
                        )
                    )

    _raise_specific_cloud_exception_if_needed(
        cloud_provider=connector_type,
        artifact_stores=artifact_stores,
        orchestrators=orchestrators,
        container_registries=container_registries,
    )

    return ServiceConnectorResourcesInfo(
        connector_type=connector_type,
        components_resources_info={
            StackComponentType.ARTIFACT_STORE: artifact_stores,
            StackComponentType.ORCHESTRATOR: orchestrators,
            StackComponentType.CONTAINER_REGISTRY: container_registries,
        },
    )