Skip to content

Stack Stores

zenml.stack_stores special

The stack store defines exactly where and how stacks are persisted across their life.

base_stack_store

BaseStackStore (ABC)

Base class for accessing data in ZenML Repository and new Service.

Source code in zenml/stack_stores/base_stack_store.py
class BaseStackStore(ABC):
    """Base class for accessing data in ZenML Repository and new Service.

    Subclasses provide the storage primitives declared as abstract members
    below. The non-abstract methods build the user-facing behavior (fetching,
    registering and deregistering stacks and stack components) on top of
    those primitives.
    """

    def initialize(
        self,
        url: str,
        skip_default_stack: bool = False,
        *args: Any,
        **kwargs: Any,
    ) -> "BaseStackStore":
        """Initialize the store.

        Registers the default stack when the store is empty, unless this is
        explicitly skipped.

        Args:
            url: The URL of the store.
            skip_default_stack: If True, the creation of the default stack will
                be skipped.
            *args: Additional arguments to pass to the concrete store
                implementation.
            **kwargs: Additional keyword arguments to pass to the concrete
                store implementation.

        Returns:
            The initialized concrete store instance.
        """
        if not skip_default_stack and self.is_empty:
            logger.info("Initializing store...")
            self.register_default_stack()

        return self

    # Statics:

    @staticmethod
    @abstractmethod
    def get_path_from_url(url: str) -> Optional[Path]:
        """Get the path from a URL, if it points or is backed by a local file.

        Args:
            url: The URL to get the path from.

        Returns:
            The local path backed by the URL, or None if the URL is not backed
            by a local file or directory
        """

    @staticmethod
    @abstractmethod
    def get_local_url(path: str) -> str:
        """Get a local URL for a given local path.

        Args:
            path: the path string to build a URL out of.

        Returns:
            Url pointing to the path for the store type.
        """

    @staticmethod
    @abstractmethod
    def is_valid_url(url: str) -> bool:
        """Check if the given url is valid.

        Args:
            url: The URL to check.

        Returns:
            Whether the URL is valid for this store type.
        """

    # Public Interface:

    @property
    @abstractmethod
    def type(self) -> StoreType:
        """The type of stack store."""

    @property
    @abstractmethod
    def url(self) -> str:
        """Get the repository URL."""

    @property
    @abstractmethod
    def is_empty(self) -> bool:
        """Check if the store is empty (no stacks are configured).

        The implementation of this method should check if the store is empty
        without having to load all the stacks from the persistent storage.
        """

    @abstractmethod
    def get_stack_configuration(
        self, name: str
    ) -> Dict[StackComponentType, str]:
        """Fetches a stack configuration by name.

        Args:
            name: The name of the stack to fetch.

        Returns:
            Dict[StackComponentType, str] for the requested stack name.

        Raises:
            KeyError: If no stack exists for the given name.
        """

    @property
    @abstractmethod
    def stack_configurations(self) -> Dict[str, Dict[StackComponentType, str]]:
        """Configurations for all stacks registered in this stack store.

        Returns:
            Dictionary mapping stack names to Dict[StackComponentType, str]'s
        """

    @abstractmethod
    def register_stack_component(
        self,
        component: StackComponentWrapper,
    ) -> None:
        """Register a stack component.

        Args:
            component: The component to register.

        Raises:
            StackComponentExistsError: If a stack component with the same type
                and name already exists.
        """

    @abstractmethod
    def deregister_stack(self, name: str) -> None:
        """Delete a stack from storage.

        Args:
            name: The name of the stack to be deleted.

        Raises:
            KeyError: If no stack exists for the given name.
        """

    # Private interface (must be implemented, not to be called by user):

    @abstractmethod
    def _create_stack(
        self, name: str, stack_configuration: Dict[StackComponentType, str]
    ) -> None:
        """Add a stack to storage.

        Args:
            name: The name to save the stack as.
            stack_configuration: Dict[StackComponentType, str] to persist.
        """

    @abstractmethod
    def _get_component_flavor_and_config(
        self, component_type: StackComponentType, name: str
    ) -> Tuple[str, bytes]:
        """Fetch the flavor and configuration for a stack component.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the component to fetch.

        Returns:
            Pair of (flavor, configuration) for stack component, as string and
            base64-encoded yaml document, respectively

        Raises:
            KeyError: If no stack component exists for the given type and name.
        """

    @abstractmethod
    def _get_stack_component_names(
        self, component_type: StackComponentType
    ) -> List[str]:
        """Get names of all registered stack components of a given type.

        Args:
            component_type: The type of the component to list names for.

        Returns:
            A list of names as strings.
        """

    @abstractmethod
    def _delete_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Remove a StackComponent from storage.

        Args:
            component_type: The type of component to delete.
            name: The name of the component to delete.

        Raises:
            KeyError: If no component exists for given type and name.
        """

    # Common code (user facing):

    @property
    def stacks(self) -> List[StackWrapper]:
        """All stacks registered in this stack store."""
        return [
            self._stack_from_dict(name, conf)
            for name, conf in self.stack_configurations.items()
        ]

    def get_stack(self, name: str) -> StackWrapper:
        """Fetch a stack by name.

        Args:
            name: The name of the stack to retrieve.

        Returns:
            StackWrapper instance if the stack exists.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        return self._stack_from_dict(name, self.get_stack_configuration(name))

    def register_stack(self, stack: StackWrapper) -> Dict[str, str]:
        """Register a stack and its components.

        If any of the stack's components aren't registered in the stack store
        yet, this method will try to register them as well.

        Args:
            stack: The stack to register.

        Returns:
            metadata dict for telemetry or logging, mapping each component
            type value to the component's flavor.

        Raises:
            StackExistsError: If a stack with the same name already exists.
            StackComponentExistsError: If a component of the stack wasn't
                registered and a different component with the same name
                already exists.
        """
        # Probe only the stored configuration. Resolving the full stack via
        # `get_stack` also loads every component, and a KeyError raised for a
        # missing component would be mistaken for "no such stack", allowing
        # the existing entry to be overwritten below.
        try:
            self.get_stack_configuration(stack.name)
        except KeyError:
            pass
        else:
            raise StackExistsError(
                f"Unable to register stack with name '{stack.name}': Found "
                f"existing stack with this name."
            )

        def _register_if_missing(
            component: StackComponentWrapper,
        ) -> Tuple[StackComponentType, str]:
            """Try to register a stack component, if it doesn't exist.

            Args:
                component: StackComponentWrapper to register.

            Returns:
                The (type, name) pair identifying the component.

            Raises:
                StackComponentExistsError: If a different component with the
                    same type and name exists.
            """
            try:
                existing_component = self.get_stack_component(
                    component_type=component.type, name=component.name
                )
            except KeyError:
                self.register_stack_component(component)
            else:
                # Same type and name but another uuid: a different component
                # already occupies this slot.
                if existing_component.uuid != component.uuid:
                    raise StackComponentExistsError(
                        f"Unable to register one of the stacks components: "
                        f"A component of type '{component.type}' and name "
                        f"'{component.name}' already exists."
                    )
            return component.type, component.name

        stack_configuration = dict(
            map(_register_if_missing, stack.components)
        )
        metadata = {c.type.value: c.flavor for c in stack.components}
        self._create_stack(stack.name, stack_configuration)
        return metadata

    def get_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> StackComponentWrapper:
        """Get a registered stack component.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the component to fetch.

        Returns:
            A StackComponentWrapper built from the stored flavor and
            configuration of the component.

        Raises:
            KeyError: If no component with the requested type and name exists.
        """
        flavor, config = self._get_component_flavor_and_config(
            component_type, name=name
        )
        # The configuration is a base64-encoded yaml document; the uuid is
        # read from its top-level "uuid" entry.
        uuid = yaml.safe_load(base64.b64decode(config).decode())["uuid"]
        return StackComponentWrapper(
            type=component_type,
            flavor=flavor,
            name=name,
            uuid=uuid,
            config=config,
        )

    def get_stack_components(
        self, component_type: StackComponentType
    ) -> List[StackComponentWrapper]:
        """Fetches all registered stack components of the given type.

        Args:
            component_type: StackComponentType to list members of

        Returns:
            A list of StackComponentWrapper instances.
        """
        return [
            self.get_stack_component(component_type=component_type, name=name)
            for name in self._get_stack_component_names(component_type)
        ]

    def deregister_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Deregisters a stack component.

        Args:
            component_type: The type of the component to deregister.
            name: The name of the component to deregister.

        Raises:
            ValueError: if trying to deregister a component that's part
                of a stack.
        """
        for stack_name, stack_config in self.stack_configurations.items():
            if stack_config.get(component_type) == name:
                raise ValueError(
                    f"Unable to deregister stack component (type: "
                    f"{component_type}, name: {name}) that is part of a "
                    f"registered stack (stack name: '{stack_name}')."
                )
        self._delete_stack_component(component_type, name=name)

    def register_default_stack(self) -> None:
        """Populates the store with the default Stack.

        The default stack contains a local orchestrator,
        a local artifact store and a local SQLite metadata store.
        """
        stack = Stack.default_local_stack()
        metadata = self.register_stack(StackWrapper.from_stack(stack))
        metadata["store_type"] = self.type.value
        track_event(AnalyticsEvent.REGISTERED_STACK, metadata=metadata)

    # Common code (internal implementations, private):

    def _stack_from_dict(
        self, name: str, stack_configuration: Dict[StackComponentType, str]
    ) -> StackWrapper:
        """Build a StackWrapper from stored configurations.

        Args:
            name: The name of the stack.
            stack_configuration: Mapping of component type to the name of the
                registered component of that type.

        Returns:
            A StackWrapper holding the resolved components.
        """
        stack_components = [
            self.get_stack_component(
                component_type=component_type, name=component_name
            )
            for component_type, component_name in stack_configuration.items()
        ]
        return StackWrapper(name=name, components=stack_components)
is_empty: bool property readonly

Check if the store is empty (no stacks are configured).

The implementation of this method should check if the store is empty without having to load all the stacks from the persistent storage.

stack_configurations: Dict[str, Dict[zenml.enums.StackComponentType, str]] property readonly

Configurations for all stacks registered in this stack store.

Returns:

Type Description
Dict[str, Dict[zenml.enums.StackComponentType, str]]

Dictionary mapping stack names to Dict[StackComponentType, str]'s

stacks: List[zenml.stack_stores.models.stack_wrapper.StackWrapper] property readonly

All stacks registered in this stack store.

type: StoreType property readonly

The type of stack store.

url: str property readonly

Get the repository URL.

deregister_stack(self, name)

Delete a stack from storage.

Parameters:

Name Type Description Default
name str

The name of the stack to be deleted.

required

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/base_stack_store.py
@abstractmethod
def deregister_stack(self, name: str) -> None:
    """Delete a stack from storage.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        name: The name of the stack to be deleted.

    Raises:
        KeyError: If no stack exists for the given name.
    """
deregister_stack_component(self, component_type, name)

Deregisters a stack component.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component to deregister.

required
name str

The name of the component to deregister.

required

Exceptions:

Type Description
ValueError

if trying to deregister a component that's part of a stack.

Source code in zenml/stack_stores/base_stack_store.py
def deregister_stack_component(
    self, component_type: StackComponentType, name: str
) -> None:
    """Remove a stack component, unless a registered stack still uses it.

    Args:
        component_type: The type of the component to deregister.
        name: The name of the component to deregister.

    Raises:
        ValueError: If the component is part of a registered stack.
    """
    # Name of the first stack whose configuration references the component,
    # or None when no stack does.
    stack_name = next(
        (
            candidate
            for candidate, configuration in self.stack_configurations.items()
            if configuration.get(component_type) == name
        ),
        None,
    )
    if stack_name is not None:
        raise ValueError(
            f"Unable to deregister stack component (type: "
            f"{component_type}, name: {name}) that is part of a "
            f"registered stack (stack name: '{stack_name}')."
        )

    self._delete_stack_component(component_type, name=name)
get_local_url(path) staticmethod

Get a local URL for a given local path.

Parameters:

Name Type Description Default
path str

the path string to build a URL out of.

required

Returns:

Type Description
str

Url pointing to the path for the store type.

Source code in zenml/stack_stores/base_stack_store.py
@staticmethod
@abstractmethod
def get_local_url(path: str) -> str:
    """Get a local URL for a given local path.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        path: The path string to build a URL out of.

    Returns:
        Url pointing to the path for the store type.
    """
get_path_from_url(url) staticmethod

Get the path from a URL, if it points or is backed by a local file.

Parameters:

Name Type Description Default
url str

The URL to get the path from.

required

Returns:

Type Description
Optional[pathlib.Path]

The local path backed by the URL, or None if the URL is not backed by a local file or directory

Source code in zenml/stack_stores/base_stack_store.py
@staticmethod
@abstractmethod
def get_path_from_url(url: str) -> Optional[Path]:
    """Get the path from a URL, if it points to or is backed by a local file.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        url: The URL to get the path from.

    Returns:
        The local path backed by the URL, or None if the URL is not backed
        by a local file or directory.
    """
get_stack(self, name)

Fetch a stack by name.

Parameters:

Name Type Description Default
name str

The name of the stack to retrieve.

required

Returns:

Type Description
StackWrapper

StackWrapper instance if the stack exists.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/base_stack_store.py
def get_stack(self, name: str) -> StackWrapper:
    """Look up a registered stack and return it with its components resolved.

    Args:
        name: The name of the stack to retrieve.

    Returns:
        StackWrapper instance if the stack exists.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    # Fetch the stored component-type -> component-name mapping first, then
    # turn it into a wrapper.
    configuration = self.get_stack_configuration(name)
    return self._stack_from_dict(name, configuration)
