Plugins
zenml.plugins
special
base_plugin_flavor
Base implementation for all Plugin Flavors.
BasePlugin (ABC)
Base Class for all Plugins.
Source code in zenml/plugins/base_plugin_flavor.py
class BasePlugin(ABC):
"""Base Class for all Plugins."""
@property
def zen_store(self) -> "BaseZenStore":
"""Returns the active zen store.
Returns:
The active zen store.
"""
return GlobalConfiguration().zen_store
@property
@abstractmethod
def config_class(self) -> Type[BasePluginConfig]:
"""Returns the `BasePluginConfig` config.
Returns:
The configuration.
"""
@property
@abstractmethod
def flavor_class(self) -> "Type[BasePluginFlavor]":
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
config_class: Type[zenml.plugins.base_plugin_flavor.BasePluginConfig]
property
readonly
Returns the BasePluginConfig
config.
Returns:
Type | Description |
---|---|
Type[zenml.plugins.base_plugin_flavor.BasePluginConfig] |
The configuration. |
flavor_class: Type[BasePluginFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[BasePluginFlavor] |
The flavor class of the plugin. |
zen_store: BaseZenStore
property
readonly
Returns the active zen store.
Returns:
Type | Description |
---|---|
BaseZenStore |
The active zen store. |
BasePluginConfig (BaseModel, ABC)
Allows configuring of Event Source and Filter configuration.
Source code in zenml/plugins/base_plugin_flavor.py
class BasePluginConfig(BaseModel, ABC):
"""Allows configuring of Event Source and Filter configuration."""
model_config = ConfigDict(
# public attributes are mutable
frozen=False,
# ignore extra attributes during model initialization
extra="ignore",
)
BasePluginFlavor (ABC)
Base Class for all PluginFlavors.
Source code in zenml/plugins/base_plugin_flavor.py
class BasePluginFlavor(ABC):
"""Base Class for all PluginFlavors."""
TYPE: ClassVar[PluginType]
SUBTYPE: ClassVar[PluginSubType]
FLAVOR: ClassVar[str]
PLUGIN_CLASS: ClassVar[Type[BasePlugin]]
@classmethod
@abstractmethod
def get_flavor_response_model(
cls, hydrate: bool
) -> BasePluginFlavorResponse[Any, Any, Any]:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
"""
get_flavor_response_model(hydrate)
classmethod
Convert the Flavor into a Response Model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hydrate |
bool |
Whether the model should be hydrated. |
required |
Source code in zenml/plugins/base_plugin_flavor.py
@classmethod
@abstractmethod
def get_flavor_response_model(
cls, hydrate: bool
) -> BasePluginFlavorResponse[Any, Any, Any]:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
"""
plugin_flavor_registry
Registry for all plugins.
PluginFlavorRegistry
Registry for plugin flavors.
Source code in zenml/plugins/plugin_flavor_registry.py
class PluginFlavorRegistry:
"""Registry for plugin flavors."""
def __init__(self) -> None:
"""Initialize the event flavor registry."""
self.plugin_flavors: Dict[
PluginType, Dict[PluginSubType, Dict[str, RegistryEntry]]
] = {}
self.register_plugin_flavors()
@property
def _types(self) -> List[PluginType]:
"""Returns all available types.
Returns:
List of all available plugin types.
"""
return list(self.plugin_flavors.keys())
def list_subtypes_within_type(
self, _type: PluginType
) -> List[PluginSubType]:
"""Returns all available subtypes for a given type.
Args:
_type: The type of plugin
Returns:
A list of available plugin subtypes for this plugin type.
"""
return list(self.plugin_flavors[_type].keys())
def _flavor_entries(
self, _type: PluginType, subtype: PluginSubType
) -> Dict[str, RegistryEntry]:
"""Get a list of all subtypes for a specific flavor and type.
Args:
_type: The type of Plugin
subtype: The subtype of the plugin
Returns:
Dict of Registry entries sorted by flavor name.
"""
if (
_type in self.plugin_flavors
and subtype in self.plugin_flavors[_type]
):
return self.plugin_flavors[_type][subtype]
else:
return {}
def list_available_flavors_for_type_and_subtype(
self,
_type: PluginType,
subtype: PluginSubType,
) -> List[Type[BasePluginFlavor]]:
"""Get a list of all subtypes for a specific flavor and type.
Args:
_type: The type of Plugin
subtype: The subtype of the plugin
Returns:
List of flavors for the given type/subtype combination.
"""
flavors = list(
[
entry.flavor_class
for _, entry in self._flavor_entries(
_type=_type, subtype=subtype
).items()
]
)
return flavors
def list_available_flavor_responses_for_type_and_subtype(
self,
_type: PluginType,
subtype: PluginSubType,
page: int,
size: int,
hydrate: bool = False,
) -> Page["BasePluginFlavorResponse[Any, Any, Any]"]:
"""Get a list of all subtypes for a specific flavor and type.
Args:
_type: The type of Plugin
subtype: The subtype of the plugin
page: Page for pagination (offset +1)
size: Page size for pagination
hydrate: Whether to hydrate the response bodies
Returns:
A page of flavors.
Raises:
ValueError: If the page is out of range.
"""
flavors = self.list_available_flavors_for_type_and_subtype(
_type=_type,
subtype=subtype,
)
total = len(flavors)
if total == 0:
total_pages = 1
else:
total_pages = math.ceil(total / size)
if page > total_pages:
raise ValueError(
f"Invalid page {page}. The requested page size is "
f"{size} and there are a total of {total} items "
f"for this query. The maximum page value therefore is "
f"{total_pages}."
)
start = (page - 1) * size
end = start + size
page_items = [
flavor.get_flavor_response_model(hydrate=hydrate)
for flavor in flavors[start:end]
]
return Page(
index=page,
max_size=size,
total_pages=total_pages,
total=total,
items=page_items,
)
@property
def _builtin_flavors(self) -> Sequence[Type["BasePluginFlavor"]]:
"""A list of all default in-built flavors.
Returns:
A list of builtin flavors.
"""
from zenml.actions.pipeline_run.pipeline_run_action import (
PipelineRunActionFlavor,
)
flavors = [PipelineRunActionFlavor]
return flavors
@property
def _integration_flavors(self) -> Sequence[Type["BasePluginFlavor"]]:
"""A list of all integration event flavors.
Returns:
A list of integration flavors.
"""
# TODO: Only load active integrations
integrated_flavors = []
for _, integration in integration_registry.integrations.items():
for flavor in integration.plugin_flavors():
integrated_flavors.append(flavor)
return integrated_flavors
def _get_registry_entry(
self,
_type: PluginType,
subtype: PluginSubType,
flavor_name: str,
) -> RegistryEntry:
"""Get registry entry.
Args:
_type: Type of the entry.
subtype: Subtype of the entry.
flavor_name: Flavor of the entry.
Returns:
The registry entry.
"""
return self.plugin_flavors[_type][subtype][flavor_name]
def register_plugin_flavors(self) -> None:
"""Registers all flavors."""
for flavor in self._builtin_flavors:
self.register_plugin_flavor(flavor_class=flavor)
for flavor in self._integration_flavors:
self.register_plugin_flavor(flavor_class=flavor)
def register_plugin_flavor(
self, flavor_class: Type[BasePluginFlavor]
) -> None:
"""Registers a new event_source.
Args:
flavor_class: The flavor to register
"""
try:
self._get_registry_entry(
_type=flavor_class.TYPE,
subtype=flavor_class.SUBTYPE,
flavor_name=flavor_class.FLAVOR,
)
except KeyError:
(
self.plugin_flavors.setdefault(flavor_class.TYPE, {})
.setdefault(flavor_class.SUBTYPE, {})
.setdefault(
flavor_class.FLAVOR,
RegistryEntry(flavor_class=flavor_class),
)
)
logger.debug(
f"Registered built in plugin {flavor_class.FLAVOR} for "
f"plugin type {flavor_class.TYPE} and "
f"subtype {flavor_class.SUBTYPE}: {flavor_class}"
)
else:
logger.debug(
f"Found existing flavor {flavor_class.FLAVOR} already "
f"registered for this type {flavor_class.TYPE} and subtype "
f"{flavor_class.SUBTYPE}. "
f"Skipping registration for {flavor_class}."
)
def get_flavor_class(
self,
_type: PluginType,
subtype: PluginSubType,
name: str,
) -> Type[BasePluginFlavor]:
"""Get a single event_source based on the key.
Args:
_type: The type of plugin.
subtype: The subtype of plugin.
name: Indicates the name of the plugin flavor.
Returns:
`BaseEventConfiguration` subclass that was registered for this key.
Raises:
KeyError: If there is no entry at this type, subtype, flavor
"""
try:
return self._get_registry_entry(
_type=_type,
subtype=subtype,
flavor_name=name,
).flavor_class
except KeyError:
raise KeyError(
f"No flavor class found for flavor name {name} at type "
f"{_type} and subtype {subtype}."
)
def get_plugin(
self,
_type: PluginType,
subtype: PluginSubType,
name: str,
) -> "BasePlugin":
"""Get the plugin based on the flavor, type and subtype.
Args:
name: The name of the plugin flavor.
_type: The type of plugin.
subtype: The subtype of plugin.
Returns:
Plugin instance associated with the flavor, type and subtype.
Raises:
KeyError: If no plugin is found for the given flavor, type and
subtype.
RuntimeError: If the plugin was not initialized.
"""
try:
plugin_entry = self._get_registry_entry(
_type=_type,
subtype=subtype,
flavor_name=name,
)
if plugin_entry.plugin_instance is None:
raise RuntimeError(
f"Plugin {plugin_entry.flavor_class} was not initialized."
)
return plugin_entry.plugin_instance
except KeyError:
raise KeyError(
f"No flavor found for flavor name {name} and type "
f"{_type} and subtype {subtype}."
)
def initialize_plugins(self) -> None:
"""Initializes all registered plugins."""
for _, subtypes_dict in self.plugin_flavors.items():
for _, flavor_dict in subtypes_dict.items():
for _, registry_entry in flavor_dict.items():
# TODO: Only initialize if the integration is active
registry_entry.plugin_instance = (
registry_entry.flavor_class.PLUGIN_CLASS()
)
__init__(self)
special
Initialize the event flavor registry.
Source code in zenml/plugins/plugin_flavor_registry.py
def __init__(self) -> None:
"""Initialize the event flavor registry."""
self.plugin_flavors: Dict[
PluginType, Dict[PluginSubType, Dict[str, RegistryEntry]]
] = {}
self.register_plugin_flavors()
get_flavor_class(self, _type, subtype, name)
Get a single event_source based on the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_type |
PluginType |
The type of plugin. |
required |
subtype |
PluginSubType |
The subtype of plugin. |
required |
name |
str |
Indicates the name of the plugin flavor. |
required |
Returns:
Type | Description |
---|---|
Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor] |
|
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no entry at this type, subtype, flavor |
Source code in zenml/plugins/plugin_flavor_registry.py
def get_flavor_class(
self,
_type: PluginType,
subtype: PluginSubType,
name: str,
) -> Type[BasePluginFlavor]:
"""Get a single event_source based on the key.
Args:
_type: The type of plugin.
subtype: The subtype of plugin.
name: Indicates the name of the plugin flavor.
Returns:
`BaseEventConfiguration` subclass that was registered for this key.
Raises:
KeyError: If there is no entry at this type, subtype, flavor
"""
try:
return self._get_registry_entry(
_type=_type,
subtype=subtype,
flavor_name=name,
).flavor_class
except KeyError:
raise KeyError(
f"No flavor class found for flavor name {name} at type "
f"{_type} and subtype {subtype}."
)
get_plugin(self, _type, subtype, name)
Get the plugin based on the flavor, type and subtype.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the plugin flavor. |
required |
_type |
PluginType |
The type of plugin. |
required |
subtype |
PluginSubType |
The subtype of plugin. |
required |
Returns:
Type | Description |
---|---|
BasePlugin |
Plugin instance associated with the flavor, type and subtype. |
Exceptions:
Type | Description |
---|---|
KeyError |
If no plugin is found for the given flavor, type and subtype. |
RuntimeError |
If the plugin was not initialized. |
Source code in zenml/plugins/plugin_flavor_registry.py
def get_plugin(
self,
_type: PluginType,
subtype: PluginSubType,
name: str,
) -> "BasePlugin":
"""Get the plugin based on the flavor, type and subtype.
Args:
name: The name of the plugin flavor.
_type: The type of plugin.
subtype: The subtype of plugin.
Returns:
Plugin instance associated with the flavor, type and subtype.
Raises:
KeyError: If no plugin is found for the given flavor, type and
subtype.
RuntimeError: If the plugin was not initialized.
"""
try:
plugin_entry = self._get_registry_entry(
_type=_type,
subtype=subtype,
flavor_name=name,
)
if plugin_entry.plugin_instance is None:
raise RuntimeError(
f"Plugin {plugin_entry.flavor_class} was not initialized."
)
return plugin_entry.plugin_instance
except KeyError:
raise KeyError(
f"No flavor found for flavor name {name} and type "
f"{_type} and subtype {subtype}."
)
initialize_plugins(self)
Initializes all registered plugins.
Source code in zenml/plugins/plugin_flavor_registry.py
def initialize_plugins(self) -> None:
"""Initializes all registered plugins."""
for _, subtypes_dict in self.plugin_flavors.items():
for _, flavor_dict in subtypes_dict.items():
for _, registry_entry in flavor_dict.items():
# TODO: Only initialize if the integration is active
registry_entry.plugin_instance = (
registry_entry.flavor_class.PLUGIN_CLASS()
)
list_available_flavor_responses_for_type_and_subtype(self, _type, subtype, page, size, hydrate=False)
Get a list of all subtypes for a specific flavor and type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_type |
PluginType |
The type of Plugin |
required |
subtype |
PluginSubType |
The subtype of the plugin |
required |
page |
int |
Page for pagination (offset +1) |
required |
size |
int |
Page size for pagination |
required |
hydrate |
bool |
Whether to hydrate the response bodies |
False |
Returns:
Type | Description |
---|---|
Page[BasePluginFlavorResponse[Any, Any, Any]] |
A page of flavors. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the page is out of range. |
Source code in zenml/plugins/plugin_flavor_registry.py
def list_available_flavor_responses_for_type_and_subtype(
self,
_type: PluginType,
subtype: PluginSubType,
page: int,
size: int,
hydrate: bool = False,
) -> Page["BasePluginFlavorResponse[Any, Any, Any]"]:
"""Get a list of all subtypes for a specific flavor and type.
Args:
_type: The type of Plugin
subtype: The subtype of the plugin
page: Page for pagination (offset +1)
size: Page size for pagination
hydrate: Whether to hydrate the response bodies
Returns:
A page of flavors.
Raises:
ValueError: If the page is out of range.
"""
flavors = self.list_available_flavors_for_type_and_subtype(
_type=_type,
subtype=subtype,
)
total = len(flavors)
if total == 0:
total_pages = 1
else:
total_pages = math.ceil(total / size)
if page > total_pages:
raise ValueError(
f"Invalid page {page}. The requested page size is "
f"{size} and there are a total of {total} items "
f"for this query. The maximum page value therefore is "
f"{total_pages}."
)
start = (page - 1) * size
end = start + size
page_items = [
flavor.get_flavor_response_model(hydrate=hydrate)
for flavor in flavors[start:end]
]
return Page(
index=page,
max_size=size,
total_pages=total_pages,
total=total,
items=page_items,
)
list_available_flavors_for_type_and_subtype(self, _type, subtype)
Get a list of all subtypes for a specific flavor and type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_type |
PluginType |
The type of Plugin |
required |
subtype |
PluginSubType |
The subtype of the plugin |
required |
Returns:
Type | Description |
---|---|
List[Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor]] |
List of flavors for the given type/subtype combination. |
Source code in zenml/plugins/plugin_flavor_registry.py
def list_available_flavors_for_type_and_subtype(
self,
_type: PluginType,
subtype: PluginSubType,
) -> List[Type[BasePluginFlavor]]:
"""Get a list of all subtypes for a specific flavor and type.
Args:
_type: The type of Plugin
subtype: The subtype of the plugin
Returns:
List of flavors for the given type/subtype combination.
"""
flavors = list(
[
entry.flavor_class
for _, entry in self._flavor_entries(
_type=_type, subtype=subtype
).items()
]
)
return flavors
list_subtypes_within_type(self, _type)
Returns all available subtypes for a given type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_type |
PluginType |
The type of plugin |
required |
Returns:
Type | Description |
---|---|
List[zenml.enums.PluginSubType] |
A list of available plugin subtypes for this plugin type. |
Source code in zenml/plugins/plugin_flavor_registry.py
def list_subtypes_within_type(
self, _type: PluginType
) -> List[PluginSubType]:
"""Returns all available subtypes for a given type.
Args:
_type: The type of plugin
Returns:
A list of available plugin subtypes for this plugin type.
"""
return list(self.plugin_flavors[_type].keys())
register_plugin_flavor(self, flavor_class)
Registers a new event_source.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_class |
Type[zenml.plugins.base_plugin_flavor.BasePluginFlavor] |
The flavor to register |
required |
Source code in zenml/plugins/plugin_flavor_registry.py
def register_plugin_flavor(
self, flavor_class: Type[BasePluginFlavor]
) -> None:
"""Registers a new event_source.
Args:
flavor_class: The flavor to register
"""
try:
self._get_registry_entry(
_type=flavor_class.TYPE,
subtype=flavor_class.SUBTYPE,
flavor_name=flavor_class.FLAVOR,
)
except KeyError:
(
self.plugin_flavors.setdefault(flavor_class.TYPE, {})
.setdefault(flavor_class.SUBTYPE, {})
.setdefault(
flavor_class.FLAVOR,
RegistryEntry(flavor_class=flavor_class),
)
)
logger.debug(
f"Registered built in plugin {flavor_class.FLAVOR} for "
f"plugin type {flavor_class.TYPE} and "
f"subtype {flavor_class.SUBTYPE}: {flavor_class}"
)
else:
logger.debug(
f"Found existing flavor {flavor_class.FLAVOR} already "
f"registered for this type {flavor_class.TYPE} and subtype "
f"{flavor_class.SUBTYPE}. "
f"Skipping registration for {flavor_class}."
)
register_plugin_flavors(self)
Registers all flavors.
Source code in zenml/plugins/plugin_flavor_registry.py
def register_plugin_flavors(self) -> None:
"""Registers all flavors."""
for flavor in self._builtin_flavors:
self.register_plugin_flavor(flavor_class=flavor)
for flavor in self._integration_flavors:
self.register_plugin_flavor(flavor_class=flavor)
RegistryEntry (BaseModel)
Registry Entry Class for the Plugin Registry.
Source code in zenml/plugins/plugin_flavor_registry.py
class RegistryEntry(BaseModel):
"""Registry Entry Class for the Plugin Registry."""
flavor_class: Type[BasePluginFlavor]
plugin_instance: Optional[BasePlugin] = None
model_config = ConfigDict(arbitrary_types_allowed=True)