Skip to content

Plugins

zenml.plugins special

base_plugin_flavor

Base implementation for all Plugin Flavors.

BasePlugin (ABC)

Base Class for all Plugins.

Source code in zenml/plugins/base_plugin_flavor.py
class BasePlugin(ABC):
    """Abstract base class that every plugin derives from.

    Subclasses have to implement the `config_class` and `flavor_class`
    properties. Access to the active zen store is provided by this base
    class.
    """

    @property
    @abstractmethod
    def config_class(self) -> Type[BasePluginConfig]:
        """The configuration class of the plugin.

        Returns:
            The `BasePluginConfig` subclass of the plugin.
        """

    @property
    @abstractmethod
    def flavor_class(self) -> "Type[BasePluginFlavor]":
        """The flavor class of the plugin.

        Returns:
            The `BasePluginFlavor` subclass of the plugin.
        """

    @property
    def zen_store(self) -> "BaseZenStore":
        """The zen store of the global configuration.

        Returns:
            The active zen store.
        """
        global_config = GlobalConfiguration()
        return global_config.zen_store
config_class: Type[zenml.plugins.base_plugin_flavor.BasePluginConfig] property readonly

Returns the BasePluginConfig config.

Returns:

Type Description
Type[zenml.plugins.base_plugin_flavor.BasePluginConfig]

The configuration.

flavor_class: Type[BasePluginFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BasePluginFlavor]

The flavor class of the plugin.

zen_store: BaseZenStore property readonly

Returns the active zen store.

Returns:

Type Description
BaseZenStore

The active zen store.

BasePluginConfig (BaseModel, ABC)

Allows configuring of Event Source and Filter configuration.

Source code in zenml/plugins/base_plugin_flavor.py
class BasePluginConfig(BaseModel, ABC):
    """Base model for configuring Event Sources and Filters.

    Attributes stay mutable after the model was created, and attributes
    that are not declared on the model are ignored during initialization.
    """

    model_config = ConfigDict(
        extra="ignore",  # drop undeclared attributes at initialization
        frozen=False,  # keep public attributes mutable
    )

BasePluginFlavor (ABC)

Base Class for all PluginFlavors.

Source code in zenml/plugins/base_plugin_flavor.py
class BasePluginFlavor(ABC):
    """Abstract base class that every plugin flavor derives from.

    A flavor is described by the class-level attributes declared below.
    """

    # Plugin type of the flavor.
    TYPE: ClassVar[PluginType]
    # Plugin subtype of the flavor.
    SUBTYPE: ClassVar[PluginSubType]
    # Name of the flavor.
    FLAVOR: ClassVar[str]
    # Plugin implementation class that belongs to this flavor.
    PLUGIN_CLASS: ClassVar[Type[BasePlugin]]

    @classmethod
    @abstractmethod
    def get_flavor_response_model(
        cls,
        hydrate: bool,
    ) -> BasePluginFlavorResponse[Any, Any, Any]:
        """Convert the flavor into a response model.

        Args:
            hydrate: Whether the model should be hydrated.

        Returns:
            The response model of the flavor.
        """
get_flavor_response_model(hydrate) classmethod

Convert the Flavor into a Response Model.

Parameters:

Name Type Description Default
hydrate bool

Whether the model should be hydrated.

required
Source code in zenml/plugins/base_plugin_flavor.py
@classmethod
@abstractmethod
def get_flavor_response_model(
    cls,
    hydrate: bool,
) -> BasePluginFlavorResponse[Any, Any, Any]:
    """Convert the flavor into a response model.

    Args:
        hydrate: Whether the model should be hydrated.

    Returns:
        The response model of the flavor.
    """

plugin_flavor_registry

Registry for all plugins.

PluginFlavorRegistry

Registry for plugin flavors.