get_stack_component(self, component_type, name)

Get a registered stack component.

Exceptions:

Type Description
KeyError

If no component with the requested type and name exists.

Source code in zenml/stack_stores/base_stack_store.py
def get_stack_component(
    self, component_type: StackComponentType, name: str
) -> StackComponentWrapper:
    """Build the wrapper for a registered stack component.

    Args:
        component_type: The type of the component to fetch.
        name: The name of the component to fetch.

    Returns:
        A StackComponentWrapper carrying the stored flavor and configuration.

    Raises:
        KeyError: If no component with the requested type and name exists.
    """
    flavor, config = self._get_component_flavor_and_config(
        component_type, name=name
    )

    # The stored configuration is a base64-encoded yaml document; its
    # top-level "uuid" entry identifies the component.
    yaml_document = base64.b64decode(config).decode()
    parsed_config = yaml.safe_load(yaml_document)
    component_uuid = parsed_config["uuid"]

    return StackComponentWrapper(
        type=component_type,
        flavor=flavor,
        name=name,
        uuid=component_uuid,
        config=config,
    )
get_stack_components(self, component_type)

Fetches all registered stack components of the given type.

Parameters:

Name Type Description Default
component_type StackComponentType

StackComponentType to list members of

required

Returns:

Type Description
List[zenml.stack_stores.models.stack_component_wrapper.StackComponentWrapper]

A list of StackComponentWrapper instances, one for each registered component of the given type.

Source code in zenml/stack_stores/base_stack_store.py
def get_stack_components(
    self, component_type: StackComponentType
) -> List[StackComponentWrapper]:
    """Collect every registered stack component of one type.

    Args:
        component_type: StackComponentType to list members of

    Returns:
        A list of StackComponentWrapper instances, one per registered name.
    """
    wrappers = []
    for component_name in self._get_stack_component_names(component_type):
        wrapper = self.get_stack_component(
            component_type=component_type, name=component_name
        )
        wrappers.append(wrapper)
    return wrappers
get_stack_configuration(self, name)

Fetches a stack configuration by name.

Parameters:

Name Type Description Default
name str

The name of the stack to fetch.

required

Returns:

Type Description
Dict[zenml.enums.StackComponentType, str]

Dict[StackComponentType, str] for the requested stack name.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/base_stack_store.py
@abstractmethod
def get_stack_configuration(
    self, name: str
) -> Dict[StackComponentType, str]:
    """Fetches a stack configuration by name.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        name: The name of the stack to fetch.

    Returns:
        Dict[StackComponentType, str] for the requested stack name.

    Raises:
        KeyError: If no stack exists for the given name.
    """
initialize(self, url, skip_default_stack=False, *args, **kwargs)

Initialize the store.

Parameters:

Name Type Description Default
url str

The URL of the store.

required
skip_default_stack bool

If True, the creation of the default stack will be skipped.

False
*args Any

Additional arguments to pass to the concrete store implementation.

()
**kwargs Any

Additional keyword arguments to pass to the concrete store implementation.

{}

Returns:

Type Description
BaseStackStore

The initialized concrete store instance.

Source code in zenml/stack_stores/base_stack_store.py
def initialize(
    self,
    url: str,
    skip_default_stack: bool = False,
    *args: Any,
    **kwargs: Any,
) -> "BaseStackStore":
    """Initialize the store, seeding it with the default stack if needed.

    Args:
        url: The URL of the store.
        skip_default_stack: If True, the creation of the default stack will
            be skipped.
        *args: Additional arguments to pass to the concrete store
            implementation.
        **kwargs: Additional keyword arguments to pass to the concrete
            store implementation.

    Returns:
        The initialized concrete store instance.
    """
    # Nothing to seed when the caller opted out or stacks already exist.
    if skip_default_stack or not self.is_empty:
        return self

    logger.info("Initializing store...")
    self.register_default_stack()
    return self
is_valid_url(url) staticmethod

Check if the given url is valid.

Source code in zenml/stack_stores/base_stack_store.py
@staticmethod
@abstractmethod
def is_valid_url(url: str) -> bool:
    """Check if the given url is valid.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        url: The URL to check.

    Returns:
        Whether the URL is valid for the store type.
    """
register_default_stack(self)

Populates the store with the default Stack.

The default stack contains a local orchestrator, a local artifact store and a local SQLite metadata store.

Source code in zenml/stack_stores/base_stack_store.py
def register_default_stack(self) -> None:
    """Register the default local stack and report it to analytics.

    The default stack contains a local orchestrator,
    a local artifact store and a local SQLite metadata store.
    """
    default_stack = StackWrapper.from_stack(Stack.default_local_stack())

    # The registration metadata is extended with the store type before it is
    # sent along with the analytics event.
    metadata = self.register_stack(default_stack)
    metadata.update({"store_type": self.type.value})

    track_event(AnalyticsEvent.REGISTERED_STACK, metadata=metadata)
register_stack(self, stack)

Register a stack and its components.

If any of the stack's components aren't registered in the stack store yet, this method will try to register them as well.

Parameters:

Name Type Description Default
stack StackWrapper

The stack to register.

required

Returns:

Type Description
Dict[str, str]

metadata dict for telemetry or logging.

Exceptions:

Type Description
StackExistsError

If a stack with the same name already exists.

StackComponentExistsError

If a component of the stack wasn't registered and a different component with the same name already exists.

Source code in zenml/stack_stores/base_stack_store.py
def register_stack(self, stack: StackWrapper) -> Dict[str, str]:
    """Register a stack and its components.

    If any of the stack's components aren't registered in the stack store
    yet, this method will try to register them as well.

    Args:
        stack: The stack to register.

    Returns:
        metadata dict for telemetry or logging, mapping each component type
        value to the component's flavor.

    Raises:
        StackExistsError: If a stack with the same name already exists.
        StackComponentExistsError: If a component of the stack wasn't
            registered and a different component with the same name
            already exists.
    """
    # Probe only the stored configuration. Resolving the full stack via
    # `get_stack` also loads every component, and a KeyError raised for a
    # missing component would be mistaken for "no such stack", allowing the
    # existing entry to be overwritten below.
    try:
        self.get_stack_configuration(stack.name)
    except KeyError:
        pass
    else:
        raise StackExistsError(
            f"Unable to register stack with name '{stack.name}': Found "
            f"existing stack with this name."
        )

    def _register_if_missing(
        component: StackComponentWrapper,
    ) -> Tuple[StackComponentType, str]:
        """Try to register a stack component, if it doesn't exist.

        Args:
            component: StackComponentWrapper to register.

        Returns:
            The (type, name) pair identifying the component.

        Raises:
            StackComponentExistsError: If a different component with the
                same type and name exists.
        """
        try:
            existing_component = self.get_stack_component(
                component_type=component.type, name=component.name
            )
        except KeyError:
            self.register_stack_component(component)
        else:
            # Same type and name but another uuid: a different component
            # already occupies this slot.
            if existing_component.uuid != component.uuid:
                raise StackComponentExistsError(
                    f"Unable to register one of the stacks components: "
                    f"A component of type '{component.type}' and name "
                    f"'{component.name}' already exists."
                )
        return component.type, component.name

    stack_configuration = dict(map(_register_if_missing, stack.components))
    metadata = {c.type.value: c.flavor for c in stack.components}
    self._create_stack(stack.name, stack_configuration)
    return metadata
register_stack_component(self, component)

Register a stack component.

Parameters:

