Skip to content

Registry

zenml.integrations.registry

Implementation of a registry to track ZenML integrations.

Attributes

integration_registry = IntegrationRegistry() module-attribute

logger = get_logger(__name__) module-attribute

Classes

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
175
176
177
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
179
180
181
182
183
184
185
186
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
188
189
190
191
192
193
194
195
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

IntegrationError(message: Optional[str] = None, url: Optional[str] = None)

Bases: ZenMLBaseException

Raises exceptions when a requested integration can not be activated.

Source code in src/zenml/exceptions.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    message: Optional[str] = None,
    url: Optional[str] = None,
):
    """The BaseException used to format messages displayed to the user.

    Args:
        message: Message with details of exception. This message
                 will be appended with another message directing user to
                 `url` for more information. If `None`, then default
                 Exception behavior is used.
        url: URL to point to in exception message. If `None`, then no url
             is appended.
    """
    if message and url:
        message += f" For more information, visit {url}."
    super().__init__(message)

IntegrationRegistry()

Bases: object

Registry to keep track of ZenML Integrations.

Initializing the integration registry.

Source code in src/zenml/integrations/registry.py
32
33
34
35
def __init__(self) -> None:
    """Initializing the integration registry."""
    self._integrations: Dict[str, Type["Integration"]] = {}
    self._initialized = False
Attributes
integrations: Dict[str, Type[Integration]] property writable

Method to get integrations dictionary.

Returns:

Type Description
Dict[str, Type[Integration]]

A dict of integration key to type of Integration.

list_integration_names: List[str] property

Get a list of all possible integrations.

Returns:

Type Description
List[str]

A list of all possible integrations.

Functions
activate_integrations() -> None

Method to activate the integrations with are registered in the registry.

Source code in src/zenml/integrations/registry.py
 99
100
101
102
103
104
105
106
107
108
def activate_integrations(self) -> None:
    """Method to activate the integrations with are registered in the registry."""
    self._initialize()
    for name, integration in self._integrations.items():
        if integration.check_installation():
            logger.debug(f"Activating integration `{name}`...")
            integration.activate()
            logger.debug(f"Integration `{name}` is activated.")
        else:
            logger.debug(f"Integration `{name}` could not be activated.")
get_installed_integrations() -> List[str]

Returns list of installed integrations.

Returns:

Type Description
List[str]

List of installed integrations.

Source code in src/zenml/integrations/registry.py
224
225
226
227
228
229
230
231
232
233
234
235
def get_installed_integrations(self) -> List[str]:
    """Returns list of installed integrations.

    Returns:
        List of installed integrations.
    """
    self._initialize()
    return [
        name
        for name, integration in integration_registry.integrations.items()
        if integration.check_installation()
    ]
is_installed(integration_name: Optional[str] = None) -> bool

Checks if all requirements for an integration are installed.

Parameters:

Name Type Description Default
integration_name Optional[str]

Name of the integration to check.

None

Returns:

Type Description
bool

True if all requirements are installed, False otherwise.

Raises:

Type Description
KeyError

If the integration is not found.

Source code in src/zenml/integrations/registry.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def is_installed(self, integration_name: Optional[str] = None) -> bool:
    """Checks if all requirements for an integration are installed.

    Args:
        integration_name: Name of the integration to check.

    Returns:
        True if all requirements are installed, False otherwise.

    Raises:
        KeyError: If the integration is not found.
    """
    self._initialize()
    if integration_name in self.list_integration_names:
        return self._integrations[integration_name].check_installation()
    elif not integration_name:
        all_installed = [
            self._integrations[item].check_installation()
            for item in self.list_integration_names
        ]
        return all(all_installed)
    else:
        raise KeyError(
            f"Integration '{integration_name}' not found. "
            f"Currently the following integrations are available: "
            f"{self.list_integration_names}"
        )
register_integration(key: str, type_: Type[Integration]) -> None

Method to register an integration with a given name.

Parameters:

Name Type Description Default
key str

Name of the integration.

required
type_ Type[Integration]

Type of the integration.

required
Source code in src/zenml/integrations/registry.py
64
65
66
67
68
69
70
71
72
73
def register_integration(
    self, key: str, type_: Type["Integration"]
) -> None:
    """Method to register an integration with a given name.

    Args:
        key: Name of the integration.
        type_: Type of the integration.
    """
    self._integrations[key] = type_
select_integration_requirements(integration_name: Optional[str] = None, target_os: Optional[str] = None) -> List[str]

Select the requirements for a given integration or all integrations.

Parameters:

Name Type Description Default
integration_name Optional[str]

Name of the integration to check.

None
target_os Optional[str]

Target OS for the requirements.

None

Returns:

Type Description
List[str]

List of requirements for the integration.

Raises:

Type Description
KeyError

If the integration is not found.

Source code in src/zenml/integrations/registry.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def select_integration_requirements(
    self,
    integration_name: Optional[str] = None,
    target_os: Optional[str] = None,
) -> List[str]:
    """Select the requirements for a given integration or all integrations.

    Args:
        integration_name: Name of the integration to check.
        target_os: Target OS for the requirements.

    Returns:
        List of requirements for the integration.

    Raises:
        KeyError: If the integration is not found.
    """
    self._initialize()
    if integration_name:
        if integration_name in self.list_integration_names:
            return self._integrations[integration_name].get_requirements(
                target_os=target_os
            )
        else:
            raise KeyError(
                f"Integration {integration_name} does not exist. "
                f"Currently the following integrations are implemented. "
                f"{self.list_integration_names}"
            )
    else:
        return [
            requirement
            for name in self.list_integration_names
            for requirement in self._integrations[name].get_requirements(
                target_os=target_os
            )
        ]
select_uninstall_requirements(integration_name: Optional[str] = None, target_os: Optional[str] = None) -> List[str]

Select the uninstall requirements for a given integration or all integrations.

Parameters:

Name Type Description Default
integration_name Optional[str]

Name of the integration to check.

None
target_os Optional[str]

Target OS for the requirements.

None

Returns:

Type Description
List[str]

List of requirements for the integration uninstall.

Raises:

Type Description
KeyError

If the integration is not found.

Source code in src/zenml/integrations/registry.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def select_uninstall_requirements(
    self,
    integration_name: Optional[str] = None,
    target_os: Optional[str] = None,
) -> List[str]:
    """Select the uninstall requirements for a given integration or all integrations.

    Args:
        integration_name: Name of the integration to check.
        target_os: Target OS for the requirements.

    Returns:
        List of requirements for the integration uninstall.

    Raises:
        KeyError: If the integration is not found.
    """
    self._initialize()
    if integration_name:
        if integration_name in self.list_integration_names:
            return self._integrations[
                integration_name
            ].get_uninstall_requirements(target_os=target_os)
        else:
            raise KeyError(
                f"Integration {integration_name} does not exist. "
                f"Currently the following integrations are implemented. "
                f"{self.list_integration_names}"
            )
    else:
        return [
            requirement
            for name in self.list_integration_names
            for requirement in self._integrations[
                name
            ].get_uninstall_requirements(target_os=target_os)
        ]

Functions

get_logger(logger_name: str) -> logging.Logger

Main function to get logger name,.

Parameters:

Name Type Description Default
logger_name str

Name of logger to initialize.

required

Returns:

Type Description
Logger

A logger object.

Source code in src/zenml/logger.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def get_logger(logger_name: str) -> logging.Logger:
    """Main function to get logger name,.

    Args:
        logger_name: Name of logger to initialize.

    Returns:
        A logger object.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(get_logging_level().value)
    logger.addHandler(get_console_handler())

    logger.propagate = False
    return logger