Vault
zenml.integrations.vault
special
Initialization for the Vault Secrets Manager integration.
The Vault secrets manager integration submodule provides a way to access the HashiCorp Vault secrets manager from within your ZenML pipeline runs.
VaultSecretsManagerIntegration (Integration)
Definition of HashiCorp Vault integration with ZenML.
Source code in zenml/integrations/vault/__init__.py
class VaultSecretsManagerIntegration(Integration):
"""Definition of HashiCorp Vault integration with ZenML."""
NAME = VAULT
REQUIREMENTS = ["hvac>=0.11.2"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Vault integration.
Returns:
List of stack component flavors.
"""
from zenml.integrations.vault.flavors import VaultSecretsManagerFlavor
return [VaultSecretsManagerFlavor]
flavors()
classmethod
Declare the stack component flavors for the Vault integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors. |
Source code in zenml/integrations/vault/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Vault integration.
Returns:
List of stack component flavors.
"""
from zenml.integrations.vault.flavors import VaultSecretsManagerFlavor
return [VaultSecretsManagerFlavor]
flavors
special
HashiCorp Vault integration flavors.
vault_secrets_manager_flavor
HashiCorp Vault secrets manager flavor.
VaultSecretsManagerConfig (BaseSecretsManagerConfig)
pydantic-model
Configuration for the Vault Secrets Manager.
Attributes:
Name | Type | Description |
---|---|---|
url |
str |
The url of the Vault server. |
token |
str |
The token to use to authenticate with Vault. |
cert |
Optional[str] |
The path to the certificate to use to authenticate with Vault. |
verify |
Optional[str] |
Whether to verify the certificate or not. |
mount_point |
str |
The mount point of the secrets manager. |
Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
class VaultSecretsManagerConfig(BaseSecretsManagerConfig):
"""Configuration for the Vault Secrets Manager.
Attributes:
url: The url of the Vault server.
token: The token to use to authenticate with Vault.
cert: The path to the certificate to use to authenticate with Vault.
verify: Whether to verify the certificate or not.
mount_point: The mount point of the secrets manager.
"""
SUPPORTS_SCOPING: ClassVar[bool] = True
url: str
token: str
mount_point: str
cert: Optional[str] # TODO: unused
verify: Optional[str] # TODO: unused
@classmethod
def _validate_scope(
cls,
scope: SecretsManagerScope,
namespace: Optional[str],
) -> None:
"""Validate the scope and namespace value.
Args:
scope: Scope value.
namespace: Optional namespace value.
"""
if namespace:
validate_vault_secret_name_or_namespace(namespace)
VaultSecretsManagerFlavor (BaseSecretsManagerFlavor)
Class for the VaultSecretsManagerFlavor
.
Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
class VaultSecretsManagerFlavor(BaseSecretsManagerFlavor):
"""Class for the `VaultSecretsManagerFlavor`."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return VAULT_SECRETS_MANAGER_FLAVOR
@property
def config_class(self) -> Type[VaultSecretsManagerConfig]:
"""Returns `VaultSecretsManagerConfig` config class.
Returns:
The config class.
"""
return VaultSecretsManagerConfig
@property
def implementation_class(self) -> Type["VaultSecretsManager"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.vault.secrets_manager import (
VaultSecretsManager,
)
return VaultSecretsManager
config_class: Type[zenml.integrations.vault.flavors.vault_secrets_manager_flavor.VaultSecretsManagerConfig]
property
readonly
Returns VaultSecretsManagerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.vault.flavors.vault_secrets_manager_flavor.VaultSecretsManagerConfig] |
The config class. |
implementation_class: Type[VaultSecretsManager]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[VaultSecretsManager] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
validate_vault_secret_name_or_namespace(name)
Validate a secret name or namespace.
For compatibility across secret managers the secret names should contain
only alphanumeric characters and the characters /_+=.@-. The /
character is only used internally to delimit scopes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the secret name or namespace |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the secret name or namespace is invalid |
Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
def validate_vault_secret_name_or_namespace(name: str) -> None:
"""Validate a secret name or namespace.
For compatibility across secret managers the secret names should contain
only alphanumeric characters and the characters /_+=.@-. The `/`
character is only used internally to delimit scopes.
Args:
name: the secret name or namespace
Raises:
ValueError: if the secret name or namespace is invalid
"""
if not re.fullmatch(r"[a-zA-Z0-9_+=\.@\-]*", name):
raise ValueError(
f"Invalid secret name or namespace '{name}'. Must contain "
f"only alphanumeric characters and the characters _+=.@-."
)
secrets_manager
special
HashiCorp Vault Secrets Manager.
vault_secrets_manager
Implementation of the HashiCorp Vault Secrets Manager integration.
VaultSecretsManager (BaseSecretsManager)
Class to interact with the Vault secrets manager - Key/value Engine.
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
class VaultSecretsManager(BaseSecretsManager):
"""Class to interact with the Vault secrets manager - Key/value Engine."""
CLIENT: ClassVar[Any] = None
@property
def config(self) -> VaultSecretsManagerConfig:
"""Returns the `VaultSecretsManagerConfig` config.
Returns:
The configuration.
"""
return cast(VaultSecretsManagerConfig, self._config)
@classmethod
def _ensure_client_connected(cls, url: str, token: str) -> None:
"""Ensure the client is connected.
This function initializes the client if it is not initialized.
Args:
url: The url of the Vault server.
token: The token to use to authenticate with Vault.
"""
if cls.CLIENT is None:
# Create a Vault Secrets Manager client
cls.CLIENT = hvac.Client(
url=url,
token=token,
)
def _ensure_client_is_authenticated(self) -> None:
"""Ensure the client is authenticated.
Raises:
RuntimeError: If the client is not initialized or authenticated.
"""
self._ensure_client_connected(
url=self.config.url, token=self.config.token
)
if not self.CLIENT.is_authenticated():
raise RuntimeError(
"There was an error authenticating with Vault. Please check "
"your configuration."
)
else:
pass
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
SecretExistsError: If the secret already exists.
"""
self._ensure_client_is_authenticated()
validate_vault_secret_name_or_namespace(secret.name)
try:
self.get_secret(secret.name)
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already " f"exists."
)
except KeyError:
pass
secret_path = self._get_scoped_secret_name(secret.name)
secret_value = secret_to_dict(secret, encode=False)
self.CLIENT.secrets.kv.v2.create_or_update_secret(
path=secret_path,
mount_point=self.config.mount_point,
secret=secret_value,
)
logger.info("Created secret: %s", f"{secret_path}")
logger.info("Added value to secret.")
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
Args:
secret_name: The name of the secret to get.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist.
"""
self._ensure_client_is_authenticated()
secret_path = self._get_scoped_secret_name(secret_name)
try:
secret_items = (
self.CLIENT.secrets.kv.v2.read_secret_version(
path=secret_path,
mount_point=self.config.mount_point,
)
.get("data", {})
.get("data", {})
)
except InvalidPath as e:
raise KeyError(e)
zenml_schema_name = secret_items.pop(ZENML_SCHEMA_NAME)
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
secret_items["name"] = secret_name
return secret_schema(**secret_items)
def get_all_secret_keys(self) -> List[str]:
"""List all secrets in Vault without any reformatting.
This function tries to get all secrets from Vault and returns
them as a list of strings (all secrets' names).
Returns:
A list of all secrets in the secrets manager.
"""
self._ensure_client_is_authenticated()
set_of_secrets: Set[str] = set()
secret_path = "/".join(self._get_scope_path())
try:
secrets = self.CLIENT.secrets.kv.v2.list_secrets(
path=secret_path, mount_point=self.config.mount_point
)
except hvac.exceptions.InvalidPath:
logger.error(
f"There are no secrets created within the scope `{secret_path}`"
)
return list(set_of_secrets)
secrets_keys = secrets.get("data", {}).get("keys", [])
for secret_key in secrets_keys:
# vault scopes end with / and are not themselves secrets
if "/" not in secret_key:
set_of_secrets.add(secret_key)
return list(set_of_secrets)
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist.
"""
self._ensure_client_is_authenticated()
validate_vault_secret_name_or_namespace(secret.name)
if secret.name in self.get_all_secret_keys():
secret_path = self._get_scoped_secret_name(secret.name)
secret_value = secret_to_dict(secret, encode=False)
self.CLIENT.secrets.kv.v2.create_or_update_secret(
path=secret_path,
mount_point=self.config.mount_point,
secret=secret_value,
)
else:
raise KeyError(
f"A Secret with the name '{secret.name}'" f" does not exist."
)
logger.info("Updated secret: %s", secret_path)
logger.info("Added value to secret.")
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
"""
self._ensure_client_is_authenticated()
secret_path = self._get_scoped_secret_name(secret_name)
self.CLIENT.secrets.kv.v2.delete_metadata_and_all_versions(
path=secret_path,
mount_point=self.config.mount_point,
)
logger.info("Deleted secret: %s", f"{secret_path}")
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_is_authenticated()
for secret_name in self.get_all_secret_keys():
self.delete_secret(secret_name)
logger.info("Deleted all secrets.")
config: VaultSecretsManagerConfig
property
readonly
Returns the VaultSecretsManagerConfig
config.
Returns:
Type | Description |
---|---|
VaultSecretsManagerConfig |
The configuration. |
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_is_authenticated()
for secret_name in self.get_all_secret_keys():
self.delete_secret(secret_name)
logger.info("Deleted all secrets.")
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
"""
self._ensure_client_is_authenticated()
secret_path = self._get_scoped_secret_name(secret_name)
self.CLIENT.secrets.kv.v2.delete_metadata_and_all_versions(
path=secret_path,
mount_point=self.config.mount_point,
)
logger.info("Deleted secret: %s", f"{secret_path}")
get_all_secret_keys(self)
List all secrets in Vault without any reformatting.
This function tries to get all secrets from Vault and returns them as a list of strings (all secrets' names).
Returns:
Type | Description |
---|---|
List[str] |
A list of all secrets in the secrets manager. |
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""List all secrets in Vault without any reformatting.
This function tries to get all secrets from Vault and returns
them as a list of strings (all secrets' names).
Returns:
A list of all secrets in the secrets manager.
"""
self._ensure_client_is_authenticated()
set_of_secrets: Set[str] = set()
secret_path = "/".join(self._get_scope_path())
try:
secrets = self.CLIENT.secrets.kv.v2.list_secrets(
path=secret_path, mount_point=self.config.mount_point
)
except hvac.exceptions.InvalidPath:
logger.error(
f"There are no secrets created within the scope `{secret_path}`"
)
return list(set_of_secrets)
secrets_keys = secrets.get("data", {}).get("keys", [])
for secret_key in secrets_keys:
# vault scopes end with / and are not themselves secrets
if "/" not in secret_key:
set_of_secrets.add(secret_key)
return list(set_of_secrets)
get_secret(self, secret_name)
Gets the value of a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to get. |
required |
Returns:
Type | Description |
---|---|
BaseSecretSchema |
The secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
Args:
secret_name: The name of the secret to get.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist.
"""
self._ensure_client_is_authenticated()
secret_path = self._get_scoped_secret_name(secret_name)
try:
secret_items = (
self.CLIENT.secrets.kv.v2.read_secret_version(
path=secret_path,
mount_point=self.config.mount_point,
)
.get("data", {})
.get("data", {})
)
except InvalidPath as e:
raise KeyError(e)
zenml_schema_name = secret_items.pop(ZENML_SCHEMA_NAME)
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
secret_items["name"] = secret_name
return secret_schema(**secret_items)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Exceptions:
Type | Description |
---|---|
SecretExistsError |
If the secret already exists. |
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
SecretExistsError: If the secret already exists.
"""
self._ensure_client_is_authenticated()
validate_vault_secret_name_or_namespace(secret.name)
try:
self.get_secret(secret.name)
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already " f"exists."
)
except KeyError:
pass
secret_path = self._get_scoped_secret_name(secret.name)
secret_value = secret_to_dict(secret, encode=False)
self.CLIENT.secrets.kv.v2.create_or_update_secret(
path=secret_path,
mount_point=self.config.mount_point,
secret=secret_value,
)
logger.info("Created secret: %s", f"{secret_path}")
logger.info("Added value to secret.")
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist.
"""
self._ensure_client_is_authenticated()
validate_vault_secret_name_or_namespace(secret.name)
if secret.name in self.get_all_secret_keys():
secret_path = self._get_scoped_secret_name(secret.name)
secret_value = secret_to_dict(secret, encode=False)
self.CLIENT.secrets.kv.v2.create_or_update_secret(
path=secret_path,
mount_point=self.config.mount_point,
secret=secret_value,
)
else:
raise KeyError(
f"A Secret with the name '{secret.name}'" f" does not exist."
)
logger.info("Updated secret: %s", secret_path)
logger.info("Added value to secret.")