Name Type Description Default
component StackComponentWrapper

The component to register.

required

Exceptions:

Type Description
StackComponentExistsError

If a stack component with the same type and name already exists.

Source code in zenml/stack_stores/base_stack_store.py
@abstractmethod
def register_stack_component(
    self,
    component: StackComponentWrapper,
) -> None:
    """Register a stack component.

    This is an abstract declaration without a body; concrete stack stores
    supply the implementation.

    Args:
        component: The component to register.

    Raises:
        StackComponentExistsError: If a stack component with the same type
            and name already exists.
    """

local_stack_store

LocalStackStore (BaseStackStore)

Source code in zenml/stack_stores/local_stack_store.py
class LocalStackStore(BaseStackStore):
    """Stack store that persists its data on the local filesystem.

    Stack data is written to a `stacks.yaml` file in the store's root
    directory. Each stack component's configuration is written to its own
    `<name>.yaml` file in a subdirectory named after the component type's
    plural form.
    """

    def initialize(
        self,
        url: str,
        *args: Any,
        stack_data: Optional[StackStoreModel] = None,
        **kwargs: Any,
    ) -> "LocalStackStore":
        """Initializes a local stack store instance.

        Args:
            url: URL of local directory of the repository to use for
                stack storage.
            stack_data: optional stack data store object to pre-populate the
                stack store with.
            args: additional positional arguments, forwarded to the base
                class `initialize`.
            kwargs: additional keyword arguments, forwarded to the base
                class `initialize`.

        Returns:
            The initialized stack store instance.

        Raises:
            ValueError: If the URL is not valid for a local store.
        """
        if not self.is_valid_url(url):
            raise ValueError(f"Invalid URL for local store: {url}")

        self._root = self.get_path_from_url(url)
        self._url = f"file://{self._root}"
        utils.create_dir_recursive_if_not_exists(str(self._root))

        if stack_data is not None:
            # Explicitly provided data takes precedence over anything that is
            # already on disk and is persisted immediately.
            self.__store = stack_data
            self._write_store()
        elif fileio.exists(self._store_path()):
            config_dict = yaml_utils.read_yaml(self._store_path())
            self.__store = StackStoreModel.parse_obj(config_dict)
        else:
            self.__store = StackStoreModel.empty_store()
            self._write_store()

        super().initialize(url, *args, **kwargs)
        return self

    # Public interface implementations:

    @property
    def type(self) -> StoreType:
        """The type of stack store."""
        return StoreType.LOCAL

    @property
    def url(self) -> str:
        """URL of the repository."""
        return self._url

    @staticmethod
    def get_path_from_url(url: str) -> Optional[Path]:
        """Get the path from a URL.

        Args:
            url: The URL to get the path from.

        Returns:
            The path from the URL.

        Raises:
            ValueError: If the URL is not valid for a local store.
        """
        if not LocalStackStore.is_valid_url(url):
            raise ValueError(f"Invalid URL for local store: {url}")
        # Strip only the leading scheme. `str.replace` would also remove any
        # later "file://" occurrence that is part of the path itself.
        scheme = "file://"
        if url.startswith(scheme):
            url = url[len(scheme) :]
        return Path(url)

    @staticmethod
    def get_local_url(path: str) -> str:
        """Get a local URL for a given local path.

        Args:
            path: The path string to build a URL out of.

        Returns:
            The path prefixed with the `file://` scheme.
        """
        return f"file://{path}"

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Check if the given url is a valid local path.

        Args:
            url: The URL to check.

        Returns:
            True if the URL has no scheme or uses the `file://` scheme.
        """
        scheme = re.search(r"^([a-z0-9]+://)", url)
        return not scheme or scheme.group() == "file://"

    @property
    def is_empty(self) -> bool:
        """Check if the stack store is empty."""
        return len(self.__store.stacks) == 0

    def get_stack_configuration(
        self, name: str
    ) -> Dict[StackComponentType, str]:
        """Fetches a stack configuration by name.

        Args:
            name: The name of the stack to fetch.

        Returns:
            Dict[StackComponentType, str] for the requested stack name.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        logger.debug("Fetching stack with name '%s'.", name)
        if name not in self.__store.stacks:
            raise KeyError(
                f"Unable to find stack with name '{name}'. Available names: "
                f"{set(self.__store.stacks)}."
            )

        return self.__store.stacks[name]

    @property
    def stack_configurations(self) -> Dict[str, Dict[StackComponentType, str]]:
        """Configuration for all stacks registered in this stack store.

        Returns:
            Dictionary mapping stack names to Dict[StackComponentType, str]
        """
        return self.__store.stacks.copy()

    def register_stack_component(
        self,
        component: StackComponentWrapper,
    ) -> None:
        """Register a stack component.

        Args:
            component: The component to register.

        Raises:
            StackComponentExistsError: If a stack component with the same type
                and name already exists.
        """
        components = self.__store.stack_components[component.type]
        if component.name in components:
            raise StackComponentExistsError(
                f"Unable to register stack component (type: {component.type}) "
                f"with name '{component.name}': Found existing stack component "
                f"with this name."
            )

        # write the component configuration file
        component_config_path = self._get_stack_component_config_path(
            component_type=component.type, name=component.name
        )
        utils.create_dir_recursive_if_not_exists(
            os.path.dirname(component_config_path)
        )
        utils.write_file_contents_as_string(
            component_config_path,
            base64.b64decode(component.config).decode(),
        )

        # add the component to the stack store dict and write it to disk
        components[component.name] = component.flavor
        self._write_store()
        logger.info(
            "Registered stack component with type '%s' and name '%s'.",
            component.type,
            component.name,
        )

    def deregister_stack(self, name: str) -> None:
        """Remove a stack from storage.

        Args:
            name: The name of the stack to be deleted.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        del self.__store.stacks[name]
        self._write_store()

    # Private interface implementations:

    def _create_stack(
        self, name: str, stack_configuration: Dict[StackComponentType, str]
    ) -> None:
        """Add a stack to storage.

        Args:
            name: The name to save the stack as.
            stack_configuration: Dict[StackComponentType, str] to persist.
        """
        self.__store.stacks[name] = stack_configuration
        self._write_store()
        logger.info("Registered stack with name '%s'.", name)

    def _get_component_flavor_and_config(
        self, component_type: StackComponentType, name: str
    ) -> Tuple[str, bytes]:
        """Fetch the flavor and configuration for a stack component.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the component to fetch.

        Returns:
            Pair of (flavor, configuration) for stack component, as string and
            base64-encoded yaml document, respectively

        Raises:
            KeyError: If no stack component exists for the given type and name.
        """
        components: Dict[str, str] = self.__store.stack_components[
            component_type
        ]
        if name not in components:
            raise KeyError(
                f"Unable to find stack component (type: {component_type}) "
                f"with name '{name}'. Available names: {set(components)}."
            )

        component_config_path = self._get_stack_component_config_path(
            component_type=component_type, name=name
        )
        flavor = components[name]
        config = base64.b64encode(
            utils.read_file_contents_as_string(component_config_path).encode()
        )
        return flavor, config

    def _get_stack_component_names(
        self, component_type: StackComponentType
    ) -> List[str]:
        """Get names of all registered stack components of a given type.

        Args:
            component_type: The type of the component to list names for.

        Returns:
            A list of names as strings.
        """
        return list(self.__store.stack_components[component_type])

    def _delete_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Remove a StackComponent from storage.

        Args:
            component_type: The type of component to delete.
            name: The name of the component to delete.

        Raises:
            KeyError: If no component exists for given type and name.
        """
        components = self.__store.stack_components[component_type]
        # Fail before touching the filesystem, so that a failed call leaves
        # no side effects behind.
        if name not in components:
            raise KeyError(
                f"Unable to find stack component (type: {component_type}) "
                f"with name '{name}'. Available names: {set(components)}."
            )

        component_config_path = self._get_stack_component_config_path(
            component_type=component_type, name=name
        )
        if fileio.exists(component_config_path):
            fileio.remove(component_config_path)

        del components[name]
        self._write_store()

    # Implementation-specific internal methods:

    @property
    def root(self) -> Path:
        """The root directory of the stack store.

        Raises:
            RuntimeError: If the store has not been initialized yet.
        """
        # `_root` is only assigned in `initialize`; a plain attribute access
        # on an uninitialized store would raise AttributeError instead of the
        # intended RuntimeError.
        root = getattr(self, "_root", None)
        if not root:
            raise RuntimeError(
                "Local stack store has not been initialized. Call `initialize` "
                "before using the store."
            )
        return root

    def _get_stack_component_config_path(
        self, component_type: StackComponentType, name: str
    ) -> str:
        """Path to the configuration file of a stack component."""
        path = self.root / component_type.plural / f"{name}.yaml"
        return str(path)

    def _store_path(self) -> str:
        """Path to the stack store yaml file."""
        return str(self.root / "stacks.yaml")

    def _write_store(self) -> None:
        """Writes the stack store yaml file."""
        # Round-trip through JSON so that the model is reduced to plain
        # values before it is dumped as yaml.
        config_dict = json.loads(self.__store.json())
        yaml_utils.write_yaml(self._store_path(), config_dict)
is_empty: bool property readonly

Check if the stack store is empty.

root: Path property readonly

The root directory of the stack store.

stack_configurations: Dict[str, Dict[zenml.enums.StackComponentType, str]] property readonly

Configuration for all stacks registered in this stack store.

Returns:

Type Description
Dict[str, Dict[zenml.enums.StackComponentType, str]]

Dictionary mapping stack names to Dict[StackComponentType, str]

type: StoreType property readonly

The type of stack store.

url: str property readonly

URL of the repository.

deregister_stack(self, name)

Remove a stack from storage.

Parameters:

Name Type Description Default
name str

The name of the stack to be deleted.

