Secrets Managers
zenml.secrets_managers
special
Initialization for the ZenML secrets manager module.
base_secrets_manager
Base class for ZenML secrets managers.
BaseSecretsManager (StackComponent, ABC)
pydantic-model
Base class for all ZenML secrets managers.
Attributes:
Name | Type | Description |
---|---|---|
scope |
SecretsManagerScope |
The Secrets Manager scope determines how secrets are visible and shared across different Secrets Manager instances:
|
namespace |
Optional[str] |
Optional namespace to use with a |
Source code in zenml/secrets_managers/base_secrets_manager.py
class BaseSecretsManager(StackComponent, ABC):
"""Base class for all ZenML secrets managers.
Attributes:
scope: The Secrets Manager scope determines how secrets are visible and
shared across different Secrets Manager instances:
* global: secrets are shared across all Secrets Manager instances
that connect to the same backend and have a global scope.
* component: secrets are not shared outside this Secrets Manager
instance.
* namespace: secrets are shared only by Secrets Manager instances
that connect to the same backend *and* have the same `namespace`
value configured.
* none: only used to preserve backwards compatibility with
old Secrets Manager instances that do not yet understand the
concept of secrets scoping. Will be deprecated and removed in
future versions.
namespace: Optional namespace to use with a `namespace` scoped Secrets
Manager.
"""
# Class configuration
TYPE: ClassVar[StackComponentType] = StackComponentType.SECRETS_MANAGER
FLAVOR: ClassVar[str]
SUPPORTS_SCOPING: ClassVar[bool] = False
scope: SecretsManagerScope = SecretsManagerScope.COMPONENT
namespace: Optional[str] = None
def __init__(self, **kwargs: Any) -> None:
"""Ensures that no attributes are specified as a secret reference.
Args:
**kwargs: Arguments to initialize this secrets manager.
Raises:
ValueError: If any of the secrets manager attributes are specified
as a secret reference.
"""
for key, value in kwargs.items():
if secret_utils.is_secret_reference(value):
raise ValueError(
"Using secret references to specify attributes on a "
"secrets manager is not allowed. Please specify the "
f"real value for the attribute {key}."
)
super().__init__(**kwargs)
@root_validator(pre=True)
def scope_initializer(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pydantic root_validator for the scope.
This root validator is used for backwards compatibility purposes. It
ensures that existing Secrets Managers continue to use no scope for
secrets while new instances default to using the component scope.
Args:
values: Values passed to the object constructor
Returns:
Values passed to the object constructor
Raises:
ValueError: If the scope value is not valid.
"""
scope = values.get("scope")
if scope:
# fail if the user tries to explicitly use a scope with a
# Secrets Manager that doesn't support scoping
if scope != SecretsManagerScope.NONE and not cls.SUPPORTS_SCOPING:
raise ValueError(
f"The {cls.FLAVOR} Secrets Manager does not support "
f"scoping. You can only use a `none` scope value."
)
elif not cls.SUPPORTS_SCOPING:
# disable scoping by default for Secrets Managers that don't
# support scoping
values["scope"] = SecretsManagerScope.NONE
elif "uuid" in values:
# this is an existing Secrets Manager instance without a scope
# explicitly set (i.e. a legacy Secrets Manager that was already
# in operation before scoping was introduced). Continue to use
# unscoped secrets.
values["scope"] = SecretsManagerScope.NONE
# warn if the user tries to explicitly disable scoping for a
# Secrets Manager that does support scoping
if scope == SecretsManagerScope.NONE and cls.SUPPORTS_SCOPING:
logger.warning(
f"Unscoped support for the {cls.FLAVOR} Secrets "
f"Manager is deprecated and will be removed in a future "
f"release. You should use the `global` scope instead."
)
return values
@root_validator
def namespace_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pydantic root validator for the namespace.
For a namespace scoped Secret Manager, the namespace is required.
Args:
values: Values passed to the object constructor
Returns:
Values passed to the object constructor
Raises:
ValueError: If a namespace is not configured for a namespace scoped
Secret Manager.
"""
scope = cast(SecretsManagerScope, values.get("scope"))
if scope == SecretsManagerScope.NAMESPACE and not values.get(
"namespace"
):
raise ValueError(
"A `namespace` value is required for a namespace scoped "
"Secrets Manager."
)
cls._validate_scope(scope, values.get("namespace"))
return values
@classmethod
def _validate_scope(
cls,
scope: SecretsManagerScope,
namespace: Optional[str],
) -> None:
"""Validate the scope and namespace value.
Subclasses should override this method to implement their own scope
and namespace validation logic (e.g. raise an exception if a scope is
not supported or if a namespace has an invalid value).
Args:
scope: Scope value.
namespace: Namespace value.
"""
...
def _get_scope_path(self) -> List[str]:
"""Get the secret path for the current scope.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path.
Returns:
the secret scope path
"""
if self.scope == SecretsManagerScope.NONE:
return []
path = [ZENML_SCOPE_PATH_PREFIX, self.scope]
if self.scope == SecretsManagerScope.COMPONENT:
path.append(str(self.uuid))
elif self.scope == SecretsManagerScope.NAMESPACE and self.namespace:
path.append(self.namespace)
return path
def _get_scoped_secret_path(self, secret_name: str) -> List[str]:
"""Convert a ZenML secret name into a scoped secret path.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path.
Args:
secret_name: the name of the secret
Returns:
The scoped secret path
"""
path = self._get_scope_path()
path.append(secret_name)
return path
def _get_secret_name_from_path(
self, secret_path: List[str]
) -> Optional[str]:
"""Extract the name of a ZenML secret from a scoped secret path.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path.
Args:
secret_path: the full scoped secret path including the secret name
Returns:
The ZenML secret name or None, if the input secret path does not
belong to the current scope.
"""
if not len(secret_path):
return None
secret_name = secret_path[-1]
secret_path = secret_path[:-1]
if not secret_name or secret_path != self._get_scope_path():
# secret name is not in the current scope
return None
return secret_name
def _get_scoped_secret_name_prefix(
self,
separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
) -> str:
"""Convert the ZenML secret scope into a scoped secret name prefix.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path to compose a scoped
secret name prefix from the scope path.
Args:
separator: the path separator to use when constructing a scoped
secret prefix from a scope path
Returns:
The scoped secret name prefix
"""
return separator.join(self._get_scope_path()) + separator
def _get_scoped_secret_name(
self,
name: str,
separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
) -> str:
"""Convert a ZenML secret name into a scoped secret name.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path to compose a scoped
secret name that includes the scope path from a ZenML secret name.
Args:
name: the name of the secret
separator: the path separator to use when constructing a scoped
secret name from a scope path
Returns:
The scoped secret name
"""
return separator.join(self._get_scoped_secret_path(name))
def _get_unscoped_secret_name(
self,
name: str,
separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
) -> Optional[str]:
"""Extract the name of a ZenML secret from a scoped secret name.
This utility method can be used with Secrets Managers that can map
the concept of a scope to a hierarchical path to extract the original
ZenML secret name from a scoped secret name that includes the scope
path.
Args:
name: the name of the scoped secret
separator: the path separator to use when constructing a scoped
secret name from a scope path
Returns:
The ZenML secret name or None, if the input secret name does not
belong to the current scope.
"""
return self._get_secret_name_from_path(name.split(separator))
def _get_secret_scope_metadata(
self, secret_name: Optional[str] = None
) -> Dict[str, str]:
"""Get a dictionary with metadata uniquely identifying one or more scoped secrets.
This utility method can be used with Secrets Managers that can
associate metadata (e.g. tags, labels) with a secret. The scope related
metadata can be used as a filter criteria when running queries against
the backend to retrieve all the secrets within the current scope or
to retrieve a named secret within the current scope (if the
`secret_name` is supplied).
Args:
secret_name: Optional secret name for which to get the scope
metadata.
Returns:
Dictionary with scope metadata information uniquely identifying the
secret.
"""
if self.scope == SecretsManagerScope.NONE:
# unscoped secrets do not have tags, for backwards compatibility
# purposes
return {}
metadata = {
"zenml_scope": self.scope.value,
}
if secret_name:
metadata[ZENML_SECRET_NAME_LABEL] = secret_name
if self.scope == SecretsManagerScope.NAMESPACE and self.namespace:
metadata["zenml_namespace"] = self.namespace
if self.scope == SecretsManagerScope.COMPONENT:
metadata["zenml_component_uuid"] = str(self.uuid)
return metadata
def _get_secret_metadata(self, secret: BaseSecretSchema) -> Dict[str, str]:
"""Get a dictionary with metadata describing a secret.
This is utility method can be used with Secrets Managers that can
associate metadata (e.g. tags, labels) with a secret. The metadata
can be used to relay more information about the secret in the Secrets
Manager backend. Part of the metadata can also be used as a filter
criteria when running queries against the backend to retrieve all the
secrets within the current scope or to retrieve a named secret (see
`_get_secret_scope_metadata`).
Args:
secret: The secret for which to get the metadata.
Returns:
Dictionary with metadata information describing the secret.
"""
if self.scope == SecretsManagerScope.NONE:
# unscoped secrets do not have tags, for backwards compatibility
# purposes
return {}
scope_metadata = self._get_secret_scope_metadata(secret.name)
# include additional metadata not necessarily related to the secret
# scope
scope_metadata.update(
{
"zenml_component_name": self.name,
"zenml_component_uuid": str(self.uuid),
"zenml_secret_schema": secret.TYPE,
}
)
return scope_metadata
@abstractmethod
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
The implementation should throw a `SecretExistsError` exception if the
secret already exists.
Args:
secret: The secret to register.
"""
@abstractmethod
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret_name: The name of the secret to get.
"""
@abstractmethod
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys."""
@abstractmethod
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret: The secret to update.
"""
@abstractmethod
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret_name: The name of the secret to delete.
"""
@abstractmethod
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
__init__(self, **kwargs)
special
Ensures that no attributes are specified as a secret reference.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Arguments to initialize this secrets manager. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If any of the secrets manager attributes are specified as a secret reference. |
Source code in zenml/secrets_managers/base_secrets_manager.py
def __init__(self, **kwargs: Any) -> None:
"""Ensures that no attributes are specified as a secret reference.
Args:
**kwargs: Arguments to initialize this secrets manager.
Raises:
ValueError: If any of the secrets manager attributes are specified
as a secret reference.
"""
for key, value in kwargs.items():
if secret_utils.is_secret_reference(value):
raise ValueError(
"Using secret references to specify attributes on a "
"secrets manager is not allowed. Please specify the "
f"real value for the attribute {key}."
)
super().__init__(**kwargs)
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
delete_secret(self, secret_name)
Delete an existing secret.
The implementation should throw a KeyError
exception if the
secret doesn't exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret_name: The name of the secret to delete.
"""
get_all_secret_keys(self)
Get all secret keys.
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys."""
get_secret(self, secret_name)
Gets the value of a secret.
The implementation should throw a KeyError
exception if the
secret doesn't exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to get. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret_name: The name of the secret to get.
"""
namespace_validator(values)
classmethod
Pydantic root validator for the namespace.
For a namespace scoped Secret Manager, the namespace is required.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
Values passed to the object constructor |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Values passed to the object constructor |
Exceptions:
Type | Description |
---|---|
ValueError |
If a namespace is not configured for a namespace scoped Secret Manager. |
Source code in zenml/secrets_managers/base_secrets_manager.py
@root_validator
def namespace_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pydantic root validator for the namespace.
For a namespace scoped Secret Manager, the namespace is required.
Args:
values: Values passed to the object constructor
Returns:
Values passed to the object constructor
Raises:
ValueError: If a namespace is not configured for a namespace scoped
Secret Manager.
"""
scope = cast(SecretsManagerScope, values.get("scope"))
if scope == SecretsManagerScope.NAMESPACE and not values.get(
"namespace"
):
raise ValueError(
"A `namespace` value is required for a namespace scoped "
"Secrets Manager."
)
cls._validate_scope(scope, values.get("namespace"))
return values
register_secret(self, secret)
Registers a new secret.
The implementation should throw a SecretExistsError
exception if the
secret already exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
The implementation should throw a `SecretExistsError` exception if the
secret already exists.
Args:
secret: The secret to register.
"""
scope_initializer(values)
classmethod
Pydantic root_validator for the scope.
This root validator is used for backwards compatibility purposes. It ensures that existing Secrets Managers continue to use no scope for secrets while new instances default to using the component scope.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
Values passed to the object constructor |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Values passed to the object constructor |
Exceptions:
Type | Description |
---|---|
ValueError |
If the scope value is not valid. |
Source code in zenml/secrets_managers/base_secrets_manager.py
@root_validator(pre=True)
def scope_initializer(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Pydantic root_validator for the scope.
This root validator is used for backwards compatibility purposes. It
ensures that existing Secrets Managers continue to use no scope for
secrets while new instances default to using the component scope.
Args:
values: Values passed to the object constructor
Returns:
Values passed to the object constructor
Raises:
ValueError: If the scope value is not valid.
"""
scope = values.get("scope")
if scope:
# fail if the user tries to explicitly use a scope with a
# Secrets Manager that doesn't support scoping
if scope != SecretsManagerScope.NONE and not cls.SUPPORTS_SCOPING:
raise ValueError(
f"The {cls.FLAVOR} Secrets Manager does not support "
f"scoping. You can only use a `none` scope value."
)
elif not cls.SUPPORTS_SCOPING:
# disable scoping by default for Secrets Managers that don't
# support scoping
values["scope"] = SecretsManagerScope.NONE
elif "uuid" in values:
# this is an existing Secrets Manager instance without a scope
# explicitly set (i.e. a legacy Secrets Manager that was already
# in operation before scoping was introduced). Continue to use
# unscoped secrets.
values["scope"] = SecretsManagerScope.NONE
# warn if the user tries to explicitly disable scoping for a
# Secrets Manager that does support scoping
if scope == SecretsManagerScope.NONE and cls.SUPPORTS_SCOPING:
logger.warning(
f"Unscoped support for the {cls.FLAVOR} Secrets "
f"Manager is deprecated and will be removed in a future "
f"release. You should use the `global` scope instead."
)
return values
update_secret(self, secret)
Update an existing secret.
The implementation should throw a KeyError
exception if the
secret doesn't exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
The implementation should throw a `KeyError` exception if the
secret doesn't exist.
Args:
secret: The secret to update.
"""
SecretsManagerScope (StrEnum)
Secrets Manager scope enum.
Source code in zenml/secrets_managers/base_secrets_manager.py
class SecretsManagerScope(StrEnum):
"""Secrets Manager scope enum."""
NONE = "none"
GLOBAL = "global"
COMPONENT = "component"
NAMESPACE = "namespace"
local
special
Initialization of the ZenML local secrets manager.
local_secrets_manager
Implementation of the ZenML local secrets manager.
LocalSecretsManager (BaseSecretsManager)
pydantic-model
Class for ZenML local file-based secret manager.
Source code in zenml/secrets_managers/local/local_secrets_manager.py
class LocalSecretsManager(BaseSecretsManager):
"""Class for ZenML local file-based secret manager."""
secrets_file: str = ""
# Class configuration
FLAVOR: ClassVar[str] = "local"
@root_validator(skip_on_failure=True)
def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets the secrets_file attribute value according to the component UUID.
Args:
values: The values to validate.
Returns:
The validated values.
"""
if values.get("secrets_file"):
return values
# not likely to happen, due to Pydantic validation, but mypy complains
assert "uuid" in values
values["secrets_file"] = cls.get_secret_store_path(values["uuid"])
return values
@staticmethod
def get_secret_store_path(uuid: uuid.UUID) -> str:
"""Get the path to the secret store.
Args:
uuid: The UUID of the secret store.
Returns:
The path to the secret store.
"""
return os.path.join(
get_global_config_directory(),
LOCAL_STORES_DIRECTORY_NAME,
str(uuid),
LOCAL_SECRETS_FILENAME,
)
@property
def local_path(self) -> str:
"""Path to the local directory where the secrets are stored.
Returns:
The path to the local directory where the secrets are stored.
"""
return str(Path(self.secrets_file).parent)
def _create_secrets_file__if_not_exists(self) -> None:
"""Makes sure the secrets yaml file exists."""
create_file_if_not_exists(self.secrets_file)
def _verify_secret_key_exists(self, secret_name: str) -> bool:
"""Checks if a secret key exists.
Args:
secret_name: The name of the secret key.
Returns:
True if the secret key exists, False otherwise.
"""
self._create_secrets_file__if_not_exists()
secrets_store_items = yaml_utils.read_yaml(self.secrets_file)
try:
return secret_name in secrets_store_items
except TypeError:
return False
def _get_all_secrets(self) -> Dict[str, Dict[str, str]]:
"""Gets all secrets.
Returns:
A dictionary containing all secrets.
"""
self._create_secrets_file__if_not_exists()
return yaml_utils.read_yaml(self.secrets_file) or {}
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
SecretExistsError: If the secret already exists.
"""
self._create_secrets_file__if_not_exists()
if self._verify_secret_key_exists(secret_name=secret.name):
raise SecretExistsError(f"Secret `{secret.name}` already exists.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a specific secret.
Args:
secret_name: The name of the secret.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
secret_store_items = self._get_all_secrets()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secret_dict = secret_store_items[secret_name]
decoded_secret_dict, zenml_schema_name = decode_secret_dict(secret_dict)
decoded_secret_dict["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**decoded_secret_dict)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys.
"""
self._create_secrets_file__if_not_exists()
secrets_store_items = self._get_all_secrets()
return list(secrets_store_items.keys())
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` did not exist.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secrets_store_items = self._get_all_secrets()
try:
secrets_store_items.pop(secret_name)
yaml_utils.write_yaml(self.secrets_file, secrets_store_items)
except KeyError:
error(f"Secret {secret_name} does not exist.")
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._create_secrets_file__if_not_exists()
remove(self.secrets_file)
local_path: str
property
readonly
Path to the local directory where the secrets are stored.
Returns:
Type | Description |
---|---|
str |
The path to the local directory where the secrets are stored. |
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._create_secrets_file__if_not_exists()
remove(self.secrets_file)
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secrets_store_items = self._get_all_secrets()
try:
secrets_store_items.pop(secret_name)
yaml_utils.write_yaml(self.secrets_file, secrets_store_items)
except KeyError:
error(f"Secret {secret_name} does not exist.")
get_all_secret_keys(self)
Get all secret keys.
Returns:
Type | Description |
---|---|
List[str] |
A list of all secret keys. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys.
"""
self._create_secrets_file__if_not_exists()
secrets_store_items = self._get_all_secrets()
return list(secrets_store_items.keys())
get_secret(self, secret_name)
Gets a specific secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret. |
required |
Returns:
Type | Description |
---|---|
BaseSecretSchema |
The secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a specific secret.
Args:
secret_name: The name of the secret.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
secret_store_items = self._get_all_secrets()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secret_dict = secret_store_items[secret_name]
decoded_secret_dict, zenml_schema_name = decode_secret_dict(secret_dict)
decoded_secret_dict["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**decoded_secret_dict)
get_secret_store_path(uuid)
staticmethod
Get the path to the secret store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
The UUID of the secret store. |
required |
Returns:
Type | Description |
---|---|
str |
The path to the secret store. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
@staticmethod
def get_secret_store_path(uuid: uuid.UUID) -> str:
"""Get the path to the secret store.
Args:
uuid: The UUID of the secret store.
Returns:
The path to the secret store.
"""
return os.path.join(
get_global_config_directory(),
LOCAL_STORES_DIRECTORY_NAME,
str(uuid),
LOCAL_SECRETS_FILENAME,
)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Exceptions:
Type | Description |
---|---|
SecretExistsError |
If the secret already exists. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
SecretExistsError: If the secret already exists.
"""
self._create_secrets_file__if_not_exists()
if self._verify_secret_key_exists(secret_name=secret.name):
raise SecretExistsError(f"Secret `{secret.name}` already exists.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
set_secrets_file(values)
classmethod
Sets the secrets_file attribute value according to the component UUID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
The values to validate. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The validated values. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
@root_validator(skip_on_failure=True)
def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets the secrets_file attribute value according to the component UUID.
Args:
values: The values to validate.
Returns:
The validated values.
"""
if values.get("secrets_file"):
return values
# not likely to happen, due to Pydantic validation, but mypy complains
assert "uuid" in values
values["secrets_file"] = cls.get_secret_store_path(values["uuid"])
return values
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist.
"""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` did not exist.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
utils
Utility functions for the ZenML secrets manager module.
decode_secret_dict(secret_dict)
Base64 decode a Secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_dict |
Dict[str, str] |
dict containing key-value pairs to decode |
required |
Returns:
Type | Description |
---|---|
Tuple[Dict[str, str], str] |
Decoded secret Dict containing key-value pairs |
Source code in zenml/secrets_managers/utils.py
def decode_secret_dict(
secret_dict: Dict[str, str]
) -> Tuple[Dict[str, str], str]:
"""Base64 decode a Secret.
Args:
secret_dict: dict containing key-value pairs to decode
Returns:
Decoded secret Dict containing key-value pairs
"""
zenml_schema_name = secret_dict.pop(ZENML_SCHEMA_NAME)
decoded_secret = {k: decode_string(v) for k, v in secret_dict.items()}
return decoded_secret, zenml_schema_name
decode_string(string)
Base64 decode a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
string |
str |
String to decode |
required |
Returns:
Type | Description |
---|---|
str |
Decoded string |
Source code in zenml/secrets_managers/utils.py
def decode_string(string: str) -> str:
"""Base64 decode a string.
Args:
string: String to decode
Returns:
Decoded string
"""
decoded_bytes = base64.b64decode(string)
return str(decoded_bytes, "utf-8")
encode_secret(secret)
Base64 encode all values within a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
Secret containing key-value pairs |
required |
Returns:
Type | Description |
---|---|
Dict[str, str] |
Encoded secret Dict containing key-value pairs |
Source code in zenml/secrets_managers/utils.py
def encode_secret(secret: BaseSecretSchema) -> Dict[str, str]:
"""Base64 encode all values within a secret.
Args:
secret: Secret containing key-value pairs
Returns:
Encoded secret Dict containing key-value pairs
"""
encoded_secret = {
k: encode_string(str(v))
for k, v in secret.content.items()
if v is not None
}
encoded_secret[ZENML_SCHEMA_NAME] = secret.TYPE
return encoded_secret
encode_string(string)
Base64 encode a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
string |
str |
String to encode |
required |
Returns:
Type | Description |
---|---|
str |
Encoded string |
Source code in zenml/secrets_managers/utils.py
def encode_string(string: str) -> str:
"""Base64 encode a string.
Args:
string: String to encode
Returns:
Encoded string
"""
encoded_bytes = base64.b64encode(string.encode("utf-8"))
return str(encoded_bytes, "utf-8")
secret_from_dict(secret_dict, secret_name='', decode=False)
Converts a dictionary secret representation into a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_dict |
Dict[str, Any] |
a dictionary representation of a secret |
required |
secret_name |
str |
optional name for the secret, defaults to empty string |
'' |
decode |
bool |
if true, decodes the secret values using base64 |
False |
Returns:
Type | Description |
---|---|
BaseSecretSchema |
A secret instance containing all key-value pairs loaded from the JSON representation and of the ZenML schema type indicated in the JSON. |
Source code in zenml/secrets_managers/utils.py
def secret_from_dict(
secret_dict: Dict[str, Any], secret_name: str = "", decode: bool = False
) -> BaseSecretSchema:
"""Converts a dictionary secret representation into a secret.
Args:
secret_dict: a dictionary representation of a secret
secret_name: optional name for the secret, defaults to empty string
decode: if true, decodes the secret values using base64
Returns:
A secret instance containing all key-value pairs loaded from the JSON
representation and of the ZenML schema type indicated in the JSON.
"""
from zenml.secret.secret_schema_class_registry import (
SecretSchemaClassRegistry,
)
secret_contents = secret_dict.copy()
if decode:
secret_contents, zenml_schema_name = decode_secret_dict(secret_contents)
else:
zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
secret_to_dict(secret, encode=False)
Converts a secret to a dict representation with the schema.
This includes the schema type in the secret's JSON representation, so that the correct SecretSchema can be retrieved when the secret is loaded.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
a subclass of the BaseSecretSchema class |
required |
encode |
bool |
if true, encodes the secret values using base64 encoding |
False |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dict representation containing all key-value pairs and the ZenML schema type. |
Source code in zenml/secrets_managers/utils.py
def secret_to_dict(
secret: BaseSecretSchema, encode: bool = False
) -> Dict[str, Any]:
"""Converts a secret to a dict representation with the schema.
This includes the schema type in the secret's JSON representation, so that
the correct SecretSchema can be retrieved when the secret is loaded.
Args:
secret: a subclass of the BaseSecretSchema class
encode: if true, encodes the secret values using base64 encoding
Returns:
A dict representation containing all key-value pairs and the ZenML
schema type.
"""
if encode:
secret_contents = encode_secret(secret)
else:
secret_contents = secret.content
secret_contents[ZENML_SCHEMA_NAME] = secret.TYPE
return secret_contents