Skip to content

Secrets Managers

zenml.secrets_managers special

Initialization for the ZenML secrets manager module.

base_secrets_manager

Base class for ZenML secrets managers.

BaseSecretsManager (StackComponent, ABC) pydantic-model

Base class for all ZenML secrets managers.

Attributes:

Name Type Description
scope SecretsManagerScope

The Secrets Manager scope determines how secrets are visible and shared across different Secrets Manager instances:

  • global: secrets are shared across all Secrets Manager instances that connect to the same backend and have a global scope.
  • component: secrets are not shared outside this Secrets Manager instance.
  • namespace: secrets are shared only by Secrets Manager instances that connect to the same backend and have the same namespace value configured.
  • none: only used to preserve backwards compatibility with old Secrets Manager instances that do not yet understand the concept of secrets scoping. Will be deprecated and removed in future versions.
namespace Optional[str]

Optional namespace to use with a namespace scoped Secrets Manager.

Source code in zenml/secrets_managers/base_secrets_manager.py
class BaseSecretsManager(StackComponent, ABC):
    """Abstract base that every ZenML secrets manager derives from.

    Attributes:
        scope: Controls how secrets are visible and shared between Secrets
            Manager instances:

            * global: secrets are shared by all Secrets Manager instances
            that connect to the same backend and use the global scope.
            * component: secrets stay private to this Secrets Manager
            instance.
            * namespace: secrets are shared only between Secrets Manager
            instances that connect to the same backend *and* are configured
            with the same `namespace` value.
            * none: kept only for backwards compatibility with Secrets
            Manager instances that predate secrets scoping. Will be
            deprecated and removed in future versions.
        namespace: Optional namespace to use with a `namespace` scoped Secrets
            Manager.
    """

    # Class configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.SECRETS_MANAGER
    FLAVOR: ClassVar[str]
    SUPPORTS_SCOPING: ClassVar[bool] = False

    scope: SecretsManagerScope = SecretsManagerScope.COMPONENT
    namespace: Optional[str] = None

    @root_validator(pre=True)
    def scope_initializer(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the effective scope before field validation runs.

        Exists for backwards compatibility: Secrets Managers that were
        registered before scoping was introduced keep using unscoped secrets,
        while newly created ones fall back to the field default.

        Args:
            values: Values passed to the object constructor

        Returns:
            Values passed to the object constructor

        Raises:
            ValueError: If the scope value is not valid.
        """
        requested_scope = values.get("scope")
        scoping_supported = cls.SUPPORTS_SCOPING

        if not requested_scope:
            # No scope was supplied. Scoping stays disabled for flavors that
            # cannot scope secrets, and for instances that already carry a
            # UUID (legacy instances created before scoping existed).
            if not scoping_supported or "uuid" in values:
                values["scope"] = SecretsManagerScope.NONE
            return values

        unscoped_requested = requested_scope == SecretsManagerScope.NONE

        if not scoping_supported:
            # an explicit scope other than `none` cannot be honored by a
            # flavor without scoping support
            if not unscoped_requested:
                raise ValueError(
                    f"The {cls.FLAVOR} Secrets Manager does not support "
                    f"scoping. You can only use a `none` scope value."
                )
        elif unscoped_requested:
            # scoping is available but was explicitly switched off
            logger.warning(
                f"Unscoped support for the {cls.FLAVOR} Secrets "
                f"Manager is deprecated and will be removed in a future "
                f"release. You should use the `global` scope instead."
            )

        return values

    @root_validator
    def namespace_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Check that a namespace scoped Secrets Manager has a namespace.

        After the built-in check, the scope and namespace are handed to
        `_validate_scope` so that subclasses can apply their own rules.

        Args:
            values: Values passed to the object constructor

        Returns:
            Values passed to the object constructor

        Raises:
            ValueError: If a namespace is not configured for a namespace scoped
                Secrets Manager.
        """
        configured_scope = cast(SecretsManagerScope, values.get("scope"))
        configured_namespace = values.get("namespace")

        namespace_missing = not configured_namespace
        if configured_scope == SecretsManagerScope.NAMESPACE and (
            namespace_missing
        ):
            raise ValueError(
                "A `namespace` value is required for a namespace scoped "
                "Secrets Manager."
            )

        cls._validate_scope(configured_scope, configured_namespace)
        return values

    @classmethod
    def _validate_scope(
        cls,
        scope: SecretsManagerScope,
        namespace: Optional[str],
    ) -> None:
        """Hook for flavor-specific scope and namespace validation.

        The base implementation accepts everything. Subclasses should override
        it to raise an exception when a scope is not supported or a namespace
        has an invalid value.

        Args:
            scope: Scope value.
            namespace: Namespace value.
        """
        return None

    def _get_scope_path(self) -> List[str]:
        """Build the hierarchical path that represents the current scope.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path.

        Returns:
            The scope path segments; empty for unscoped Secrets Managers.
        """
        current_scope = self.scope
        if current_scope == SecretsManagerScope.NONE:
            return []

        segments = [ZENML_SCOPE_PATH_PREFIX, current_scope]
        if current_scope == SecretsManagerScope.COMPONENT:
            # component scoped secrets are keyed by the component UUID
            segments.append(str(self.uuid))
        elif current_scope == SecretsManagerScope.NAMESPACE and self.namespace:
            segments.append(self.namespace)

        return segments

    def _get_scoped_secret_path(self, secret_name: str) -> List[str]:
        """Build the full scoped path of a ZenML secret.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path.

        Args:
            secret_name: the name of the secret

        Returns:
            The scope path followed by the secret name
        """
        return [*self._get_scope_path(), secret_name]

    def _get_secret_name_from_path(
        self, secret_path: List[str]
    ) -> Optional[str]:
        """Recover a ZenML secret name from a scoped secret path.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path.

        Args:
            secret_path: the full scoped secret path including the secret name

        Returns:
            The ZenML secret name or None, if the input secret path does not
            belong to the current scope.
        """
        if not secret_path:
            return None

        candidate_name = secret_path[-1]
        scope_segments = secret_path[:-1]

        if candidate_name and scope_segments == self._get_scope_path():
            return candidate_name

        # empty name, or the path points into a different scope
        return None

    def _get_scoped_secret_name_prefix(
        self,
        separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
    ) -> str:
        """Render the current scope as a secret name prefix.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path. The scope path segments are joined with the
        separator and one trailing separator is appended.

        Args:
            separator: the path separator to use when constructing a scoped
                secret prefix from a scope path

        Returns:
            The scoped secret name prefix
        """
        joined_scope = separator.join(self._get_scope_path())
        return f"{joined_scope}{separator}"

    def _get_scoped_secret_name(
        self,
        name: str,
        separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
    ) -> str:
        """Render a ZenML secret name as a scoped secret name.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path. The result is the scoped secret path joined with
        the separator.

        Args:
            name: the name of the secret
            separator: the path separator to use when constructing a scoped
                secret name from a scope path

        Returns:
            The scoped secret name
        """
        scoped_path = self._get_scoped_secret_path(name)
        return separator.join(scoped_path)

    def _get_unscoped_secret_name(
        self,
        name: str,
        separator: str = ZENML_DEFAULT_SECRET_SCOPE_PATH_SEPARATOR,
    ) -> Optional[str]:
        """Recover a ZenML secret name from a scoped secret name.

        Intended for Secrets Managers that can map the concept of a scope to
        a hierarchical path. The scoped name is split on the separator and
        checked against the current scope path.

        Args:
            name: the name of the scoped secret
            separator: the path separator to use when constructing a scoped
                secret name from a scope path

        Returns:
            The ZenML secret name or None, if the input secret name does not
            belong to the current scope.
        """
        path_segments = name.split(separator)
        return self._get_secret_name_from_path(path_segments)

    def _get_secret_scope_metadata(
        self, secret_name: Optional[str] = None
    ) -> Dict[str, str]:
        """Build the metadata that identifies secrets in the current scope.

        Intended for Secrets Managers that can associate metadata (e.g. tags,
        labels) with a secret. The result can be used as a filter when
        querying the backend for all secrets within the current scope, or for
        a single named secret when `secret_name` is supplied.

        Args:
            secret_name: Optional secret name for which to get the scope
                metadata.

        Returns:
            Dictionary with scope metadata information uniquely identifying the
            secret.
        """
        current_scope = self.scope
        if current_scope == SecretsManagerScope.NONE:
            # unscoped secrets carry no tags, for backwards compatibility
            return {}

        scope_tags = {"zenml_scope": current_scope.value}
        if secret_name:
            scope_tags[ZENML_SECRET_NAME_LABEL] = secret_name

        if current_scope == SecretsManagerScope.NAMESPACE:
            if self.namespace:
                scope_tags["zenml_namespace"] = self.namespace
        elif current_scope == SecretsManagerScope.COMPONENT:
            scope_tags["zenml_component_uuid"] = str(self.uuid)

        return scope_tags

    def _get_secret_metadata(self, secret: BaseSecretSchema) -> Dict[str, str]:
        """Build the metadata that describes a secret.

        This utility method can be used with Secrets Managers that can
        associate metadata (e.g. tags, labels) with a secret. It extends the
        scope metadata (see `_get_secret_scope_metadata`) with the component
        name, the component UUID and the secret schema type.

        Args:
            secret: The secret for which to get the metadata.

        Returns:
            Dictionary with metadata information describing the secret.
        """
        if self.scope == SecretsManagerScope.NONE:
            # unscoped secrets carry no tags, for backwards compatibility
            return {}

        described = self._get_secret_scope_metadata(secret.name)
        # extra entries that are not needed to identify the scope
        described["zenml_component_name"] = self.name
        described["zenml_component_uuid"] = str(self.uuid)
        described["zenml_secret_schema"] = secret.TYPE

        return described

    @abstractmethod
    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Store a secret that is not registered yet.

        Implementations should raise a `SecretExistsError` exception if the
        secret already exists.

        Args:
            secret: The secret to register.
        """

    @abstractmethod
    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Fetch a secret by name.

        Implementations should raise a `KeyError` exception if the secret
        doesn't exist.

        Args:
            secret_name: The name of the secret to get.
        """

    @abstractmethod
    def get_all_secret_keys(self) -> List[str]:
        """List the keys of all secrets."""

    @abstractmethod
    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Overwrite a secret that is already registered.

        Implementations should raise a `KeyError` exception if the secret
        doesn't exist.

        Args:
            secret: The secret to update.
        """

    @abstractmethod
    def delete_secret(self, secret_name: str) -> None:
        """Remove a single secret.

        Implementations should raise a `KeyError` exception if the secret
        doesn't exist.

        Args:
            secret_name: The name of the secret to delete.
        """

    @abstractmethod
    def delete_all_secrets(self) -> None:
        """Remove every existing secret."""
delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_all_secrets(self) -> None:
    """Remove every existing secret."""
delete_secret(self, secret_name)

Delete an existing secret.

The implementation should throw a KeyError exception if the secret doesn't exist.

Parameters:

Name Type Description Default
secret_name str

The name of the secret to delete.

required
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_secret(self, secret_name: str) -> None:
    """Remove a single secret.

    Implementations should raise a `KeyError` exception if the secret
    doesn't exist.

    Args:
        secret_name: The name of the secret to delete.
    """
get_all_secret_keys(self)

Get all secret keys.

Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_all_secret_keys(self) -> List[str]:
    """List the keys of all secrets."""
get_secret(self, secret_name)

Gets the value of a secret.

The implementation should throw a KeyError exception if the secret doesn't exist.

Parameters:

Name Type Description Default
secret_name str

The name of the secret to get.

required
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Fetch a secret by name.

    Implementations should raise a `KeyError` exception if the secret
    doesn't exist.

    Args:
        secret_name: The name of the secret to get.
    """
namespace_validator(values) classmethod

Pydantic root validator for the namespace.

For a namespace-scoped Secrets Manager, a namespace value is required.

Parameters:

Name Type Description Default
values Dict[str, Any]

Values passed to the object constructor

required

Returns:

Type Description
Dict[str, Any]

Values passed to the object constructor

Exceptions:

Type Description
ValueError

If a namespace is not configured for a namespace-scoped Secrets Manager.

Source code in zenml/secrets_managers/base_secrets_manager.py
@root_validator
def namespace_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Check that a namespace scoped Secrets Manager has a namespace.

    After the built-in check, the scope and namespace are handed to
    `_validate_scope` so that subclasses can apply their own rules.

    Args:
        values: Values passed to the object constructor

    Returns:
        Values passed to the object constructor

    Raises:
        ValueError: If a namespace is not configured for a namespace scoped
            Secrets Manager.
    """
    configured_scope = cast(SecretsManagerScope, values.get("scope"))
    configured_namespace = values.get("namespace")

    namespace_missing = not configured_namespace
    if configured_scope == SecretsManagerScope.NAMESPACE and (
        namespace_missing
    ):
        raise ValueError(
            "A `namespace` value is required for a namespace scoped "
            "Secrets Manager."
        )

    cls._validate_scope(configured_scope, configured_namespace)
    return values
register_secret(self, secret)

Registers a new secret.

The implementation should throw a SecretExistsError exception if the secret already exists.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to register.

required
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Store a secret that is not registered yet.

    Implementations should raise a `SecretExistsError` exception if the
    secret already exists.

    Args:
        secret: The secret to register.
    """
scope_initializer(values) classmethod

Pydantic root_validator for the scope.

This root validator is used for backwards compatibility purposes. It ensures that existing Secrets Managers continue to use no scope for secrets while new instances default to using the component scope.

Parameters:

Name Type Description Default
values Dict[str, Any]

Values passed to the object constructor

required

Returns:

Type Description
Dict[str, Any]

Values passed to the object constructor

Exceptions:

Type Description
ValueError

If the scope value is not valid.

Source code in zenml/secrets_managers/base_secrets_manager.py
@root_validator(pre=True)
def scope_initializer(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Pick the effective scope before field validation runs.

    Exists for backwards compatibility: Secrets Managers that were
    registered before scoping was introduced keep using unscoped secrets,
    while newly created ones fall back to the field default.

    Args:
        values: Values passed to the object constructor

    Returns:
        Values passed to the object constructor

    Raises:
        ValueError: If the scope value is not valid.
    """
    requested_scope = values.get("scope")
    scoping_supported = cls.SUPPORTS_SCOPING

    if not requested_scope:
        # No scope was supplied. Scoping stays disabled for flavors that
        # cannot scope secrets, and for instances that already carry a
        # UUID (legacy instances created before scoping existed).
        if not scoping_supported or "uuid" in values:
            values["scope"] = SecretsManagerScope.NONE
        return values

    unscoped_requested = requested_scope == SecretsManagerScope.NONE

    if not scoping_supported:
        # an explicit scope other than `none` cannot be honored by a
        # flavor without scoping support
        if not unscoped_requested:
            raise ValueError(
                f"The {cls.FLAVOR} Secrets Manager does not support "
                f"scoping. You can only use a `none` scope value."
            )
    elif unscoped_requested:
        # scoping is available but was explicitly switched off
        logger.warning(
            f"Unscoped support for the {cls.FLAVOR} Secrets "
            f"Manager is deprecated and will be removed in a future "
            f"release. You should use the `global` scope instead."
        )

    return values
update_secret(self, secret)

Update an existing secret.

The implementation should throw a KeyError exception if the secret doesn't exist.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to update.

required
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Overwrite a secret that is already registered.

    Implementations should raise a `KeyError` exception if the secret
    doesn't exist.

    Args:
        secret: The secret to update.
    """

SecretsManagerScope (StrEnum)

Secrets Manager scope enum.

Source code in zenml/secrets_managers/base_secrets_manager.py
class SecretsManagerScope(StrEnum):
    """Secrets Manager scope enum.

    The scope determines how secrets are visible and shared across different
    Secrets Manager instances.
    """

    # No scoping: only kept for backwards compatibility with Secrets Manager
    # instances that predate secrets scoping.
    NONE = "none"
    # Secrets are shared across all Secrets Manager instances that connect to
    # the same backend and have a global scope.
    GLOBAL = "global"
    # Secrets are not shared outside a single Secrets Manager instance.
    COMPONENT = "component"
    # Secrets are shared only by Secrets Manager instances that connect to the
    # same backend and have the same namespace value configured.
    NAMESPACE = "namespace"

local special

Initialization of the ZenML local secrets manager.

local_secrets_manager

Implementation of the ZenML local secrets manager.

LocalSecretsManager (BaseSecretsManager) pydantic-model

ZenML Secrets Manager that stores secrets in a local file.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
class LocalSecretsManager(BaseSecretsManager):
    """Class for ZenML local file-based secret manager.

    All secrets are kept in a single YAML file, `secrets_file`, as a mapping
    from secret name to the encoded secret contents.

    Attributes:
        secrets_file: Path of the YAML file holding the secrets. When left
            empty, it is derived from the component UUID by the
            `set_secrets_file` validator.
    """

    secrets_file: str = ""

    # Class configuration
    FLAVOR: ClassVar[str] = "local"

    @root_validator(skip_on_failure=True)
    def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Sets the secrets_file attribute value according to the component UUID.

        An explicitly configured, non-empty `secrets_file` is left untouched.

        Args:
            values: The values to validate.

        Returns:
            The validated values.
        """
        if values.get("secrets_file"):
            return values

        # not likely to happen, due to Pydantic validation, but mypy complains
        assert "uuid" in values

        values["secrets_file"] = cls.get_secret_store_path(values["uuid"])
        return values

    @staticmethod
    def get_secret_store_path(uuid: uuid.UUID) -> str:
        """Get the path to the secret store.

        Args:
            uuid: The UUID of the secret store.

        Returns:
            The path to the secret store.
        """
        return os.path.join(
            get_global_config_directory(),
            LOCAL_STORES_DIRECTORY_NAME,
            str(uuid),
            LOCAL_SECRETS_FILENAME,
        )

    @property
    def local_path(self) -> str:
        """Path to the local directory where the secrets are stored.

        Returns:
            The path to the local directory where the secrets are stored.
        """
        return str(Path(self.secrets_file).parent)

    def _create_secrets_file__if_not_exists(self) -> None:
        """Makes sure the secrets yaml file exists."""
        create_file_if_not_exists(self.secrets_file)

    def _verify_secret_key_exists(self, secret_name: str) -> bool:
        """Checks if a secret key exists.

        No longer called by the public methods of this class, which check
        membership on the contents they have already loaded. Kept so that
        existing callers keep working.

        Args:
            secret_name: The name of the secret key.

        Returns:
            True if the secret key exists, False otherwise.
        """
        self._create_secrets_file__if_not_exists()
        secrets_store_items = yaml_utils.read_yaml(self.secrets_file)
        try:
            return secret_name in secrets_store_items
        except TypeError:
            # the loaded content does not support membership tests (e.g. an
            # empty file), so no secret can be present
            return False

    def _get_all_secrets(self) -> Dict[str, Dict[str, str]]:
        """Gets all secrets.

        Returns:
            A dictionary containing all secrets; empty if the file has no
            content.
        """
        self._create_secrets_file__if_not_exists()
        return yaml_utils.read_yaml(self.secrets_file) or {}

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: The secret to register.

        Raises:
            SecretExistsError: If the secret already exists.
        """
        self._create_secrets_file__if_not_exists()

        # Load the store once and use the same snapshot for the existence
        # check and for the write.
        secrets_store_items = self._get_all_secrets()
        if secret.name in secrets_store_items:
            raise SecretExistsError(f"Secret `{secret.name}` already exists.")

        secrets_store_items[secret.name] = encode_secret(secret)
        yaml_utils.append_yaml(self.secrets_file, secrets_store_items)

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Gets a specific secret.

        Args:
            secret_name: The name of the secret.

        Returns:
            The secret.

        Raises:
            KeyError: If the secret does not exist.
        """
        self._create_secrets_file__if_not_exists()

        secret_store_items = self._get_all_secrets()
        if secret_name not in secret_store_items:
            raise KeyError(f"Secret `{secret_name}` does not exists.")
        secret_dict = secret_store_items[secret_name]

        decoded_secret_dict, zenml_schema_name = decode_secret_dict(secret_dict)
        decoded_secret_dict["name"] = secret_name

        # rebuild the secret with the schema class it was stored with
        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        return secret_schema(**decoded_secret_dict)

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys.
        """
        self._create_secrets_file__if_not_exists()

        return list(self._get_all_secrets())

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret.

        Args:
            secret: The secret to update.

        Raises:
            KeyError: If the secret does not exist.
        """
        self._create_secrets_file__if_not_exists()

        secrets_store_items = self._get_all_secrets()
        if secret.name not in secrets_store_items:
            raise KeyError(f"Secret `{secret.name}` did not exist.")

        secrets_store_items[secret.name] = encode_secret(secret)
        yaml_utils.append_yaml(self.secrets_file, secrets_store_items)

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret.

        Args:
            secret_name: The name of the secret to delete.

        Raises:
            KeyError: If the secret does not exist.
        """
        self._create_secrets_file__if_not_exists()

        # A single read covers both the existence check and the removal, so
        # a missing secret always surfaces as the documented KeyError.
        secrets_store_items = self._get_all_secrets()
        if secret_name not in secrets_store_items:
            raise KeyError(f"Secret `{secret_name}` does not exists.")

        del secrets_store_items[secret_name]
        # rewrite the whole file so that the removed entry is really gone
        yaml_utils.write_yaml(self.secrets_file, secrets_store_items)

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        # make sure the file exists so that removing it cannot fail on a
        # missing path
        self._create_secrets_file__if_not_exists()
        remove(self.secrets_file)
local_path: str property readonly

Path to the local directory where the secrets are stored.

Returns:

Type Description
str

The path to the local directory where the secrets are stored.

delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Remove the secrets file, and with it every stored secret."""
    # create the file first if needed, then remove it
    self._create_secrets_file__if_not_exists()
    secrets_file_path = self.secrets_file
    remove(secrets_file_path)
delete_secret(self, secret_name)

Delete an existing secret.

Parameters:

Name Type Description Default
secret_name str

The name of the secret to delete.

required

Exceptions:

Type Description
KeyError

If the secret does not exist.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret.

    Args:
        secret_name: The name of the secret to delete.

    Raises:
        KeyError: If the secret does not exist.
    """
    self._create_secrets_file__if_not_exists()

    # A single read covers both the existence check and the removal, so a
    # missing secret always surfaces as the documented KeyError.
    secrets_store_items = self._get_all_secrets()
    if secret_name not in secrets_store_items:
        raise KeyError(f"Secret `{secret_name}` does not exists.")

    del secrets_store_items[secret_name]
    # rewrite the whole file so that the removed entry is really gone
    yaml_utils.write_yaml(self.secrets_file, secrets_store_items)
get_all_secret_keys(self)

Get all secret keys.

Returns:

Type Description
List[str]

A list of all secret keys.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """List the names of all stored secrets.

    Returns:
        A list of all secret keys.
    """
    self._create_secrets_file__if_not_exists()

    stored_secrets = self._get_all_secrets()
    return [secret_key for secret_key in stored_secrets.keys()]
get_secret(self, secret_name)

Gets a specific secret.

Parameters:

Name Type Description Default
secret_name str

The name of the secret.

required

Returns:

Type Description
BaseSecretSchema

The secret.

Exceptions:

Type Description
KeyError

If the secret does not exist.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Load one secret from the secrets file and rebuild its schema object.

    Args:
        secret_name: The name of the secret.

    Returns:
        The secret.

    Raises:
        KeyError: If the secret does not exist.
    """
    self._create_secrets_file__if_not_exists()

    stored_secrets = self._get_all_secrets()
    secret_is_known = self._verify_secret_key_exists(secret_name=secret_name)
    if not secret_is_known:
        raise KeyError(f"Secret `{secret_name}` does not exists.")

    decoded_fields, schema_name = decode_secret_dict(
        stored_secrets[secret_name]
    )
    decoded_fields["name"] = secret_name

    # instantiate the schema class that the secret was stored with
    schema_class = SecretSchemaClassRegistry.get_class(
        secret_schema=schema_name
    )
    return schema_class(**decoded_fields)
get_secret_store_path(uuid) staticmethod

Get the path to the secret store.

Parameters:

Name Type Description Default
uuid UUID

The UUID of the secret store.

required

Returns:

Type Description
str

The path to the secret store.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
@staticmethod
def get_secret_store_path(uuid: uuid.UUID) -> str:
    """Compose the path of the secrets file for a given component UUID.

    Args:
        uuid: The UUID of the secret store.

    Returns:
        The path to the secret store.
    """
    # <global config dir>/<local stores dir>/<uuid>/<secrets filename>
    path_parts = (
        get_global_config_directory(),
        LOCAL_STORES_DIRECTORY_NAME,
        str(uuid),
        LOCAL_SECRETS_FILENAME,
    )
    return os.path.join(*path_parts)
register_secret(self, secret)

Registers a new secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to register.

required

Exceptions:

Type Description
SecretExistsError

If the secret already exists.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Store a new secret under its name.

    Args:
        secret: The secret to register.

    Raises:
        SecretExistsError: If the secret already exists.
    """
    self._create_secrets_file__if_not_exists()

    # Registration must never overwrite an existing entry
    already_registered = self._verify_secret_key_exists(
        secret_name=secret.name
    )
    if already_registered:
        raise SecretExistsError(f"Secret `{secret.name}` already exists.")

    encoded = encode_secret(secret)

    stored_secrets = self._get_all_secrets()
    stored_secrets[secret.name] = encoded
    yaml_utils.append_yaml(self.secrets_file, stored_secrets)
set_secrets_file(values) classmethod

Sets the secrets_file attribute value according to the component UUID.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values to validate.

required

Returns:

Type Description
Dict[str, Any]

The validated values.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
@root_validator(skip_on_failure=True)
def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in `secrets_file` from the component UUID when it is not set.

    An explicitly configured, non-empty `secrets_file` value is left as is.

    Args:
        values: The values to validate.

    Returns:
        The validated values.
    """
    if not values.get("secrets_file"):
        # not likely to happen, due to Pydantic validation, but mypy complains
        assert "uuid" in values

        values["secrets_file"] = cls.get_secret_store_path(values["uuid"])

    return values
update_secret(self, secret)

Update an existing secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to update.

required

Exceptions:

Type Description
KeyError

If the secret does not exist.

Source code in zenml/secrets_managers/local/local_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Replace the stored values of an already registered secret.

    Args:
        secret: The secret to update.

    Raises:
        KeyError: If the secret does not exist.
    """
    self._create_secrets_file__if_not_exists()

    # Only secrets that are already present may be updated
    is_registered = self._verify_secret_key_exists(secret_name=secret.name)
    if not is_registered:
        raise KeyError(f"Secret `{secret.name}` did not exist.")

    encoded = encode_secret(secret)

    stored_secrets = self._get_all_secrets()
    stored_secrets[secret.name] = encoded
    yaml_utils.append_yaml(self.secrets_file, stored_secrets)

utils

Utility functions for the ZenML secrets manager module.

decode_secret_dict(secret_dict)

Base64 decode a Secret.

Parameters:

Name Type Description Default
secret_dict Dict[str, str]

dict containing key-value pairs to decode

required

Returns:

Type Description
Tuple[Dict[str, str], str]

A tuple of the decoded secret dict containing key-value pairs and the name of the ZenML schema of the secret.

Source code in zenml/secrets_managers/utils.py
def decode_secret_dict(
    secret_dict: Dict[str, str]
) -> Tuple[Dict[str, str], str]:
    """Base64 decode a Secret.

    The entry stored under the `ZENML_SCHEMA_NAME` key is not base64 encoded;
    it is returned separately and left out of the decoded values. The input
    dictionary is not modified.

    Args:
        secret_dict: dict containing key-value pairs to decode

    Returns:
        A tuple of the decoded secret Dict containing key-value pairs and the
        ZenML schema name that was stored with the secret.

    Raises:
        KeyError: If `secret_dict` has no `ZENML_SCHEMA_NAME` entry.
    """
    # Read rather than pop the schema name so that the caller's dictionary is
    # left intact and can be decoded again.
    zenml_schema_name = secret_dict[ZENML_SCHEMA_NAME]

    decoded_secret = {
        k: decode_string(v)
        for k, v in secret_dict.items()
        if k != ZENML_SCHEMA_NAME
    }
    return decoded_secret, zenml_schema_name

decode_string(string)

Base64 decode a string.

Parameters:

Name Type Description Default
string str

String to decode

required

Returns:

Type Description
str

Decoded string

Source code in zenml/secrets_managers/utils.py
def decode_string(string: str) -> str:
    """Turn a base64 encoded string back into its UTF-8 text.

    Args:
        string: String to decode

    Returns:
        Decoded string
    """
    return base64.b64decode(string).decode("utf-8")

encode_secret(secret)

Base64 encode all values within a secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

Secret containing key-value pairs

required

Returns:

Type Description
Dict[str, str]

Encoded secret Dict containing key-value pairs

Source code in zenml/secrets_managers/utils.py
def encode_secret(secret: BaseSecretSchema) -> Dict[str, str]:
    """Base64 encode the content values of a secret.

    Values that are `None` are left out. The secret's schema type is added
    under the `ZENML_SCHEMA_NAME` key without being encoded.

    Args:
        secret: Secret containing key-value pairs

    Returns:
        Encoded secret Dict containing key-value pairs
    """
    encoded_values: Dict[str, str] = {}
    for key, value in secret.content.items():
        if value is None:
            continue
        # Values are stringified first so non-string content can be encoded
        encoded_values[key] = encode_string(str(value))

    encoded_values[ZENML_SCHEMA_NAME] = secret.TYPE
    return encoded_values

encode_string(string)

Base64 encode a string.

Parameters:

Name Type Description Default
string str

String to encode

required

Returns:

Type Description
str

Encoded string

Source code in zenml/secrets_managers/utils.py
def encode_string(string: str) -> str:
    """Return the base64 representation of a string's UTF-8 bytes.

    Args:
        string: String to encode

    Returns:
        Encoded string
    """
    raw_bytes = string.encode("utf-8")
    return base64.b64encode(raw_bytes).decode("utf-8")

secret_from_dict(secret_dict, secret_name='', decode=False)

Converts a dictionary secret representation into a secret.

Parameters:

Name Type Description Default
secret_dict Dict[str, Any]

a dictionary representation of a secret

required
secret_name str

optional name for the secret, defaults to empty string

''
decode bool

if true, decodes the secret values using base64

False

Returns:

Type Description
BaseSecretSchema

A secret instance containing all key-value pairs loaded from the JSON representation and of the ZenML schema type indicated in the JSON.

Source code in zenml/secrets_managers/utils.py
def secret_from_dict(
    secret_dict: Dict[str, Any], secret_name: str = "", decode: bool = False
) -> BaseSecretSchema:
    """Build a secret object from its dictionary representation.

    Args:
        secret_dict: a dictionary representation of a secret
        secret_name: optional name for the secret, defaults to empty string
        decode: if true, decodes the secret values using base64

    Returns:
        A secret instance containing all key-value pairs loaded from the JSON
        representation and of the ZenML schema type indicated in the JSON.
    """
    from zenml.secret.secret_schema_class_registry import (
        SecretSchemaClassRegistry,
    )

    # Work on a shallow copy: the schema name entry is removed below and that
    # removal should not be visible in the dictionary that was passed in.
    contents = secret_dict.copy()

    if not decode:
        schema_name = contents.pop(ZENML_SCHEMA_NAME)
    else:
        contents, schema_name = decode_secret_dict(contents)

    contents["name"] = secret_name

    schema_class = SecretSchemaClassRegistry.get_class(
        secret_schema=schema_name
    )
    return schema_class(**contents)

secret_to_dict(secret, encode=False)

Converts a secret to a dict representation with the schema.

This includes the schema type in the secret's JSON representation, so that the correct SecretSchema can be retrieved when the secret is loaded.

Parameters:

Name Type Description Default
secret BaseSecretSchema

a subclass of the BaseSecretSchema class

required
encode bool

if true, encodes the secret values using base64 encoding

False

Returns:

Type Description
Dict[str, Any]

A dict representation containing all key-value pairs and the ZenML schema type.

Source code in zenml/secrets_managers/utils.py
def secret_to_dict(
    secret: BaseSecretSchema, encode: bool = False
) -> Dict[str, Any]:
    """Converts a secret to a dict representation with the schema.

    This includes the schema type in the secret's JSON representation, so that
    the correct SecretSchema can be retrieved when the secret is loaded.

    Args:
        secret: a subclass of the BaseSecretSchema class
        encode: if true, encodes the secret values using base64 encoding

    Returns:
        A dict representation containing all key-value pairs and the ZenML
        schema type.
    """
    if encode:
        # encode_secret already adds the schema type to its result
        return encode_secret(secret)

    # Build a new dict instead of writing the schema type into whatever
    # `secret.content` returns: if that is a reference to the secret's own
    # data, adding the key in place would alter the secret itself.
    secret_contents: Dict[str, Any] = dict(secret.content)
    secret_contents[ZENML_SCHEMA_NAME] = secret.TYPE

    return secret_contents