required

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/local_stack_store.py
def deregister_stack(self, name: str) -> None:
    """Delete a stack entry and persist the updated store.

    Args:
        name: The name of the stack to be deleted.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    registered_stacks = self.__store.stacks
    del registered_stacks[name]
    self._write_store()
get_local_url(path) staticmethod

Get a local URL for a given local path.

Source code in zenml/stack_stores/local_stack_store.py
@staticmethod
def get_local_url(path: str) -> str:
    """Turn a local path into a `file://` URL by prefixing the scheme.

    Args:
        path: The local path to build a URL out of.

    Returns:
        The path prefixed with `file://`.
    """
    return "file://{}".format(path)
get_path_from_url(url) staticmethod

Get the path from a URL.

Parameters:

Name Type Description Default
url str

The URL to get the path from.

required

Returns:

Type Description
Optional[pathlib.Path]

The path from the URL.

Source code in zenml/stack_stores/local_stack_store.py
@staticmethod
def get_path_from_url(url: str) -> Optional[Path]:
    """Get the path from a URL.

    Args:
        url: The URL to get the path from. Either a plain path or a
            `file://` URL.

    Returns:
        The path from the URL.

    Raises:
        ValueError: If the URL is not valid for a local store.
    """
    if not LocalStackStore.is_valid_url(url):
        raise ValueError(f"Invalid URL for local store: {url}")
    # Only strip the leading scheme: a blanket `replace` would also remove
    # any later occurrence of "file://" inside the path itself.
    scheme = "file://"
    if url.startswith(scheme):
        url = url[len(scheme):]
    return Path(url)
get_stack_configuration(self, name)

Fetches a stack configuration by name.

Parameters:

Name Type Description Default
name str

The name of the stack to fetch.

required

Returns:

Type Description
Dict[zenml.enums.StackComponentType, str]

Dict[StackComponentType, str] for the requested stack name.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/local_stack_store.py
def get_stack_configuration(
    self, name: str
) -> Dict[StackComponentType, str]:
    """Look up the configuration of a registered stack.

    Args:
        name: The name of the stack to fetch.

    Returns:
        Dict[StackComponentType, str] for the requested stack name.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    logger.debug("Fetching stack with name '%s'.", name)
    registered_stacks = self.__store.stacks
    if name in registered_stacks:
        return registered_stacks[name]

    raise KeyError(
        f"Unable to find stack with name '{name}'. Available names: "
        f"{set(registered_stacks)}."
    )
initialize(self, url, *args, *, stack_data=None, **kwargs)

Initializes a local stack store instance.

Parameters:

Name Type Description Default
url str

URL of local directory of the repository to use for stack storage.

required
stack_data Optional[zenml.stack_stores.models.stack_store_model.StackStoreModel]

optional stack data store object to pre-populate the stack store with.

None
args Any

additional positional arguments (ignored).

()
kwargs Any

additional keyword arguments (ignored).

{}

Returns:

Type Description
LocalStackStore

The initialized stack store instance.

Source code in zenml/stack_stores/local_stack_store.py
def initialize(
    self,
    url: str,
    *args: Any,
    stack_data: Optional[StackStoreModel] = None,
    **kwargs: Any,
) -> "LocalStackStore":
    """Set up a local stack store rooted at the directory behind `url`.

    The store contents come from `stack_data` if given, otherwise from an
    existing store file, otherwise a new empty store is created.

    Args:
        url: URL of local directory of the repository to use for
            stack storage.
        stack_data: optional stack data store object to pre-populate the
            stack store with.
        args: additional positional arguments, forwarded to the base class
            `initialize`.
        kwargs: additional keyword arguments, forwarded to the base class
            `initialize`.

    Returns:
        The initialized stack store instance.

    Raises:
        ValueError: If `url` is not a valid URL for a local store.
    """
    if not self.is_valid_url(url):
        raise ValueError(f"Invalid URL for local store: {url}")

    self._root = self.get_path_from_url(url)
    self._url = f"file://{self._root}"
    utils.create_dir_recursive_if_not_exists(str(self._root))

    if stack_data is None and fileio.exists(self._store_path()):
        # Nothing was handed in, but a store file exists: load it.
        stored_config = yaml_utils.read_yaml(self._store_path())
        self.__store = StackStoreModel.parse_obj(stored_config)
    else:
        # Either use the supplied data or start empty; both cases are
        # written to disk right away.
        if stack_data is not None:
            self.__store = stack_data
        else:
            self.__store = StackStoreModel.empty_store()
        self._write_store()

    super().initialize(url, *args, **kwargs)
    return self
is_valid_url(url) staticmethod

Check if the given url is a valid local path.

Source code in zenml/stack_stores/local_stack_store.py
@staticmethod
def is_valid_url(url: str) -> bool:
    """Tell whether `url` is usable as a local store location.

    A URL is accepted when it carries no `<scheme>://` prefix at all, or
    when that prefix is exactly `file://`.
    """
    scheme_match = re.search("^([a-z0-9]+://)", url)
    if scheme_match is None:
        return True
    return scheme_match.group() == "file://"
register_stack_component(self, component)

Register a stack component.

Parameters:

Name Type Description Default
component StackComponentWrapper

The component to register.

required

Exceptions:

Type Description
StackComponentExistsError

If a stack component with the same type and name already exists.

Source code in zenml/stack_stores/local_stack_store.py
def register_stack_component(
    self,
    component: StackComponentWrapper,
) -> None:
    """Store a new stack component's config file and record it in the store.

    Args:
        component: The component to register.

    Raises:
        StackComponentExistsError: If a stack component with the same type
            and name already exists.
    """
    registry = self.__store.stack_components[component.type]
    if component.name in registry:
        raise StackComponentExistsError(
            f"Unable to register stack component (type: {component.type}) "
            f"with name '{component.name}': Found existing stack component "
            f"with this name."
        )

    # The wrapper carries the YAML config base64-encoded; decode it and
    # write it to the component's own configuration file.
    config_file = self._get_stack_component_config_path(
        component_type=component.type, name=component.name
    )
    config_dir = os.path.dirname(config_file)
    utils.create_dir_recursive_if_not_exists(config_dir)
    decoded_config = base64.b64decode(component.config).decode()
    utils.write_file_contents_as_string(config_file, decoded_config)

    # Record name -> flavor in the store model and persist it.
    registry[component.name] = component.flavor
    self._write_store()
    logger.info(
        "Registered stack component with type '%s' and name '%s'.",
        component.type,
        component.name,
    )

models special

stack_component_wrapper

StackComponentWrapper (BaseModel) pydantic-model

Serializable Configuration of a StackComponent

Source code in zenml/stack_stores/models/stack_component_wrapper.py
class StackComponentWrapper(BaseModel):
    """Serializable representation of a StackComponent and its config."""

    type: StackComponentType
    flavor: str  # due to subclassing, can't properly use enum type here
    name: str
    uuid: UUID
    config: bytes  # b64 encoded yaml config

    @classmethod
    def from_component(
        cls, component: StackComponent
    ) -> "StackComponentWrapper":
        """Wrap a stack component.

        The component is dumped to JSON, re-serialized as YAML and stored
        base64-encoded in `config`.

        Args:
            component: The stack component to wrap.

        Returns:
            A wrapper carrying the component's type, flavor, name, uuid and
            encoded configuration.
        """
        component_dict = json.loads(component.json())
        yaml_config = yaml.dump(component_dict)
        encoded_config = base64.b64encode(yaml_config.encode())
        return cls(
            type=component.TYPE,
            flavor=component.FLAVOR,
            name=component.name,
            uuid=component.uuid,
            config=encoded_config,
        )

stack_store_model

StackStoreModel (BaseModel) pydantic-model

Pydantic object used for serializing a ZenML Stack Store.

Attributes:

Name Type Description
version

zenml version number

stacks Dict[str, Dict[zenml.enums.StackComponentType, str]]

Maps stack names to a configuration object containing the names and flavors of all stack components.

stack_components DefaultDict[zenml.enums.StackComponentType, Dict[str, str]]

Contains names and flavors of all registered stack components.

Source code in zenml/stack_stores/models/stack_store_model.py
class StackStoreModel(BaseModel):
    """Pydantic object used for serializing a ZenML Stack Store.

    Attributes:
        stacks: Maps stack names to a configuration object containing the
            names and flavors of all stack components.
        stack_components: Contains names and flavors of all registered stack
            components.
    """

    stacks: Dict[str, Dict[StackComponentType, str]]
    stack_components: DefaultDict[StackComponentType, Dict[str, str]]

    @validator("stack_components")
    def _construct_defaultdict(
        cls, stack_components: Dict[StackComponentType, Dict[str, str]]
    ) -> DefaultDict[StackComponentType, Dict[str, str]]:
        """Convert `stack_components` into a defaultdict.

        This allows stack components of a new component type to be added
        without first creating the per-type dictionary.
        """
        components_by_type: DefaultDict[
            StackComponentType, Dict[str, str]
        ] = defaultdict(dict)
        components_by_type.update(stack_components)
        return components_by_type

    @classmethod
    def empty_store(cls) -> "StackStoreModel":
        """Create a store model with no stacks and no stack components."""
        return cls(stacks={}, stack_components={})

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Ignore extra attributes from configs of previous ZenML versions
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/stack_stores/models/stack_store_model.py
class Config:
    """Pydantic configuration class for the stack store model."""

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes.
    validate_assignment = True
    # Ignore extra attributes from configs of previous ZenML versions
    # instead of treating them as errors.
    extra = "ignore"
empty_store() classmethod

Initialize a new empty stack store with current zen version.

Source code in zenml/stack_stores/models/stack_store_model.py
@classmethod
def empty_store(cls) -> "StackStoreModel":
    """Create a store model with no stacks and no stack components."""
    empty_fields = {"stacks": {}, "stack_components": {}}
    return cls(**empty_fields)

stack_wrapper

StackWrapper (BaseModel) pydantic-model

Network Serializable Wrapper describing a Stack.

Source code in zenml/stack_stores/models/stack_wrapper.py
class StackWrapper(BaseModel):
    """Network Serializable Wrapper describing a Stack."""

    name: str
    components: List[StackComponentWrapper]

    @classmethod
    def from_stack(cls, stack: Stack) -> "StackWrapper":
        """Build a wrapper from a stack.

        Args:
            stack: The stack to wrap.

        Returns:
            A wrapper holding the stack's name and one
            `StackComponentWrapper` per component of the stack.
        """
        # Only the component objects are needed, so iterate the values
        # rather than unpacking (and discarding) the component-type keys.
        return cls(
            name=stack.name,
            components=[
                StackComponentWrapper.from_component(component)
                for component in stack.components.values()
            ],
        )

rest_stack_store

RestStackStore (BaseStackStore)

StackStore implementation for accessing stack data from a REST api.