Source code in zenml/plugins/plugin_flavor_registry.py
class PluginFlavorRegistry:
    """Registry for plugin flavors.

    Flavors are kept in a nested mapping that is keyed by plugin type, then
    plugin subtype, then flavor name. Every leaf is a `RegistryEntry` that
    holds the flavor class and, once `initialize_plugins` was called, the
    plugin instance that was created for it.
    """

    def __init__(self) -> None:
        """Initialize the plugin flavor registry and register all flavors."""
        self.plugin_flavors: Dict[
            PluginType, Dict[PluginSubType, Dict[str, RegistryEntry]]
        ] = {}
        self.register_plugin_flavors()

    @property
    def _types(self) -> List[PluginType]:
        """Returns all available types.

        Returns:
            List of all available plugin types.
        """
        return list(self.plugin_flavors)

    def list_subtypes_within_type(
        self, _type: PluginType
    ) -> List[PluginSubType]:
        """Returns all available subtypes for a given type.

        A `KeyError` propagates to the caller if no flavor was registered
        for the given plugin type.

        Args:
            _type: The type of plugin

        Returns:
            A list of available plugin subtypes for this plugin type.
        """
        return list(self.plugin_flavors[_type])

    def _flavor_entries(
        self, _type: PluginType, subtype: PluginSubType
    ) -> Dict[str, RegistryEntry]:
        """Get all registry entries for a specific type and subtype.

        Args:
            _type: The type of Plugin
            subtype: The subtype of the plugin

        Returns:
            Dict of registry entries keyed by flavor name. The dict is empty
            if nothing was registered for the type/subtype combination.
        """
        return self.plugin_flavors.get(_type, {}).get(subtype, {})

    def list_available_flavors_for_type_and_subtype(
        self,
        _type: PluginType,
        subtype: PluginSubType,
    ) -> List[Type[BasePluginFlavor]]:
        """Get a list of all flavors for a specific type and subtype.

        Args:
            _type: The type of Plugin
            subtype: The subtype of the plugin

        Returns:
            List of flavors for the given type/subtype combination.
        """
        entries = self._flavor_entries(_type=_type, subtype=subtype)
        return [entry.flavor_class for entry in entries.values()]

    def list_available_flavor_responses_for_type_and_subtype(
        self,
        _type: PluginType,
        subtype: PluginSubType,
        page: int,
        size: int,
        hydrate: bool = False,
    ) -> Page["BasePluginFlavorResponse[Any, Any, Any]"]:
        """Get a page of flavor responses for a specific type and subtype.

        Args:
            _type: The type of Plugin
            subtype: The subtype of the plugin
            page: Page for pagination (offset +1)
            size: Page size for pagination
            hydrate: Whether to hydrate the response bodies

        Returns:
            A page of flavors.

        Raises:
            ValueError: If the page is out of range or if the page or the
                page size is smaller than 1.
        """
        # Pages are 1-based. A smaller value would yield a negative slice
        # start and silently return the wrong items.
        if page < 1:
            raise ValueError(
                f"Invalid page {page}. The page value must be greater than "
                f"or equal to 1."
            )
        # A non-positive size would divide by zero (or produce a negative
        # page count) in the page computation below.
        if size < 1:
            raise ValueError(
                f"Invalid page size {size}. The page size must be greater "
                f"than or equal to 1."
            )

        flavors = self.list_available_flavors_for_type_and_subtype(
            _type=_type,
            subtype=subtype,
        )
        total = len(flavors)
        # An empty result is still reported as one (empty) page.
        total_pages = math.ceil(total / size) if total else 1

        if page > total_pages:
            raise ValueError(
                f"Invalid page {page}. The requested page size is "
                f"{size} and there are a total of {total} items "
                f"for this query. The maximum page value therefore is "
                f"{total_pages}."
            )

        start = (page - 1) * size
        page_items = [
            flavor.get_flavor_response_model(hydrate=hydrate)
            for flavor in flavors[start : start + size]
        ]

        return Page(
            index=page,
            max_size=size,
            total_pages=total_pages,
            total=total,
            items=page_items,
        )

    @property
    def _builtin_flavors(self) -> Sequence[Type["BasePluginFlavor"]]:
        """A list of all default in-built flavors.

        Returns:
            A list of builtin flavors.
        """
        # Imported inside the property instead of at module level.
        from zenml.actions.pipeline_run.pipeline_run_action import (
            PipelineRunActionFlavor,
        )

        return [PipelineRunActionFlavor]

    @property
    def _integration_flavors(self) -> Sequence[Type["BasePluginFlavor"]]:
        """A list of all integration plugin flavors.

        Returns:
            A list of integration flavors.
        """
        # TODO: Only load active integrations
        return [
            flavor
            for integration in integration_registry.integrations.values()
            for flavor in integration.plugin_flavors()
        ]

    def _get_registry_entry(
        self,
        _type: PluginType,
        subtype: PluginSubType,
        flavor_name: str,
    ) -> RegistryEntry:
        """Get registry entry.

        A `KeyError` propagates to the caller if there is no entry for the
        given type, subtype and flavor name.

        Args:
            _type: Type of the entry.
            subtype: Subtype of the entry.
            flavor_name: Flavor of the entry.

        Returns:
            The registry entry.
        """
        return self.plugin_flavors[_type][subtype][flavor_name]

    def register_plugin_flavors(self) -> None:
        """Registers all builtin flavors followed by all integration flavors."""
        for flavor in self._builtin_flavors:
            self.register_plugin_flavor(flavor_class=flavor)
        for flavor in self._integration_flavors:
            self.register_plugin_flavor(flavor_class=flavor)

    def register_plugin_flavor(
        self, flavor_class: Type[BasePluginFlavor]
    ) -> None:
        """Registers a new plugin flavor.

        A flavor that is already registered under the same type, subtype and
        flavor name is kept and the registration is skipped.

        Args:
            flavor_class: The flavor to register
        """
        try:
            self._get_registry_entry(
                _type=flavor_class.TYPE,
                subtype=flavor_class.SUBTYPE,
                flavor_name=flavor_class.FLAVOR,
            )
        except KeyError:
            already_registered = False
        else:
            already_registered = True

        if already_registered:
            logger.debug(
                f"Found existing flavor {flavor_class.FLAVOR} already "
                f"registered for this type {flavor_class.TYPE} and subtype "
                f"{flavor_class.SUBTYPE}. "
                f"Skipping registration for {flavor_class}."
            )
            return

        # Create the intermediate levels of the nested mapping on demand.
        subtypes = self.plugin_flavors.setdefault(flavor_class.TYPE, {})
        entries = subtypes.setdefault(flavor_class.SUBTYPE, {})
        entries.setdefault(
            flavor_class.FLAVOR,
            RegistryEntry(flavor_class=flavor_class),
        )
        logger.debug(
            f"Registered built in plugin {flavor_class.FLAVOR} for "
            f"plugin type {flavor_class.TYPE} and "
            f"subtype {flavor_class.SUBTYPE}: {flavor_class}"
        )

    def get_flavor_class(
        self,
        _type: PluginType,
        subtype: PluginSubType,
        name: str,
    ) -> Type[BasePluginFlavor]:
        """Get a single plugin flavor class based on the key.

        Args:
            _type: The type of plugin.
            subtype: The subtype of plugin.
            name: Indicates the name of the plugin flavor.

        Returns:
            `BasePluginFlavor` subclass that was registered for this key.

        Raises:
            KeyError: If there is no entry at this type, subtype, flavor
        """
        try:
            entry = self._get_registry_entry(
                _type=_type,
                subtype=subtype,
                flavor_name=name,
            )
        except KeyError as e:
            # Re-raise with a descriptive message, keeping the lookup error
            # as the explicit cause.
            raise KeyError(
                f"No flavor class found for flavor name {name} at type "
                f"{_type} and subtype {subtype}."
            ) from e
        return entry.flavor_class

    def get_plugin(
        self,
        _type: PluginType,
        subtype: PluginSubType,
        name: str,
    ) -> "BasePlugin":
        """Get the plugin based on the flavor, type and subtype.

        Args:
            name: The name of the plugin flavor.
            _type: The type of plugin.
            subtype: The subtype of plugin.

        Returns:
            Plugin instance associated with the flavor, type and subtype.

        Raises:
            KeyError: If no plugin is found for the given flavor, type and
                subtype.
            RuntimeError: If the plugin was not initialized.
        """
        # Only the lookup can raise the KeyError that is translated here.
        try:
            plugin_entry = self._get_registry_entry(
                _type=_type,
                subtype=subtype,
                flavor_name=name,
            )
        except KeyError as e:
            raise KeyError(
                f"No flavor found for flavor name {name} and type "
                f"{_type} and subtype {subtype}."
            ) from e

        if plugin_entry.plugin_instance is None:
            raise RuntimeError(
                f"Plugin {plugin_entry.flavor_class} was not initialized."
            )
        return plugin_entry.plugin_instance

    def initialize_plugins(self) -> None:
        """Initializes all registered plugins.

        For every registry entry, the `PLUGIN_CLASS` of its flavor class is
        instantiated without arguments and stored as the plugin instance.
        """
        for subtypes_dict in self.plugin_flavors.values():
            for flavor_dict in subtypes_dict.values():
                for registry_entry in flavor_dict.values():
                    # TODO: Only initialize if the integration is active
                    registry_entry.plugin_instance = (
                        registry_entry.flavor_class.PLUGIN_CLASS()
                    )
