Discord

zenml.integrations.discord

Discord integration for alerter components.

Attributes

DISCORD = 'discord' module-attribute

DISCORD_ALERTER_FLAVOR = 'discord' module-attribute

Classes

DiscordIntegration

Bases: Integration

Definition of a Discord integration for ZenML.

Implemented using Discord API Wrapper.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Discord integration.

Returns:

Type Description
List[Type[Flavor]]

List of new flavors defined by the Discord integration.

Source code in src/zenml/integrations/discord/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Discord integration.

    Returns:
        List of new flavors defined by the Discord integration.
    """
    from zenml.integrations.discord.flavors import DiscordAlerterFlavor

    return [DiscordAlerterFlavor]
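
The flavor class is imported inside `flavors()` rather than at module level, presumably so that importing the integration package does not load the flavor module until flavors are actually requested. A minimal stand-alone sketch of that override pattern follows; `Flavor`, `Integration`, `SketchAlerterFlavor` and `SketchIntegration` are hypothetical stand-ins defined here for illustration, not the ZenML classes.

```python
from typing import List, Type


class Flavor:
    """Stand-in for the ZenML Flavor base class (illustration only)."""


class SketchAlerterFlavor(Flavor):
    """Stand-in for DiscordAlerterFlavor (illustration only)."""


class Integration:
    """Stand-in base class: declares no flavors by default."""

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        return []


class SketchIntegration(Integration):
    """Stand-in for DiscordIntegration: overrides flavors()."""

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        # The real integration performs its flavor import at this point,
        # inside the method body, instead of at module import time.
        return [SketchAlerterFlavor]


declared = SketchIntegration.flavors()
```

The method returns flavor classes, not instances; whoever consumes the list decides when to instantiate them.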

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A URL pointing to the docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A URL of the logo that represents the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs that explain this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
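
The error message for a custom flavor includes a file path derived from the flavor's dotted import path. The sketch below isolates just that derivation so it can be tried without ZenML installed. The function name `expected_custom_flavor_path` and the example values are hypothetical, and `source_root` stands in for whatever `source_utils.get_source_root()` returns.

```python
import os


def expected_custom_flavor_path(source: str, source_root: str) -> str:
    """Return the file path that the import error message points at.

    `source` is a dotted import path ending in the class name,
    for example "my_pkg.flavors.MyFlavor" (a made-up example).
    """
    # Drop the trailing class name and keep the dotted module path.
    flavor_module, _ = source.rsplit(".", maxsplit=1)
    # Convert the dotted module path into a relative file path.
    relative_path = flavor_module.replace(".", os.path.sep)
    return os.path.join(source_root, relative_path) + ".py"


path = expected_custom_flavor_path("my_pkg.flavors.MyFlavor", "/repo")
```

Note that `rsplit(".", maxsplit=1)` requires at least one dot in the source string; a bare class name would fail to unpack into two parts.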
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
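
To make the URL scheme easier to inspect, here is a stand-alone sketch of the same string construction with the version check replaced by a plain boolean argument. The function name `default_docs_url` and the sample inputs (`"alerters"`, `"discord"`, `"0.1.0"`) are assumptions chosen for illustration.

```python
def default_docs_url(
    component_type_plural: str,
    flavor_name: str,
    is_latest: bool,
    version: str,
) -> str:
    """Build a docs URL following the scheme in the listing above."""
    # Underscores become hyphens in both path segments.
    component_type = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = (
            "https://zenml-io.gitbook.io/zenml-legacy-documentation"
            f"/v/{version}"
        )
    return f"{base}/stack-components/{component_type}/{name}"


latest_url = default_docs_url("alerters", "discord", True, "0.1.0")
legacy_url = default_docs_url("step_operators", "my_flavor", False, "0.1.0")
```

In the original method a failed version lookup (`RuntimeError`) is treated as "latest", so the unversioned base URL is the fallback.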
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
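
The integration name is recovered purely by string-splitting the module path. A small sketch of that branch logic, written as a free function so it runs without ZenML; `sdk_docs_url` is a hypothetical name, and the module paths and version used in the example are made up.

```python
def sdk_docs_url(module: str, component_type_plural: str, version: str) -> str:
    """Build an SDK docs URL following the scheme in the listing above."""
    base = f"https://sdkdocs.zenml.io/{version}"

    if "zenml.integrations" in module:
        # "zenml.integrations.<integration>.<rest>" -> "<integration>"
        after_prefix = module.split("zenml.integrations.", maxsplit=1)[1]
        integration = after_prefix.split(".")[0]
        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{module}"
        )

    return f"{base}/core_code_docs/core-{component_type_plural}/#{module}"


integration_url = sdk_docs_url(
    "zenml.integrations.discord.flavors", "alerters", "0.1.0"
)
core_url = sdk_docs_url("zenml.some_core_module", "orchestrators", "0.1.0")
```

Unlike the docs URL, the component type is not hyphenated here and is only used in the core branch.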
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor. Custom flavors are scoped by user and workspace.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor. Custom flavors
            are then scoped by user and workspace

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )
    user = None
    workspace = None
    if is_custom:
        user = Client().active_user.id
        workspace = Client().active_workspace.id

    model_class = FlavorRequest if is_custom else InternalFlavorRequest
    model = model_class(
        user=user,
        workspace=workspace,
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integrations in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
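
Two small string operations carry most of the parsing in this method: splitting the extras and stripping the `extra == "..."` marker from a dependency string. The sketch below reproduces only those two steps with the standard library. The helper names are hypothetical, and it assumes the extras arrive as a bracketed string such as `"[a,b]"`, which is what the `extras[1:-1]` slice in the listing suggests.

```python
import re
from typing import List


def split_extras(extras: str) -> List[str]:
    """Split a bracketed extras string such as "[a,b]" into names."""
    # Drop the surrounding brackets, then split on commas.
    return extras[1:-1].split(",")


def strip_extra_marker(requirement: str) -> str:
    """Remove a `; extra == "name"` marker from a requirement string."""
    return re.sub(r"; extra == \"\w+\"", "", requirement)


names = split_extras("[voice,speed]")
cleaned = strip_extra_marker('some-dep>=1.3.0; extra == "voice"')
```

Because `\w` matches only letters, digits and underscores, an extra whose name contains a hyphen is left untouched by this pattern, and the split does not strip whitespace around names.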
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
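
The loop above is a prefix filter: a requirement is kept unless it starts with one of the ignored entries. An equivalent, more compact sketch is shown below; the function name and the package names in the example are hypothetical.

```python
from typing import List, Sequence


def uninstall_requirements(
    requirements: Sequence[str], ignored_prefixes: Sequence[str]
) -> List[str]:
    """Keep requirements that do not start with any ignored prefix."""
    return [
        requirement
        for requirement in requirements
        if not any(requirement.startswith(p) for p in ignored_prefixes)
    ]


kept = uninstall_requirements(
    ["pkg-a>=1.0", "shared-lib>=2", "pkg-b"],
    ["shared-lib"],
)
```

Since the match is by prefix rather than by exact package name, an ignored entry `"shared-lib"` would also exclude a requirement such as `"shared-lib-extras"`.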
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

StackComponentType

Bases: StrEnum

All possible types a StackComponent can have.

Attributes
plural: str property

Returns the plural of the enum value.

Returns:

Type Description
str

The plural of the enum value.

Modules

alerters

Alerter components defined by the Discord integration.

Classes
DiscordAlerter(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], workspace: UUID, created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseAlerter

Send messages to Discord channels.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    workspace: UUID,
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        workspace: The ID of the workspace the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.workspace = workspace
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: DiscordAlerterConfig property

Returns the DiscordAlerterConfig config.

Returns:

Type Description
DiscordAlerterConfig

The configuration.

Functions
ask(message: str, params: Optional[BaseAlerterStepParameters] = None) -> bool

Post a message to a Discord channel and wait for approval.

Parameters:

Name Type Description Default
message str

Initial message to be posted.

required
params Optional[BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel and wait for approval.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True

    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    approved = False  # will be modified by check()

    @client.event
    async def on_ready() -> None:
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore

                def check(message: Message) -> bool:
                    if message.channel == channel:
                        if (
                            message.content
                            in self._get_approve_msg_options(params)
                        ):
                            nonlocal approved
                            approved = True
                            return True
                        elif (
                            message.content
                            in self._get_disapprove_msg_options(params)
                        ):
                            return True
                    return False

                await client.wait_for("message", check=check)

            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )

        except DiscordException as error:
            logger.error(f"DiscordAlerter.ask() failed: {error}")
        finally:
            await client.close()

    self.start_client(client)
    return approved
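
The decision inside `check()` has three outcomes: approve, disapprove, or ignore the message and keep waiting. The sketch below expresses that as a pure function so the behavior can be tested without a Discord connection. `classify_reply` and the option lists are hypothetical, and it assumes the option collections are lists of strings compared by exact match.

```python
from typing import Optional, Sequence


def classify_reply(
    content: str,
    approve_options: Sequence[str],
    disapprove_options: Sequence[str],
) -> Optional[bool]:
    """Classify a reply.

    Returns True for approval, False for disapproval, and None when the
    message matches neither list (the caller should keep waiting).
    """
    if content in approve_options:
        return True
    if content in disapprove_options:
        return False
    return None


approve = ["approve", "yes"]      # made-up example options
disapprove = ["decline", "no"]    # made-up example options

result_yes = classify_reply("yes", approve, disapprove)
result_no = classify_reply("no", approve, disapprove)
result_other = classify_reply("Yes please", approve, disapprove)
```

With list membership the comparison is exact and case-sensitive, so a reply like `"Yes please"` is ignored rather than treated as approval. In `ask()` itself, any outcome other than approval leaves `approved` at `False`, including errors and an unknown channel.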
post(message: str, params: Optional[BaseAlerterStepParameters] = None) -> bool

Post a message to a Discord channel.

Parameters:

Name Type Description Default
message str

Message to be posted.

required
params Optional[BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True

    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    message_sent = False

    @client.event
    async def on_ready() -> None:
        nonlocal message_sent
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore
                message_sent = True
            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )

        except DiscordException as error:
            logger.error(f"DiscordAlerter.post() failed: {error}")
        finally:
            await client.close()

    self.start_client(client)
    return message_sent
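
The control flow of `post()` can be sketched without Discord: an async callback looks up the channel, sends the message, and flips a `nonlocal` flag that the synchronous caller returns. Everything below (`FakeChannel`, `post_sketch`, the channel IDs) is a hypothetical stand-in and does not use the discord.py API.

```python
import asyncio
from typing import Dict, List, Optional


class FakeChannel:
    """Stand-in for a Discord channel that records sent messages."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    async def send(self, content: str) -> None:
        self.sent.append(content)


def post_sketch(
    channels: Dict[int, FakeChannel], channel_id: str, message: str
) -> bool:
    """Mirror post(): look up the channel, send, report success."""
    message_sent = False

    async def on_ready() -> None:
        nonlocal message_sent
        # The ID is stored as a string and converted before lookup.
        channel: Optional[FakeChannel] = channels.get(int(channel_id))
        if channel:
            await channel.send(message)
            message_sent = True
        # For an unknown channel the flag simply stays False.

    asyncio.run(on_ready())
    return message_sent


channels = {123: FakeChannel()}
ok = post_sketch(channels, "123", "Pipeline finished")
missing = post_sketch(channels, "999", "Pipeline finished")
```

The return value therefore signals delivery, not just that the call completed: callers should check it rather than assume the message arrived.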
start_client(client: Client) -> None

Helper function to start discord client.

Parameters:

Name Type Description Default
client Client

The Discord client object.

required
Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
def start_client(self, client: Client) -> None:
    """Helper function to start discord client.

    Args:
        client: discord client object

    """
    loop = asyncio.get_event_loop()

    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()

    timeout_seconds = 60
    # Run the bot with a timeout
    try:
        loop.run_until_complete(
            asyncio.wait_for(
                client.start(self.config.discord_token),
                timeout=timeout_seconds,
            )
        )
    except asyncio.TimeoutError:
        logger.error(
            "Client connection timed out. please verify the credentials."
        )
    finally:
        # Close the event loop
        loop.close()
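
The timeout handling can be tried in isolation. The sketch below runs a coroutine on an event loop with `asyncio.wait_for` and closes the loop afterwards, as the listing does. As a simplification it always creates a fresh loop instead of reusing the current one, and the sleeping coroutine is a hypothetical placeholder for `client.start(...)`.

```python
import asyncio


def run_with_timeout(seconds_of_work: float, timeout_seconds: float) -> bool:
    """Run placeholder work on a new event loop, giving up on timeout.

    Returns True if the work finished in time, False if it timed out.
    """

    async def work() -> None:
        # Placeholder for the long-running client coroutine.
        await asyncio.sleep(seconds_of_work)

    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(
            asyncio.wait_for(work(), timeout=timeout_seconds)
        )
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        # Always release the loop, whether or not the work finished.
        loop.close()


finished = run_with_timeout(0.01, 1.0)
timed_out = run_with_timeout(1.0, 0.01)
```

Because the timeout in `start_client` wraps the whole `client.start(...)` call, it bounds the entire session. For `ask()` this means a reply that arrives after the 60 seconds is never seen and the method returns `False`.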
Modules
discord_alerter

Implementation for discord flavor of alerter component.

Classes
DiscordAlerter(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], workspace: UUID, created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseAlerter

Send messages to Discord channels.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    workspace: UUID,
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        workspace: The ID of the workspace the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.workspace = workspace
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: DiscordAlerterConfig property

Returns the DiscordAlerterConfig config.

Returns:

Type Description
DiscordAlerterConfig

The configuration.

Functions
ask(message: str, params: Optional[BaseAlerterStepParameters] = None) -> bool

Post a message to a Discord channel and wait for approval.

Parameters:

Name Type Description Default
message str

Initial message to be posted.

required
params Optional[BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel and wait for approval.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True

    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    approved = False  # will be modified by check()

    @client.event
    async def on_ready() -> None:
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore

                def check(message: Message) -> bool:
                    if message.channel == channel:
                        if (
                            message.content
                            in self._get_approve_msg_options(params)
                        ):
                            nonlocal approved
                            approved = True
                            return True
                        elif (
                            message.content
                            in self._get_disapprove_msg_options(params)
                        ):
                            return True
                    return False

                await client.wait_for("message", check=check)

            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )

        except DiscordException as error:
            logger.error(f"DiscordAlerter.ask() failed: {error}")
        finally:
            await client.close()

    self.start_client(client)
    return approved
post(message: str, params: Optional[BaseAlerterStepParameters] = None) -> bool

Post a message to a Discord channel.

Parameters:

Name Type Description Default
message str

Message to be posted.

required
params Optional[BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True

    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    message_sent = False

    @client.event
    async def on_ready() -> None:
        nonlocal message_sent
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore
                message_sent = True
            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )

        except DiscordException as error:
            logger.error(f"DiscordAlerter.post() failed: {error}")
        finally:
            await client.close()

    self.start_client(client)
    return message_sent
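
The `on_ready` handler above follows a simple flow: look up the channel, send an embed if one was built or plain text otherwise, and report success only if a send happened. A minimal sketch of that flow is shown here with a fake channel so it runs without Discord. `FakeChannel` and `send_once` are hypothetical names introduced for illustration and are not part of ZenML or discord.py.

```python
import asyncio
from typing import Dict, List, Optional


class FakeChannel:
    """Hypothetical stand-in for a Discord channel; records sent messages."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, object]] = []

    async def send(
        self, content: Optional[str] = None, embed: Optional[dict] = None
    ) -> None:
        self.sent.append({"content": content, "embed": embed})


async def send_once(
    channel: Optional[FakeChannel], message: str, embed: Optional[dict]
) -> bool:
    """Send an embed if one exists, else plain text; report success."""
    if channel is None:
        # Mirrors the "channel not found" branch: nothing is sent.
        return False
    if embed:
        await channel.send(embed=embed)
    else:
        await channel.send(content=message)
    return True


channel = FakeChannel()
plain_ok = asyncio.run(send_once(channel, "Training finished", None))
embed_ok = asyncio.run(send_once(channel, "ignored", {"title": "Run summary"}))
missing_ok = asyncio.run(send_once(None, "Training finished", None))
```

In this sketch a missing channel yields `False` without raising, which matches the way `post` logs the problem and returns `message_sent` unchanged.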
start_client(client: Client) -> None

Helper function to start the Discord client.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| client | Client | Discord client object. | required |

Source code in src/zenml/integrations/discord/alerters/discord_alerter.py
def start_client(self, client: Client) -> None:
    """Helper function to start discord client.

    Args:
        client: discord client object

    """
    loop = asyncio.get_event_loop()

    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()

    timeout_seconds = 60
    # Run the bot with a timeout
    try:
        loop.run_until_complete(
            asyncio.wait_for(
                client.start(self.config.discord_token),
                timeout=timeout_seconds,
            )
        )
    except asyncio.TimeoutError:
        logger.error(
            "Client connection timed out. please verify the credentials."
        )
    finally:
        # Close the event loop
        loop.close()
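
The core of `start_client` is running a coroutine to completion with an upper time bound and always closing the loop afterwards. The following sketch reproduces that pattern with the standard library only. `slow_connect` is a hypothetical stand-in for the client start call, and, unlike the source above, the sketch always creates a fresh loop with `asyncio.new_event_loop()` instead of reusing the current one.

```python
import asyncio


async def slow_connect(delay: float) -> str:
    """Hypothetical stand-in for a client start call taking `delay` seconds."""
    await asyncio.sleep(delay)
    return "connected"


def run_with_timeout(delay: float, timeout_seconds: float) -> bool:
    """Run the coroutine on a fresh loop; return False if it times out."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(
            asyncio.wait_for(slow_connect(delay), timeout=timeout_seconds)
        )
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        # Always release the loop, whether or not the call finished in time.
        loop.close()
```

Creating a new loop per call sidesteps the closed-loop check in the original at the cost of not sharing a loop with other code; which trade-off is preferable depends on the caller.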
DiscordAlerterParameters

Bases: BaseAlerterStepParameters

Discord alerter parameters.

DiscordAlerterPayload

Bases: BaseModel

Discord alerter payload implementation.

flavors

Discord integration flavors.

Classes
DiscordAlerterConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseAlerterConfig

Discord alerter config.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| discord_token | str | The Discord token tied to the Discord account to be used. |
| default_discord_channel_id | Optional[str] | The ID of the Discord channel to use for communication if no channel ID is provided in the step config. |

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
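
The constructor above hinges on telling secret references of the form `{{secret_name.key}}` apart from plain values, then rejecting references on fields that need custom validation. A rough, self-contained sketch of that idea follows. The regular expression and the helpers `looks_like_secret_reference` and `check_attributes` are assumptions made for illustration; ZenML's own `secret_utils.is_secret_reference` and `pydantic_utils.has_validators` may behave differently.

```python
import re
from typing import Any, Dict, Set

# Assumed pattern for `{{secret_name.key}}`; ZenML's own check may differ.
_SECRET_REF = re.compile(r"^\{\{\s*[^.\s{}]+\.[^\s{}]+\s*\}\}$")


def looks_like_secret_reference(value: Any) -> bool:
    """Return True if `value` is a string shaped like `{{secret_name.key}}`."""
    return isinstance(value, str) and bool(_SECRET_REF.match(value))


def check_attributes(attrs: Dict[str, Any], validated_fields: Set[str]) -> None:
    """Reject secret references on fields that need custom validation."""
    for key, value in attrs.items():
        if value is None or not looks_like_secret_reference(value):
            # Plain values go through normal validation later.
            continue
        if key in validated_fields:
            raise ValueError(
                f"Attribute `{key}` cannot be passed as a secret reference."
            )
```

For example, `check_attributes({"discord_token": "{{discord.token}}"}, set())` passes silently, while the same reference on a field listed in `validated_fields` raises `ValueError`.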
Attributes
is_valid: bool property

Check if the stack component is valid.

Returns:

| Type | Description |
| --- | --- |
| bool | True if the stack component is valid, False otherwise. |

DiscordAlerterFlavor

Bases: BaseAlerterFlavor

Discord alerter flavor.

Attributes
config_class: Type[DiscordAlerterConfig] property

Returns the DiscordAlerterConfig config class.

Returns:

| Type | Description |
| --- | --- |
| Type[DiscordAlerterConfig] | The config class. |

docs_url: Optional[str] property

A URL pointing to the docs that explain this flavor.

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | A flavor docs URL. |

implementation_class: Type[DiscordAlerter] property

Implementation class for this flavor.

Returns:

| Type | Description |
| --- | --- |
| Type[DiscordAlerter] | The implementation class. |

logo_url: str property

A URL of the logo that represents the flavor in the dashboard.

Returns:

| Type | Description |
| --- | --- |
| str | The flavor logo. |

name: str property

Name of the flavor.

Returns:

| Type | Description |
| --- | --- |
| str | The name of the flavor. |

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs that explain this flavor.

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | A flavor SDK docs URL. |

Modules
discord_alerter_flavor

Discord alerter flavor.

Classes
DiscordAlerterConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseAlerterConfig

Discord alerter config.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| discord_token | str | The Discord token tied to the Discord account to be used. |
| default_discord_channel_id | Optional[str] | The ID of the Discord channel to use for communication if no channel ID is provided in the step config. |

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_valid: bool property

Check if the stack component is valid.

Returns:

| Type | Description |
| --- | --- |
| bool | True if the stack component is valid, False otherwise. |

DiscordAlerterFlavor

Bases: BaseAlerterFlavor

Discord alerter flavor.

Attributes
config_class: Type[DiscordAlerterConfig] property

Returns the DiscordAlerterConfig config class.

Returns:

| Type | Description |
| --- | --- |
| Type[DiscordAlerterConfig] | The config class. |

docs_url: Optional[str] property

A URL pointing to the docs that explain this flavor.

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | A flavor docs URL. |

implementation_class: Type[DiscordAlerter] property

Implementation class for this flavor.

Returns:

| Type | Description |
| --- | --- |
| Type[DiscordAlerter] | The implementation class. |

logo_url: str property

A URL of the logo that represents the flavor in the dashboard.

Returns:

| Type | Description |
| --- | --- |
| str | The flavor logo. |

name: str property

Name of the flavor.

Returns:

| Type | Description |
| --- | --- |
| str | The name of the flavor. |

sdk_docs_url: Optional[str] property

A URL pointing to the SDK docs that explain this flavor.

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | A flavor SDK docs URL. |

steps

Built-in steps for the Discord integration.

Modules
discord_alerter_ask_step

Step that allows you to send messages to Discord and wait for a response.

Functions
discord_alerter_ask_step(message: str, params: Optional[DiscordAlerterParameters] = None) -> bool

Posts a message to the Discord alerter component and waits for approval.

This is useful, for example, to get a human in the loop before deploying models.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | str | Initial message to be posted. | required |
| params | Optional[DiscordAlerterParameters] | Parameters for the Discord alerter. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if a user approved the operation, else False. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the currently active alerter is not a DiscordAlerter. |

Source code in src/zenml/integrations/discord/steps/discord_alerter_ask_step.py
@step
def discord_alerter_ask_step(
    message: str,
    params: Optional[DiscordAlerterParameters] = None,
) -> bool:
    """Posts a message to the Discord alerter component and waits for approval.

    This can be useful, e.g. to easily get a human in the loop before
    deploying models.

    Args:
        message: Initial message to be posted.
        params: Parameters for the Discord alerter.

    Returns:
        True if a user approved the operation, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `DiscordAlerter`.
    """
    context = get_step_context()
    client = Client()
    active_stack = client.active_stack
    alerter = active_stack.alerter
    if not isinstance(alerter, DiscordAlerter):
        # TODO: potential duplicate code for other components
        # -> generalize to `check_component_flavor()` utility function?
        raise RuntimeError(
            "Step `discord_alerter_ask_step` requires an alerter component of "
            "flavor `discord`, but the currently active alerter is of type "
            f"{type(alerter)}, which is not a subclass of `DiscordAlerter`."
        )
    if (
        params
        and hasattr(params, "include_format_blocks")
        and params.include_format_blocks
    ):
        pipeline_name = context.pipeline.name
        step_name = context.step_run.name
        payload = DiscordAlerterPayload(
            pipeline_name=pipeline_name,
            step_name=step_name,
            stack_name=active_stack.name,
        )
        params.payload = payload
    return alerter.ask(message, params)
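
Before delegating to the alerter, the step above attaches run metadata to `params` only when `params` is set and `include_format_blocks` is truthy. The sketch below isolates that guard with plain dataclasses. `Payload`, `Params`, and `attach_payload` are hypothetical stand-ins, and the default of `include_format_blocks` is an assumption, not a statement about `DiscordAlerterParameters`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Payload:
    """Hypothetical stand-in for DiscordAlerterPayload."""

    pipeline_name: str
    step_name: str
    stack_name: str


@dataclass
class Params:
    """Hypothetical stand-in for the alerter parameters."""

    include_format_blocks: bool = True  # assumed default, for illustration
    payload: Optional[Payload] = None


def attach_payload(
    params: Optional[Params], pipeline: str, step: str, stack: str
) -> Optional[Params]:
    """Attach run metadata only when format blocks were requested."""
    if params and getattr(params, "include_format_blocks", False):
        params.payload = Payload(pipeline, step, stack)
    return params
```

With these stand-ins, passing `None` or parameters with `include_format_blocks=False` leaves the payload unset, so the alerter receives the message without run metadata.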
discord_alerter_post_step

Step that allows you to post messages to Discord.

Functions
discord_alerter_post_step(message: str, params: Optional[DiscordAlerterParameters] = None) -> bool

Post a message to the Discord alerter component of the active stack.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | str | Message to be posted. | required |
| params | Optional[DiscordAlerterParameters] | Parameters for the Discord alerter. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the operation succeeded, else False. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the currently active alerter is not a DiscordAlerter. |

Source code in src/zenml/integrations/discord/steps/discord_alerter_post_step.py
@step
def discord_alerter_post_step(
    message: str,
    params: Optional[DiscordAlerterParameters] = None,
) -> bool:
    """Post a message to the Discord alerter component of the active stack.

    Args:
        message: Message to be posted.
        params: Parameters for the Discord alerter.

    Returns:
        True if operation succeeded, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `DiscordAlerter`.
    """
    context = get_step_context()
    client = Client()
    active_stack = client.active_stack
    alerter = active_stack.alerter
    if not isinstance(alerter, DiscordAlerter):
        raise RuntimeError(
            "Step `discord_alerter_post_step` requires an alerter component of "
            "flavor `discord`, but the currently active alerter is of type "
            f"{type(alerter)}, which is not a subclass of `DiscordAlerter`."
        )
    if (
        params
        and hasattr(params, "include_format_blocks")
        and params.include_format_blocks
    ):
        pipeline_name = context.pipeline.name
        step_name = context.step_run.name
        payload = DiscordAlerterPayload(
            pipeline_name=pipeline_name,
            step_name=step_name,
            stack_name=active_stack.name,
        )
        params.payload = payload
    return alerter.post(message, params)
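
Both built-in steps repeat the same `isinstance` guard on the active alerter, and the TODO in the ask step suggests factoring it into a shared utility. One possible shape for such a helper is sketched below. `require_flavor`, `BaseAlerter`, and `ChatAlerter` are hypothetical names for illustration and do not exist in ZenML.

```python
class BaseAlerter:
    """Hypothetical base class standing in for a generic alerter."""

    def post(self, message: str) -> bool:
        return True


class ChatAlerter(BaseAlerter):
    """Hypothetical alerter flavor that a step requires."""


def require_flavor(alerter: object, expected: type, step_name: str) -> None:
    """Raise RuntimeError unless `alerter` is an instance of `expected`."""
    if not isinstance(alerter, expected):
        raise RuntimeError(
            f"Step `{step_name}` requires an alerter of type "
            f"{expected.__name__}, but the active alerter is of type "
            f"{type(alerter).__name__}."
        )
```

A step could then call `require_flavor(alerter, ChatAlerter, "my_step")` once at the top and proceed, keeping the error message consistent across steps.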