Source code in zenml/stack_stores/rest_stack_store.py
class RestStackStore(BaseStackStore):
    """StackStore implementation for accessing stack data from a REST api."""

    def initialize(
        self,
        url: str,
        *args: Any,
        **kwargs: Any,
    ) -> "RestStackStore":
        """Initializes a REST stack store instance.

        Args:
            url: Endpoint URL of the service for stack storage.
            args: additional positional arguments, forwarded to the base
                class `initialize`.
            kwargs: additional keyword arguments, forwarded to the base
                class `initialize`. `skip_default_stack` is set to True
                unless the caller passes it explicitly.

        Returns:
            The initialized stack store instance.

        Raises:
            ValueError: If the URL is not a valid http(s) endpoint URL.
        """
        stripped_url = url.strip("/")
        if not self.is_valid_url(stripped_url):
            raise ValueError(f"Invalid URL for REST store: {url}")
        self._url = stripped_url
        kwargs.setdefault("skip_default_stack", True)
        super().initialize(url, *args, **kwargs)
        return self

    # Statics:

    @staticmethod
    def get_path_from_url(url: str) -> Optional[Path]:
        """Get the path from a URL, if it points or is backed by a local file.

        Args:
            url: The URL to get the path from.

        Returns:
            None, because there are no local paths from REST urls.
        """
        return None

    @staticmethod
    def get_local_url(path: str) -> str:
        """Get a local URL for a given local path.

        Args:
             path: the path string to build a URL out of.

        Returns:
            Url pointing to the path for the store type.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("Cannot build a REST url from a path.")

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Check if the given url is a valid REST endpoint URL.

        A URL is valid when it starts with `http://` or `https://` and does
        not end with a slash.
        """
        scheme = re.search("^([a-z0-9]+://)", url)
        return (
            scheme is not None
            and scheme.group() in ("https://", "http://")
            and url[-1] != "/"
        )

    # Public Interface:

    @property
    def type(self) -> StoreType:
        """The type of stack store."""
        return StoreType.REST

    @property
    def url(self) -> str:
        """Get the stack store URL."""
        return self._url

    @property
    def is_empty(self) -> bool:
        """Check if the store is empty (no stacks are configured).

        The check is delegated to the service's `IS_EMPTY` endpoint.

        Raises:
            ValueError: If the service does not answer with a boolean.
        """
        empty = self.get(IS_EMPTY)
        if not isinstance(empty, bool):
            raise ValueError(
                f"Bad API Response. Expected boolean, got:\n{empty}"
            )
        return empty

    def get_stack_configuration(
        self, name: str
    ) -> Dict[StackComponentType, str]:
        """Fetches a stack configuration by name.

        Args:
            name: The name of the stack to fetch.

        Returns:
            Dict[StackComponentType, str] for the requested stack name.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        return self._parse_stack_configuration(
            self.get(f"{STACK_CONFIGURATIONS}/{name}")
        )

    @property
    def stack_configurations(self) -> Dict[str, Dict[StackComponentType, str]]:
        """Configurations for all stacks registered in this stack store.

        Returns:
            Dictionary mapping stack names to Dict[StackComponentType, str]'s

        Raises:
            ValueError: If the service does not answer with a dict.
        """
        body = self.get(STACK_CONFIGURATIONS)
        if not isinstance(body, dict):
            raise ValueError(
                f"Bad API Response. Expected dict, got {type(body)}"
            )
        return {
            key: self._parse_stack_configuration(value)
            for key, value in body.items()
        }

    def register_stack_component(
        self,
        component: StackComponentWrapper,
    ) -> None:
        """Register a stack component.

        Args:
            component: The component to register.

        Raises:
            StackComponentExistsError: If a stack component with the same type
                and name already exists.
        """
        self.post(STACK_COMPONENTS, body=component)

    def deregister_stack(self, name: str) -> None:
        """Delete a stack from storage.

        Args:
            name: The name of the stack to be deleted.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        self.delete(f"{STACKS}/{name}")

    # Custom implementations:

    @property
    def stacks(self) -> List[StackWrapper]:
        """All stacks registered in this repository.

        Raises:
            ValueError: If the service does not answer with a list.
        """
        body = self.get(STACKS)
        if not isinstance(body, list):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        return [StackWrapper.parse_obj(s) for s in body]

    def get_stack(self, name: str) -> StackWrapper:
        """Fetch a stack by name.

        Args:
            name: The name of the stack to retrieve.

        Returns:
            StackWrapper instance if the stack exists.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        return StackWrapper.parse_obj(self.get(f"{STACKS}/{name}"))

    def register_stack(self, stack: StackWrapper) -> Dict[str, str]:
        """Register a stack and its components.

        If any of the stacks' components aren't registered in the stack store
        yet, this method will try to register them as well.

        Args:
            stack: The stack to register.

        Returns:
            metadata dict for telemetry or logging.

        Raises:
            StackExistsError: If a stack with the same name already exists.
            StackComponentExistsError: If a component of the stack wasn't
                registered and a different component with the same name
                already exists.
            ValueError: If the service does not answer with a dict.
        """
        body = self.post(STACKS, stack)
        if isinstance(body, dict):
            return cast(Dict[str, str], body)
        else:
            raise ValueError(
                f"Bad API Response. Expected dict, got {type(body)}"
            )

    def get_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> StackComponentWrapper:
        """Get a registered stack component.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the component to fetch.

        Returns:
            The component parsed from the service response.

        Raises:
            KeyError: If no component with the requested type and name exists.
        """
        return StackComponentWrapper.parse_obj(
            self.get(f"{STACK_COMPONENTS}/{component_type}/{name}")
        )

    def get_stack_components(
        self, component_type: StackComponentType
    ) -> List[StackComponentWrapper]:
        """Fetches all registered stack components of the given type.

        Args:
            component_type: StackComponentType to list members of

        Returns:
            A list of StackComponentWrapper instances.

        Raises:
            ValueError: If the service does not answer with a list.
        """
        body = self.get(f"{STACK_COMPONENTS}/{component_type}")
        if not isinstance(body, list):
            raise ValueError(
                f"Bad API Response. Expected list, got {type(body)}"
            )
        return [StackComponentWrapper.parse_obj(c) for c in body]

    def deregister_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Deregisters a stack component.

        Args:
            component_type: The type of the component to deregister.
            name: The name of the component to deregister.

        Raises:
            ValueError: if trying to deregister a component that's part
                of a stack.
        """
        self.delete(f"{STACK_COMPONENTS}/{component_type}/{name}")

    # Private interface shall not be implemented for REST store, instead the
    # API only provides all public methods, including the ones that would
    # otherwise be inherited from the BaseStackStore in other implementations.
    # Don't call these! ABC complains that they aren't implemented, but they
    # aren't needed with the custom implementations of base methods.

    def _create_stack(
        self, name: str, stack_configuration: Dict[StackComponentType, str]
    ) -> None:
        """Add a stack to storage"""
        raise NotImplementedError("Not to be accessed directly in client!")

    def _get_component_flavor_and_config(
        self, component_type: StackComponentType, name: str
    ) -> Tuple[str, bytes]:
        """Fetch the flavor and configuration for a stack component."""
        raise NotImplementedError("Not to be accessed directly in client!")

    def _get_stack_component_names(
        self, component_type: StackComponentType
    ) -> List[str]:
        """Get names of all registered stack components of a given type."""
        raise NotImplementedError("Not to be accessed directly in client!")

    def _delete_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Remove a StackComponent from storage."""
        raise NotImplementedError("Not to be accessed directly in client!")

    # Implementation specific methods:

    def _parse_stack_configuration(
        self, to_parse: Json
    ) -> Dict[StackComponentType, str]:
        """Parse an API response into `Dict[StackComponentType, str]`.

        Raises:
            ValueError: If `to_parse` is not a dict.
        """
        if not isinstance(to_parse, dict):
            raise ValueError(
                f"Bad API Response. Expected dict, got {type(to_parse)}."
            )
        return {
            StackComponentType(typ): component_name
            for typ, component_name in to_parse.items()
        }

    @staticmethod
    def _error_args(response: requests.Response) -> Tuple[Any, ...]:
        """Extract exception arguments from an error response.

        Uses the `detail` entry of a JSON object body when present. A list
        or tuple detail becomes one argument per item; any other detail
        value becomes a single argument. Falls back to the raw response
        text when the body is not JSON, is not an object, or has no detail.
        """
        fallback = (response.text,)
        try:
            body = response.json()
        except ValueError:
            # Error bodies are not guaranteed to be JSON.
            return fallback
        if not isinstance(body, dict):
            return fallback
        detail = body.get("detail")
        if detail is None:
            return fallback
        if isinstance(detail, (list, tuple)):
            return tuple(detail)
        # Never splat a scalar: a string detail would otherwise be spread
        # into one exception argument per character.
        return (detail,)

    def _handle_response(self, response: requests.Response) -> Json:
        """Handle API response, translating http status codes to Exception.

        Args:
            response: The response returned by the `requests` call.

        Returns:
            The decoded JSON payload of a 2xx response.

        Raises:
            ValueError: If a 2xx response does not contain JSON, or on a
                409 response that names neither known "exists" error.
            KeyError: On a 404 response.
            StackComponentExistsError: On a 409 response whose text contains
                "StackComponentExistsError".
            StackExistsError: On a 409 response whose text contains
                "StackExistsError".
            RuntimeError: On a 422 response or any other status code.
        """
        status = response.status_code
        if 200 <= status < 300:
            try:
                payload: Json = response.json()
            except ValueError as err:
                # JSON decode errors raised by `Response.json()` derive
                # from ValueError.
                raise ValueError(
                    "Bad response from API. Expected json, got\n"
                    f"{response.text}"
                ) from err
            return payload
        elif status == 404:
            raise KeyError(*self._error_args(response))
        elif status == 409:
            if "StackComponentExistsError" in response.text:
                raise StackComponentExistsError(*self._error_args(response))
            elif "StackExistsError" in response.text:
                raise StackExistsError(*self._error_args(response))
            else:
                raise ValueError(*self._error_args(response))
        elif status == 422:
            raise RuntimeError(*self._error_args(response))
        else:
            raise RuntimeError(
                "Error retrieving from API. Got response "
                f"{response.status_code} with body:\n{response.text}"
            )

    def get(self, path: str) -> Json:
        """Make a GET request to the given endpoint path."""
        return self._handle_response(requests.get(self.url + path))

    def delete(self, path: str) -> Json:
        """Make a DELETE request to the given endpoint path."""
        return self._handle_response(requests.delete(self.url + path))

    def post(self, path: str, body: BaseModel) -> Json:
        """Make a POST request to the given endpoint path.

        The body model is sent serialized as JSON.
        """
        endpoint = self.url + path
        return self._handle_response(requests.post(endpoint, data=body.json()))
is_empty: bool property readonly

Check if the store is empty (no stacks are configured).

The implementation of this method should check if the store is empty without having to load all the stacks from the persistent storage.

stack_configurations: Dict[str, Dict[zenml.enums.StackComponentType, str]] property readonly

Configurations for all stacks registered in this stack store.

Returns:

Type Description
Dict[str, Dict[zenml.enums.StackComponentType, str]]

Dictionary mapping stack names to Dict[StackComponentType, str]'s

stacks: List[zenml.stack_stores.models.stack_wrapper.StackWrapper] property readonly

All stacks registered in this repository.

type: StoreType property readonly

The type of stack store.

url: str property readonly

Get the stack store URL.

delete(self, path)

Make a DELETE request to the given endpoint path.

Source code in zenml/stack_stores/rest_stack_store.py
def delete(self, path: str) -> Json:
    """Make a DELETE request to the given endpoint path.

    Args:
        path: Endpoint path, appended to the store URL.

    Returns:
        The result of passing the response to `_handle_response`.
    """
    endpoint = self.url + path
    response = requests.delete(endpoint)
    return self._handle_response(response)
deregister_stack(self, name)

Delete a stack from storage.

Parameters:

Name Type Description Default
name str

The name of the stack to be deleted.

required

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/rest_stack_store.py
def deregister_stack(self, name: str) -> None:
    """Ask the REST service to delete the stack with the given name.

    Args:
        name: The name of the stack to be deleted.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    stack_endpoint = f"{STACKS}/{name}"
    self.delete(stack_endpoint)