__init__(self) special

Initialize the plugin flavor registry.

Source code in zenml/plugins/plugin_flavor_registry.py
def __init__(self) -> None:
    """Create an empty registry, then call `register_plugin_flavors`."""
    # Nested mapping: plugin type -> plugin subtype -> flavor name -> entry.
    empty_registry: Dict[
        PluginType, Dict[PluginSubType, Dict[str, RegistryEntry]]
    ] = {}
    self.plugin_flavors = empty_registry
    self.register_plugin_flavors()
get_flavor_class(self, _type, subtype, name)

Get a single plugin flavor class based on its type, subtype and flavor name.

Parameters:

Name Type Description Default
_type PluginType

The type of plugin.

required
subtype PluginSubType

The subtype of plugin.

required
name str

Indicates the name of the plugin flavor.

required

Returns:

Type Description
Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]

The BasePluginFlavor subclass that was registered for this key.

Exceptions:

Type Description
KeyError

If there is no entry at this type, subtype, flavor

Source code in zenml/plugins/plugin_flavor_registry.py
def get_flavor_class(
    self,
    _type: PluginType,
    subtype: PluginSubType,
    name: str,
) -> Type[BasePluginFlavor]:
    """Get a single plugin flavor class based on the key.

    Args:
        _type: The type of plugin.
        subtype: The subtype of plugin.
        name: Indicates the name of the plugin flavor.

    Returns:
        `BasePluginFlavor` subclass that was registered for this key.

    Raises:
        KeyError: If there is no entry at this type, subtype, flavor
    """
    # Only the registry lookup is guarded, so unrelated errors are not
    # translated by accident.
    try:
        entry = self._get_registry_entry(
            _type=_type,
            subtype=subtype,
            flavor_name=name,
        )
    except KeyError as e:
        # Re-raise with a descriptive message, keeping the lookup error as
        # the explicit cause.
        raise KeyError(
            f"No flavor class found for flavor name {name} at type "
            f"{_type} and subtype {subtype}."
        ) from e
    return entry.flavor_class
get_plugin(self, _type, subtype, name)

Get the plugin based on the flavor, type and subtype.

Parameters:

Name Type Description Default
name str

The name of the plugin flavor.

required
_type PluginType

The type of plugin.

required
subtype PluginSubType

The subtype of plugin.

required

Returns:

Type Description
BasePlugin

Plugin instance associated with the flavor, type and subtype.

Exceptions:

Type Description
KeyError

If no plugin is found for the given flavor, type and subtype.

RuntimeError

If the plugin was not initialized.

Source code in zenml/plugins/plugin_flavor_registry.py
def get_plugin(
    self,
    _type: PluginType,
    subtype: PluginSubType,
    name: str,
) -> "BasePlugin":
    """Get the plugin based on the flavor, type and subtype.

    Args:
        name: The name of the plugin flavor.
        _type: The type of plugin.
        subtype: The subtype of plugin.

    Returns:
        Plugin instance associated with the flavor, type and subtype.

    Raises:
        KeyError: If no plugin is found for the given flavor, type and
            subtype.
        RuntimeError: If the plugin was not initialized.
    """
    # Only the lookup can raise the KeyError that is translated here.
    try:
        plugin_entry = self._get_registry_entry(
            _type=_type,
            subtype=subtype,
            flavor_name=name,
        )
    except KeyError as e:
        # Keep the original lookup error as the explicit cause.
        raise KeyError(
            f"No flavor found for flavor name {name} and type "
            f"{_type} and subtype {subtype}."
        ) from e

    if plugin_entry.plugin_instance is None:
        raise RuntimeError(
            f"Plugin {plugin_entry.flavor_class} was not initialized."
        )
    return plugin_entry.plugin_instance