deregister_stack_component(self, component_type, name)

Deregisters a stack component.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component to deregister.

required
name str

The name of the component to deregister.

required

Exceptions:

Type Description
ValueError

if trying to deregister a component that's part of a stack.

Source code in zenml/stack_stores/rest_stack_store.py
def deregister_stack_component(
    self, component_type: StackComponentType, name: str
) -> None:
    """Ask the REST service to delete a stack component.

    Args:
        component_type: The type of the component to deregister.
        name: The name of the component to deregister.

    Raises:
        ValueError: if trying to deregister a component that's part
            of a stack.
    """
    component_endpoint = f"{STACK_COMPONENTS}/{component_type}/{name}"
    self.delete(component_endpoint)
get(self, path)

Make a GET request to the given endpoint path.

Source code in zenml/stack_stores/rest_stack_store.py
def get(self, path: str) -> Json:
    """Make a GET request to the given endpoint path.

    Args:
        path: Endpoint path, appended to the store URL.

    Returns:
        The result of passing the response to `_handle_response`.
    """
    endpoint = self.url + path
    response = requests.get(endpoint)
    return self._handle_response(response)
get_local_url(path) staticmethod

Get a local URL for a given local path.

Parameters:

Name Type Description Default
path str

the path string to build a URL out of.

required

Returns:

Type Description
str

Url pointing to the path for the store type.

Exceptions:

Type Description
NotImplementedError

always

Source code in zenml/stack_stores/rest_stack_store.py
@staticmethod
def get_local_url(path: str) -> str:
    """Unsupported for the REST store: local paths have no REST URL.

    Args:
         path: the path string to build a URL out of.

    Returns:
        Never returns; the declared return type only mirrors the base
        interface.

    Raises:
        NotImplementedError: always
    """
    raise NotImplementedError("Cannot build a REST url from a path.")
get_path_from_url(url) staticmethod

Get the path from a URL, if it points or is backed by a local file.

Parameters:

Name Type Description Default
url str

The URL to get the path from.

required

Returns:

Type Description
Optional[pathlib.Path]

None, because there are no local paths from REST urls.

Source code in zenml/stack_stores/rest_stack_store.py
@staticmethod
def get_path_from_url(url: str) -> Optional[Path]:
    """Return the local path behind a URL; always None for the REST store.

    Args:
        url: The URL to get the path from (unused).

    Returns:
        None, because there are no local paths from REST urls.
    """
    return None
get_stack(self, name)

Fetch a stack by name.

Parameters:

Name Type Description Default
name str

The name of the stack to retrieve.

required

Returns:

Type Description
StackWrapper

StackWrapper instance if the stack exists.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/rest_stack_store.py
def get_stack(self, name: str) -> StackWrapper:
    """Retrieve a single stack from the REST service.

    Args:
        name: The name of the stack to retrieve.

    Returns:
        StackWrapper instance if the stack exists.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    stack_payload = self.get(f"{STACKS}/{name}")
    return StackWrapper.parse_obj(stack_payload)
get_stack_component(self, component_type, name)

Get a registered stack component.

Exceptions:

Type Description
KeyError

If no component with the requested type and name exists.

Source code in zenml/stack_stores/rest_stack_store.py
def get_stack_component(
    self, component_type: StackComponentType, name: str
) -> StackComponentWrapper:
    """Retrieve a single registered stack component from the REST service.

    Args:
        component_type: The type of the component to fetch.
        name: The name of the component to fetch.

    Returns:
        The component parsed from the service response.

    Raises:
        KeyError: If no component with the requested type and name exists.
    """
    component_payload = self.get(
        f"{STACK_COMPONENTS}/{component_type}/{name}"
    )
    return StackComponentWrapper.parse_obj(component_payload)
get_stack_components(self, component_type)

Fetches all registered stack components of the given type.

Parameters:

Name Type Description Default
component_type StackComponentType

StackComponentType to list members of

required

Returns:

Type Description
List[zenml.stack_stores.models.stack_component_wrapper.StackComponentWrapper]

A list of StackComponentConfiguration instances.

Source code in zenml/stack_stores/rest_stack_store.py
def get_stack_components(
    self, component_type: StackComponentType
) -> List[StackComponentWrapper]:
    """List every registered stack component of one type.

    Args:
        component_type: StackComponentType to list members of

    Returns:
        A list of StackComponentWrapper instances.

    Raises:
        ValueError: If the service does not answer with a list.
    """
    payload = self.get(f"{STACK_COMPONENTS}/{component_type}")
    if isinstance(payload, list):
        return [StackComponentWrapper.parse_obj(item) for item in payload]
    raise ValueError(
        f"Bad API Response. Expected list, got {type(payload)}"
    )
get_stack_configuration(self, name)

Fetches a stack configuration by name.

Parameters:

Name Type Description Default
name str

The name of the stack to fetch.

required

Returns:

Type Description
Dict[zenml.enums.StackComponentType, str]

Dict[StackComponentType, str] for the requested stack name.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/rest_stack_store.py
def get_stack_configuration(
    self, name: str
) -> Dict[StackComponentType, str]:
    """Retrieve and parse the configuration of one stack.

    Args:
        name: The name of the stack to fetch.

    Returns:
        Dict[StackComponentType, str] for the requested stack name.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    raw_configuration = self.get(f"{STACK_CONFIGURATIONS}/{name}")
    return self._parse_stack_configuration(raw_configuration)
initialize(self, url, *args, **kwargs)

Initializes a REST stack store instance.

Parameters:

Name Type Description Default
url str

Endpoint URL of the service for stack storage.

required
args Any

additional positional arguments (ignored).

()
kwargs Any

additional keyword arguments (ignored).

{}

Returns:

Type Description
RestStackStore

The initialized stack store instance.

Source code in zenml/stack_stores/rest_stack_store.py
def initialize(
    self,
    url: str,
    *args: Any,
    **kwargs: Any,
) -> "RestStackStore":
    """Initializes a REST stack store instance.

    Args:
        url: Endpoint URL of the service for stack storage.
        args: additional positional arguments, forwarded to the base class
            `initialize`.
        kwargs: additional keyword arguments, forwarded to the base class
            `initialize`. `skip_default_stack` is set to True unless the
            caller passes it explicitly.

    Returns:
        The initialized stack store instance.

    Raises:
        ValueError: If the URL is not a valid http(s) endpoint URL.
    """
    stripped_url = url.strip("/")
    if not self.is_valid_url(stripped_url):
        # The f-prefix matters: without it the message shows a literal
        # "{url}" instead of the offending value.
        raise ValueError(f"Invalid URL for REST store: {url}")
    self._url = stripped_url
    kwargs.setdefault("skip_default_stack", True)
    super().initialize(url, *args, **kwargs)
    return self
is_valid_url(url) staticmethod