initialize_plugins(self)

Initializes all registered plugins.

Source code in zenml/plugins/plugin_flavor_registry.py
def initialize_plugins(self) -> None:
    """Create a plugin instance for every registered flavor.

    The `PLUGIN_CLASS` of each entry's flavor class is instantiated without
    arguments and stored on the entry as its plugin instance.
    """
    for subtype_map in self.plugin_flavors.values():
        for flavor_map in subtype_map.values():
            for entry in flavor_map.values():
                # TODO: Only initialize if the integration is active
                plugin_cls = entry.flavor_class.PLUGIN_CLASS
                entry.plugin_instance = plugin_cls()
list_available_flavor_responses_for_type_and_subtype(self, _type, subtype, page, size, hydrate=False)

Get a page of flavor response models for a specific plugin type and subtype.

Parameters:

Name Type Description Default
_type PluginType

The type of Plugin

required
subtype PluginSubType

The subtype of the plugin

required
page int

Page for pagination (offset +1)

required
size int

Page size for pagination

required
hydrate bool

Whether to hydrate the response bodies

False

Returns:

Type Description
Page[BasePluginFlavorResponse[Any, Any, Any]]

A page of flavors.

Exceptions:

Type Description
ValueError

If the page is out of range.

Source code in zenml/plugins/plugin_flavor_registry.py
def list_available_flavor_responses_for_type_and_subtype(
    self,
    _type: PluginType,
    subtype: PluginSubType,
    page: int,
    size: int,
    hydrate: bool = False,
) -> Page["BasePluginFlavorResponse[Any, Any, Any]"]:
    """Get a page of flavor responses for a specific type and subtype.

    Args:
        _type: The type of Plugin
        subtype: The subtype of the plugin
        page: Page for pagination (offset +1)
        size: Page size for pagination
        hydrate: Whether to hydrate the response bodies

    Returns:
        A page of flavors.

    Raises:
        ValueError: If the page is out of range or if the page or the
            page size is smaller than 1.
    """
    # Pages are 1-based. A smaller value would yield a negative slice start
    # and silently return the wrong items.
    if page < 1:
        raise ValueError(
            f"Invalid page {page}. The page value must be greater than "
            f"or equal to 1."
        )
    # A non-positive size would divide by zero (or produce a negative page
    # count) in the page computation below.
    if size < 1:
        raise ValueError(
            f"Invalid page size {size}. The page size must be greater "
            f"than or equal to 1."
        )

    flavors = self.list_available_flavors_for_type_and_subtype(
        _type=_type,
        subtype=subtype,
    )
    total = len(flavors)
    # An empty result is still reported as one (empty) page.
    total_pages = math.ceil(total / size) if total else 1

    if page > total_pages:
        raise ValueError(
            f"Invalid page {page}. The requested page size is "
            f"{size} and there are a total of {total} items "
            f"for this query. The maximum page value therefore is "
            f"{total_pages}."
        )

    start = (page - 1) * size
    page_items = [
        flavor.get_flavor_response_model(hydrate=hydrate)
        for flavor in flavors[start : start + size]
    ]

    return Page(
        index=page,
        max_size=size,
        total_pages=total_pages,
        total=total,
        items=page_items,
    )
list_available_flavors_for_type_and_subtype(self, _type, subtype)

Get a list of all flavors for a specific plugin type and subtype.

Parameters:

Name Type Description Default
_type PluginType

The type of Plugin

required
subtype PluginSubType

The subtype of the plugin

required

Returns:

Type Description
List[Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]]

List of flavors for the given type/subtype combination.

Source code in zenml/plugins/plugin_flavor_registry.py
def list_available_flavors_for_type_and_subtype(
    self,
    _type: PluginType,
    subtype: PluginSubType,
) -> List[Type[BasePluginFlavor]]:
    """Collect the flavor classes registered for a type and subtype.

    Args:
        _type: The type of Plugin
        subtype: The subtype of the plugin

    Returns:
        List of flavors for the given type/subtype combination.
    """
    registered = self._flavor_entries(_type=_type, subtype=subtype)
    return [entry.flavor_class for entry in registered.values()]
list_subtypes_within_type(self, _type)

Returns all available subtypes for a given type.

Parameters:

Name Type Description Default
_type PluginType

The type of plugin

required

Returns:

Type Description
List[zenml.enums.PluginSubType]

A list of available plugin subtypes for this plugin type.

Source code in zenml/plugins/plugin_flavor_registry.py
def list_subtypes_within_type(
    self, _type: PluginType
) -> List[PluginSubType]:
    """List the plugin subtypes that are registered for a plugin type.

    A `KeyError` propagates to the caller if nothing is registered for the
    given plugin type.

    Args:
        _type: The type of plugin

    Returns:
        A list of available plugin subtypes for this plugin type.
    """
    subtypes_of_type = self.plugin_flavors[_type]
    return [*subtypes_of_type]
register_plugin_flavor(self, flavor_class)

Registers a new plugin flavor.

Parameters:

Name Type Description Default
flavor_class Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]

The flavor to register

required
Source code in zenml/plugins/plugin_flavor_registry.py
def register_plugin_flavor(
    self, flavor_class: Type[BasePluginFlavor]
) -> None:
    """Add a flavor to the registry unless it is already registered.

    The flavor is stored under its `TYPE`, `SUBTYPE` and `FLAVOR`
    attributes. If an entry already exists for that key, it is kept and
    the registration is skipped.

    Args:
        flavor_class: The flavor to register
    """
    try:
        self._get_registry_entry(
            _type=flavor_class.TYPE,
            subtype=flavor_class.SUBTYPE,
            flavor_name=flavor_class.FLAVOR,
        )
    except KeyError:
        already_registered = False
    else:
        already_registered = True

    if already_registered:
        logger.debug(
            f"Found existing flavor {flavor_class.FLAVOR} already "
            f"registered for this type {flavor_class.TYPE} and subtype "
            f"{flavor_class.SUBTYPE}. "
            f"Skipping registration for {flavor_class}."
        )
        return

    # Create the intermediate levels of the nested mapping on demand.
    subtypes = self.plugin_flavors.setdefault(flavor_class.TYPE, {})
    entries = subtypes.setdefault(flavor_class.SUBTYPE, {})
    entries.setdefault(
        flavor_class.FLAVOR,
        RegistryEntry(flavor_class=flavor_class),
    )
    logger.debug(
        f"Registered built in plugin {flavor_class.FLAVOR} for "
        f"plugin type {flavor_class.TYPE} and "
        f"subtype {flavor_class.SUBTYPE}: {flavor_class}"
    )
register_plugin_flavors(self)

Registers all flavors.

Source code in zenml/plugins/plugin_flavor_registry.py
def register_plugin_flavors(self) -> None:
    """Register the builtin flavors first, then the integration flavors."""
    register = self.register_plugin_flavor
    for builtin_flavor in self._builtin_flavors:
        register(flavor_class=builtin_flavor)
    for integration_flavor in self._integration_flavors:
        register(flavor_class=integration_flavor)

RegistryEntry (BaseModel)

Registry Entry Class for the Plugin Registry.

Source code in zenml/plugins/plugin_flavor_registry.py
class RegistryEntry(BaseModel):
    """A single entry of the plugin flavor registry.

    An entry pairs a flavor class with the plugin instance that belongs to
    it. The plugin instance defaults to `None`.
    """

    # Needed because the fields hold arbitrary (non-pydantic) types.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # The flavor class that this entry was registered for.
    flavor_class: Type[BasePluginFlavor]
    # The plugin instance of the flavor, `None` by default.
    plugin_instance: Optional[BasePlugin] = None