Check if the given url is a valid REST endpoint URL (an http:// or https:// URL without a trailing slash).

Source code in zenml/stack_stores/rest_stack_store.py
@staticmethod
def is_valid_url(url: str) -> bool:
    """Tell whether `url` is usable as a REST endpoint URL.

    A URL is accepted when it starts with `http://` or `https://` and does
    not end with a slash.
    """
    scheme_match = re.search("^([a-z0-9]+://)", url)
    if scheme_match is None:
        return False
    if scheme_match.group() not in ("https://", "http://"):
        return False
    # A scheme matched, so the url is non-empty here.
    return not url.endswith("/")
post(self, path, body)

Make a POST request to the given endpoint path.

Source code in zenml/stack_stores/rest_stack_store.py
def post(self, path: str, body: BaseModel) -> Json:
    """Make a POST request to the given endpoint path.

    Args:
        path: Endpoint path, appended to the store URL.
        body: Model to send; it is serialized with its `json()` method.

    Returns:
        The result of passing the response to `_handle_response`.
    """
    response = requests.post(self.url + path, data=body.json())
    return self._handle_response(response)
register_stack(self, stack)

Register a stack and its components.

If any of the stacks' components aren't registered in the stack store yet, this method will try to register them as well.

Parameters:

Name Type Description Default
stack StackWrapper

The stack to register.

required

Returns:

Type Description
Dict[str, str]

metadata dict for telemetry or logging.

Exceptions:

Type Description
StackExistsError

If a stack with the same name already exists.

StackComponentExistsError

If a component of the stack wasn't registered and a different component with the same name already exists.

Source code in zenml/stack_stores/rest_stack_store.py
def register_stack(self, stack: StackWrapper) -> Dict[str, str]:
    """Register a stack and its components through the REST API.

    The stack is posted to the `STACKS` endpoint. Components of the stack
    that aren't registered in the stack store yet are expected to be
    registered as part of the same request.

    Args:
        stack: The stack to register.

    Returns:
        The metadata dict returned by the API, usable for telemetry or
        logging.

    Raises:
        ValueError: If the API response body is not a dict.
        StackExistsError: If a stack with the same name already exists.
        StackComponentExistsError: If a component of the stack wasn't
            registered and a different component with the same name
            already exists.
    """
    # NOTE(review): the two "exists" errors are not raised in this method;
    # presumably they surface from `post` / the response handling - confirm.
    response = self.post(STACKS, stack)
    if not isinstance(response, dict):
        raise ValueError(
            f"Bad API Response. Expected dict, got {type(response)}"
        )
    return cast(Dict[str, str], response)
register_stack_component(self, component)

Register a stack component.

Parameters:

Name Type Description Default
component StackComponentWrapper

The component to register.

required

Exceptions:

Type Description
StackComponentExistsError

If a stack component with the same type and name already exists.

Source code in zenml/stack_stores/rest_stack_store.py
def register_stack_component(
    self,
    component: StackComponentWrapper,
) -> None:
    """Register a stack component through the REST API.

    The component is sent as the body of a POST request to the
    `STACK_COMPONENTS` endpoint. The response body is discarded.

    Args:
        component: The component to register.

    Raises:
        StackComponentExistsError: If a stack component with the same type
            and name already exists. NOTE(review): not raised in this
            method directly; presumably surfaced by `post` / the response
            handling - confirm.
    """
    self.post(STACK_COMPONENTS, body=component)

sql_stack_store

SqlStackStore (BaseStackStore)

Repository Implementation that uses SQL database backend

Source code in zenml/stack_stores/sql_stack_store.py
class SqlStackStore(BaseStackStore):
    """Stack store implementation that uses a SQL database backend."""

    def initialize(
        self,
        url: str,
        *args: Any,
        **kwargs: Any,
    ) -> "SqlStackStore":
        """Initialize a new SqlStackStore.

        Creates the parent directory for local SQLite files, builds the
        engine, creates all tables, makes sure a default user exists and
        finally delegates to the base class initialization.

        Args:
            url: SQLAlchemy connection URL of the database.
            *args: Additional positional arguments, forwarded to
                `create_engine` and to the base class `initialize`.
            **kwargs: Additional keyword arguments, forwarded to
                `create_engine` and to the base class `initialize`. The
                `skip_default_stack` option is only meant for the base
                class and is therefore not passed to `create_engine`.

        Returns:
            The initialized stack store instance.

        Raises:
            ValueError: If the URL is not a valid SQL URL.
        """
        if not self.is_valid_url(url):
            raise ValueError(f"Invalid URL for SQL store: {url}")

        logger.debug("Initializing SqlStackStore at %s", url)
        self._url = url

        local_path = self.get_path_from_url(url)
        if local_path:
            utils.create_dir_recursive_if_not_exists(str(local_path.parent))

        # `skip_default_stack` is an option of the base class `initialize`.
        # `create_engine` rejects keyword arguments it does not know, so the
        # option must not be forwarded to the engine.
        engine_kwargs = {
            key: value
            for key, value in kwargs.items()
            if key != "skip_default_stack"
        }
        self.engine = create_engine(url, *args, **engine_kwargs)
        SQLModel.metadata.create_all(self.engine)
        with Session(self.engine) as session:
            # Stacks are created with `created_by=1`, so make sure a user
            # with that id exists.
            if not session.exec(select(ZenUser)).first():
                session.add(ZenUser(id=1, name="LocalZenUser"))
            session.commit()

        super().initialize(url, *args, **kwargs)
        return self

    # Public interface implementations:

    @property
    def type(self) -> StoreType:
        """The type of stack store."""
        return StoreType.SQL

    @property
    def url(self) -> str:
        """URL of the repository.

        Raises:
            RuntimeError: If the store has not been initialized yet.
        """
        # `getattr` keeps the intended RuntimeError even when `_url` was
        # never assigned because `initialize` has not been called.
        url = getattr(self, "_url", None)
        if not url:
            raise RuntimeError(
                "SQL stack store has not been initialized. Call `initialize` "
                "before using the store."
            )
        return url

    @staticmethod
    def get_path_from_url(url: str) -> Optional[Path]:
        """Get the local path from a URL, if it points to a local sqlite file.

        This method first checks that the URL is a valid SQL URL. Only
        URLs starting with `sqlite:///` are considered to be backed by a
        file in the local filesystem. All other types of supported
        SQLAlchemy connection URLs are considered non-local and won't return
        a valid local path.

        Args:
            url: The URL to get the path from.

        Returns:
            The path extracted from the URL, or None, if the URL does not
            point to a local sqlite file.

        Raises:
            ValueError: If the URL is not a valid SQL URL.
        """
        if not SqlStackStore.is_valid_url(url):
            raise ValueError(f"Invalid URL for SQL store: {url}")
        prefix = "sqlite:///"
        if not url.startswith(prefix):
            return None
        # Strip the prefix only; the remainder is the path and must be kept
        # verbatim even if it happens to contain the prefix text again.
        return Path(url[len(prefix) :])

    @staticmethod
    def get_local_url(path: str) -> str:
        """Get a local SQL url for a given local path.

        Args:
            path: Local directory that holds the `zenml.db` database file.

        Returns:
            A `sqlite:///` URL pointing to `zenml.db` inside the directory.
        """
        return f"sqlite:///{path}/zenml.db"

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Check if the given url is a valid SQL url.

        Args:
            url: The URL to check.

        Returns:
            True if `make_url` can parse the URL, False otherwise.
        """
        try:
            make_url(url)
        except ArgumentError:
            logger.debug("Invalid SQL URL: %s", url)
            return False

        return True

    @property
    def is_empty(self) -> bool:
        """Check if the stack store is empty, i.e. contains no stacks."""
        with Session(self.engine) as session:
            return session.exec(select(ZenStack)).first() is None

    def get_stack_configuration(
        self, name: str
    ) -> Dict[StackComponentType, str]:
        """Fetches a stack configuration by name.

        Args:
            name: The name of the stack to fetch.

        Returns:
            Dict[StackComponentType, str] for the requested stack name.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        logger.debug("Fetching stack with name '%s'.", name)
        # first check that the stack exists
        with Session(self.engine) as session:
            maybe_stack = session.exec(
                select(ZenStack).where(ZenStack.name == name)
            ).first()
        if maybe_stack is None:
            raise KeyError(
                f"Unable to find stack with name '{name}'. Available names: "
                f"{set(self.stack_names)}."
            )
        # then get all components assigned to that stack
        with Session(self.engine) as session:
            definitions_and_components = session.exec(
                select(ZenStackDefinition, ZenStackComponent)
                .where(
                    ZenStackDefinition.component_type
                    == ZenStackComponent.component_type
                )
                .where(
                    ZenStackDefinition.component_name == ZenStackComponent.name
                )
                .where(ZenStackDefinition.stack_name == name)
            )
            # The result has to be consumed while the session is still open.
            return {
                StackComponentType(component.component_type): component.name
                for _, component in definitions_and_components
            }

    @property
    def stack_configurations(self) -> Dict[str, Dict[StackComponentType, str]]:
        """Configuration for all stacks registered in this stack store.

        Returns:
            Dictionary mapping stack names to Dict[StackComponentType, str]
        """
        return {n: self.get_stack_configuration(n) for n in self.stack_names}

    def register_stack_component(
        self,
        component: StackComponentWrapper,
    ) -> None:
        """Register a stack component.

        Args:
            component: The component to register.

        Raises:
            StackComponentExistsError: If a stack component with the same type
                and name already exists.
        """
        with Session(self.engine) as session:
            existing_component = session.exec(
                select(ZenStackComponent)
                .where(ZenStackComponent.name == component.name)
                .where(ZenStackComponent.component_type == component.type)
            ).first()
            if existing_component is not None:
                raise StackComponentExistsError(
                    f"Unable to register stack component (type: "
                    f"{component.type}) with name '{component.name}': Found "
                    f"existing stack component with this name."
                )
            new_component = ZenStackComponent(
                component_type=component.type,
                name=component.name,
                component_flavor=component.flavor,
                configuration=component.config,
            )
            session.add(new_component)
            session.commit()

    def deregister_stack(self, name: str) -> None:
        """Delete a stack from storage.

        Removes the stack itself as well as all rows that assign components
        to it. The components themselves are not deleted.

        Args:
            name: The name of the stack to be deleted.

        Raises:
            KeyError: If no stack exists for the given name.
        """
        with Session(self.engine) as session:
            try:
                stack = session.exec(
                    select(ZenStack).where(ZenStack.name == name)
                ).one()
            except NoResultFound as error:
                raise KeyError(
                    f"Unable to deregister stack with name '{name}': No "
                    "stack exists with this name."
                ) from error
            session.delete(stack)
            definitions = session.exec(
                select(ZenStackDefinition).where(
                    ZenStackDefinition.stack_name == name
                )
            ).all()
            for definition in definitions:
                session.delete(definition)
            session.commit()

    # Private interface implementations:

    def _create_stack(
        self, name: str, stack_configuration: Dict[StackComponentType, str]
    ) -> None:
        """Add a stack to storage.

        Args:
            name: The name to save the stack as.
            stack_configuration: Dict[StackComponentType, str] to persist.
        """
        with Session(self.engine) as session:
            # `created_by=1` refers to the default user added in `initialize`.
            stack = ZenStack(name=name, created_by=1)
            session.add(stack)
            for ctype, cname in stack_configuration.items():
                # Component types without an assigned name are not stored.
                if cname is not None:
                    session.add(
                        ZenStackDefinition(
                            stack_name=name,
                            component_type=ctype,
                            component_name=cname,
                        )
                    )
            session.commit()

    def _get_component_flavor_and_config(
        self, component_type: StackComponentType, name: str
    ) -> Tuple[str, bytes]:
        """Fetch the flavor and configuration for a stack component.

        Args:
            component_type: The type of the component to fetch.
            name: The name of the component to fetch.

        Returns:
            Pair of (flavor, configuration) for stack component, as string and
            base64-encoded yaml document, respectively

        Raises:
            KeyError: If no stack component exists for the given type and name.
        """
        with Session(self.engine) as session:
            component = session.exec(
                select(ZenStackComponent)
                .where(ZenStackComponent.component_type == component_type)
                .where(ZenStackComponent.name == name)
            ).one_or_none()
            if component is None:
                raise KeyError(
                    f"Unable to find stack component (type: {component_type}) "
                    f"with name '{name}'."
                )
        return component.component_flavor, component.configuration

    def _get_stack_component_names(
        self, component_type: StackComponentType
    ) -> List[str]:
        """Get names of all registered stack components of a given type.

        Args:
            component_type: The type of the component to list names for.

        Returns:
            A list of names as strings.
        """
        with Session(self.engine) as session:
            statement = select(ZenStackComponent).where(
                ZenStackComponent.component_type == component_type
            )
            return [component.name for component in session.exec(statement)]

    def _delete_stack_component(
        self, component_type: StackComponentType, name: str
    ) -> None:
        """Remove a StackComponent from storage.

        Args:
            component_type: The type of component to delete.
            name: The name of the component to delete.

        Raises:
            KeyError: If no component exists for given type and name.
        """
        with Session(self.engine) as session:
            component = session.exec(
                select(ZenStackComponent)
                .where(ZenStackComponent.component_type == component_type)
                .where(ZenStackComponent.name == name)
            ).first()
            if component is None:
                raise KeyError(
                    "Unable to deregister stack component (type: "
                    f"{component_type.value}) with name '{name}': No stack "
                    "component exists with this name."
                )
            session.delete(component)
            session.commit()

    # Implementation-specific internal methods:

    @property
    def stack_names(self) -> List[str]:
        """Names of all stacks registered in this StackStore."""
        with Session(self.engine) as session:
            return [s.name for s in session.exec(select(ZenStack))]
is_empty: bool property readonly

Check if the stack store is empty.

stack_configurations: Dict[str, Dict[zenml.enums.StackComponentType, str]] property readonly

Configuration for all stacks registered in this stack store.

Returns:

Type Description
Dict[str, Dict[zenml.enums.StackComponentType, str]]

Dictionary mapping stack names to Dict[StackComponentType, str]

stack_names: List[str] property readonly

Names of all stacks registered in this StackStore.

type: StoreType property readonly

The type of stack store.

url: str property readonly

URL of the repository.

deregister_stack(self, name)

Delete a stack from storage.

Parameters:

Name Type Description Default
name str

The name of the stack to be deleted.

required

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/sql_stack_store.py
def deregister_stack(self, name: str) -> None:
    """Delete a stack from storage.

    Removes the stack itself as well as all rows that assign components to
    it. The components themselves are not deleted.

    Args:
        name: The name of the stack to be deleted.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    with Session(self.engine) as session:
        # Only the lookup can raise NoResultFound, so keep the try minimal
        # and give the KeyError a message that names the missing stack.
        try:
            stack = session.exec(
                select(ZenStack).where(ZenStack.name == name)
            ).one()
        except NoResultFound as error:
            raise KeyError(
                f"Unable to deregister stack with name '{name}': No "
                "stack exists with this name."
            ) from error
        session.delete(stack)
        definitions = session.exec(
            select(ZenStackDefinition).where(
                ZenStackDefinition.stack_name == name
            )
        ).all()
        for definition in definitions:
            session.delete(definition)
        session.commit()
get_local_url(path) staticmethod

Get a local SQL url for a given local path.

Source code in zenml/stack_stores/sql_stack_store.py
@staticmethod
def get_local_url(path: str) -> str:
    """Build the SQLite URL of the database file inside a local directory.

    Args:
        path: Local directory that holds the `zenml.db` database file.

    Returns:
        A `sqlite:///` URL pointing to `zenml.db` inside the directory.
    """
    local_url = f"sqlite:///{path}/zenml.db"
    return local_url
get_path_from_url(url) staticmethod

Get the local path from a URL, if it points to a local sqlite file.

This method first checks that the URL is a valid SQLite URL, which is backed by a file in the local filesystem. All other types of supported SQLAlchemy connection URLs are considered non-local and won't return a valid local path.

Parameters:

Name Type Description Default
url str

The URL to get the path from.

required

Returns:

Type Description
Optional[pathlib.Path]

The path extracted from the URL, or None, if the URL does not point to a local sqlite file.

Source code in zenml/stack_stores/sql_stack_store.py
@staticmethod
def get_path_from_url(url: str) -> Optional[Path]:
    """Get the local path from a URL, if it points to a local sqlite file.

    This method first checks that the URL is a valid SQL URL. Only URLs
    starting with `sqlite:///` are considered to be backed by a file in the
    local filesystem. All other types of supported SQLAlchemy connection
    URLs are considered non-local and won't return a valid local path.

    Args:
        url: The URL to get the path from.

    Returns:
        The path extracted from the URL, or None, if the URL does not
        point to a local sqlite file.

    Raises:
        ValueError: If the URL is not a valid SQL URL.
    """
    if not SqlStackStore.is_valid_url(url):
        raise ValueError(f"Invalid URL for SQL store: {url}")
    prefix = "sqlite:///"
    if not url.startswith(prefix):
        return None
    # Strip the prefix only; the remainder is the path and must be kept
    # verbatim even if it happens to contain the prefix text again.
    return Path(url[len(prefix) :])
get_stack_configuration(self, name)

Fetches a stack configuration by name.

Parameters:

Name Type Description Default
name str

The name of the stack to fetch.

required

Returns:

Type Description
Dict[zenml.enums.StackComponentType, str]

Dict[StackComponentType, str] for the requested stack name.

Exceptions:

Type Description
KeyError

If no stack exists for the given name.

Source code in zenml/stack_stores/sql_stack_store.py
def get_stack_configuration(
    self, name: str
) -> Dict[StackComponentType, str]:
    """Look up which component is assigned to a stack for each type.

    Args:
        name: The name of the stack to fetch.

    Returns:
        Mapping from component type to component name for the stack.

    Raises:
        KeyError: If no stack exists for the given name.
    """
    logger.debug("Fetching stack with name '%s'.", name)

    # Step 1: make sure the stack is known at all.
    with Session(self.engine) as session:
        stack_row = session.exec(
            select(ZenStack).where(ZenStack.name == name)
        ).first()
    if stack_row is None:
        raise KeyError(
            f"Unable to find stack with name '{name}'. Available names: "
            f"{set(self.stack_names)}."
        )

    # Step 2: join the stack definition rows with their components.
    component_query = (
        select(ZenStackDefinition, ZenStackComponent)
        .where(
            ZenStackDefinition.component_type
            == ZenStackComponent.component_type
        )
        .where(ZenStackDefinition.component_name == ZenStackComponent.name)
        .where(ZenStackDefinition.stack_name == name)
    )
    with Session(self.engine) as session:
        # The rows are consumed while the session is still open.
        return {
            StackComponentType(component.component_type): component.name
            for _, component in session.exec(component_query)
        }
initialize(self, url, *args, **kwargs)

Initialize a new SqlStackStore.

Parameters:

Name Type Description Default
url str

SQLAlchemy connection URL of the database.

required
args, kwargs

additional parameters for SQLModel.

required

Returns:

Type Description
SqlStackStore

The initialized stack store instance.

Source code in zenml/stack_stores/sql_stack_store.py
def initialize(
    self,
    url: str,
    *args: Any,
    **kwargs: Any,
) -> "SqlStackStore":
    """Initialize a new SqlStackStore.

    Creates the parent directory for local SQLite files, builds the engine,
    creates all tables, makes sure a default user exists and finally
    delegates to the base class initialization.

    Args:
        url: SQLAlchemy connection URL of the database.
        *args: Additional positional arguments, forwarded to
            `create_engine` and to the base class `initialize`.
        **kwargs: Additional keyword arguments, forwarded to
            `create_engine` and to the base class `initialize`. The
            `skip_default_stack` option is only meant for the base class
            and is therefore not passed to `create_engine`.

    Returns:
        The initialized stack store instance.

    Raises:
        ValueError: If the URL is not a valid SQL URL.
    """
    if not self.is_valid_url(url):
        raise ValueError(f"Invalid URL for SQL store: {url}")

    logger.debug("Initializing SqlStackStore at %s", url)
    self._url = url

    local_path = self.get_path_from_url(url)
    if local_path:
        utils.create_dir_recursive_if_not_exists(str(local_path.parent))

    # `skip_default_stack` is an option of the base class `initialize`.
    # `create_engine` rejects keyword arguments it does not know, so the
    # option must not be forwarded to the engine.
    engine_kwargs = {
        key: value
        for key, value in kwargs.items()
        if key != "skip_default_stack"
    }
    self.engine = create_engine(url, *args, **engine_kwargs)
    SQLModel.metadata.create_all(self.engine)
    with Session(self.engine) as session:
        # Make sure a default user with id 1 exists.
        if not session.exec(select(ZenUser)).first():
            session.add(ZenUser(id=1, name="LocalZenUser"))
        session.commit()

    super().initialize(url, *args, **kwargs)
    return self
is_valid_url(url) staticmethod

Check if the given url is a valid SQL url.

Source code in zenml/stack_stores/sql_stack_store.py
@staticmethod
def is_valid_url(url: str) -> bool:
    """Tell whether the given URL can be parsed as a SQL connection URL.

    Args:
        url: The URL to check.

    Returns:
        True if `make_url` accepts the URL, False if it raises an
        `ArgumentError`.
    """
    try:
        make_url(url)
    except ArgumentError:
        logger.debug("Invalid SQL URL: %s", url)
        parsed = False
    else:
        parsed = True
    return parsed
register_stack_component(self, component)

Register a stack component.

Parameters:

Name Type Description Default
component StackComponentWrapper

The component to register.

required

Exceptions:

Type Description
StackComponentExistsError

If a stack component with the same type and name already exists.

Source code in zenml/stack_stores/sql_stack_store.py
def register_stack_component(
    self,
    component: StackComponentWrapper,
) -> None:
    """Persist a new stack component in the database.

    Args:
        component: The component to register.

    Raises:
        StackComponentExistsError: If a stack component with the same type
            and name already exists.
    """
    # Components are identified by the combination of name and type.
    lookup = (
        select(ZenStackComponent)
        .where(ZenStackComponent.name == component.name)
        .where(ZenStackComponent.component_type == component.type)
    )
    with Session(self.engine) as session:
        if session.exec(lookup).first() is not None:
            raise StackComponentExistsError(
                f"Unable to register stack component (type: "
                f"{component.type}) with name '{component.name}': Found "
                f"existing stack component with this name."
            )
        session.add(
            ZenStackComponent(
                component_type=component.type,
                name=component.name,
                component_flavor=component.flavor,
                configuration=component.config,
            )
        )
        session.commit()

ZenStackDefinition (SQLModel) pydantic-model

Join table between Stacks and StackComponents

Source code in zenml/stack_stores/sql_stack_store.py
class ZenStackDefinition(SQLModel, table=True):
    """Join table between Stacks and StackComponents.

    Each row assigns one stack component, identified by its type and name,
    to one stack, identified by its name. All three columns are flagged as
    primary key, so together they form the key of a row.
    """

    # Name of the stack; foreign key to `zenstack.name`.
    stack_name: str = Field(primary_key=True, foreign_key="zenstack.name")
    # Type of the assigned component; foreign key to
    # `zenstackcomponent.component_type`.
    component_type: StackComponentType = Field(
        primary_key=True, foreign_key="zenstackcomponent.component_type"
    )
    # Name of the assigned component; foreign key to
    # `zenstackcomponent.name`.
    component_name: str = Field(
        primary_key=True, foreign_key="zenstackcomponent.name"
    )