Skip to content

Discord

zenml.integrations.discord special

Discord integration for alerter components.

DiscordIntegration (Integration)

Definition of a Discord integration for ZenML.

Implemented using Discord API Wrapper.

Source code in zenml/integrations/discord/__init__.py
class DiscordIntegration(Integration):
    """ZenML integration that exposes Discord as an alerter backend.

    Built on top of the
    [Discord API Wrapper](https://pypi.org/project/discord.py/).
    """

    NAME = DISCORD
    REQUIREMENTS = ["discord.py>=2.3.2", "aiohttp>=3.8.1", "asyncio"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped with this integration.

        Returns:
            The flavor classes contributed by the Discord integration.
        """
        # The import is local to the method; presumably this keeps the
        # flavor module from being loaded before it is actually requested.
        from zenml.integrations.discord.flavors import DiscordAlerterFlavor

        provided_flavors: List[Type[Flavor]] = [DiscordAlerterFlavor]
        return provided_flavors

flavors() classmethod

Declare the stack component flavors for the Discord integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new flavors defined by the Discord integration.

Source code in zenml/integrations/discord/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped with this integration.

    Returns:
        The flavor classes contributed by the Discord integration.
    """
    # The import is local to the method; presumably this keeps the flavor
    # module from being loaded before it is actually requested.
    from zenml.integrations.discord.flavors import DiscordAlerterFlavor

    provided_flavors: List[Type[Flavor]] = [DiscordAlerterFlavor]
    return provided_flavors

alerters special

Alerter components defined by the Discord integration.

discord_alerter

Implementation for discord flavor of alerter component.

DiscordAlerter (BaseAlerter)

Send messages to Discord channels.

Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerter(BaseAlerter):
    """Send messages to Discord channels.

    Each `post()`/`ask()` call creates a short-lived Discord client, does its
    work inside the client's `on_ready` handler and closes the client again.
    """

    @property
    def config(self) -> DiscordAlerterConfig:
        """Returns the `DiscordAlerterConfig` config.

        Returns:
            The configuration.
        """
        return cast(DiscordAlerterConfig, self._config)

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters] = None
    ) -> str:
        """Get the Discord channel ID to be used by post/ask.

        The channel ID given in `params` takes precedence over the default
        channel ID of the alerter stack component.

        Args:
            params: Optional parameters.

        Returns:
            ID of the Discord channel to be used.

        Raises:
            RuntimeError: if params is not of type
                `BaseAlerterStepParameters`.
            ValueError: if a discord channel was neither defined in the
                params nor in the discord alerter component.
        """
        if params and not isinstance(params, BaseAlerterStepParameters):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )
        if (
            isinstance(params, DiscordAlerterParameters)
            and params.discord_channel_id is not None
        ):
            return params.discord_channel_id
        if self.config.default_discord_channel_id is not None:
            return self.config.default_discord_channel_id
        # `discord_channel_id` lives on `DiscordAlerterParameters`; the config
        # class only carries `default_discord_channel_id`.
        raise ValueError(
            "Neither the `DiscordAlerterParameters.discord_channel_id` in the "
            "runtime configuration, nor the `default_discord_channel_id` in "
            "the alerter stack component is specified. Please specify at "
            "least one."
        )

    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to approval during ask().

        Args:
            params: Optional parameters.

        Returns:
            List of messages that lead to approval in alerter.ask(). Falls
            back to `DEFAULT_APPROVE_MSG_OPTIONS` if `params` defines none.
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS

    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().

        Args:
            params: Optional parameters.

        Returns:
            List of messages that lead to disapproval in alerter.ask(). Falls
            back to `DEFAULT_DISAPPROVE_MSG_OPTIONS` if `params` defines none.
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS

    def _create_blocks(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> Optional[Embed]:
        """Helper function to create discord blocks.

        Args:
            message: The message to include in the embed.
            params: Optional parameters.

        Returns:
            Discord embed object, or `None` if `params` carries no payload.
        """
        if (
            not isinstance(params, DiscordAlerterParameters)
            or params.payload is None
        ):
            return None

        payload = params.payload
        embed = Embed()
        embed.set_thumbnail(
            url="https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png"
        )

        # Add fields to the embed
        embed.add_field(
            name=":star: *Pipeline:*",
            value=f"\n{payload.pipeline_name}",
            inline=False,
        )
        embed.add_field(
            name=":arrow_forward: *Step:*",
            value=f"\n{payload.step_name}",
            inline=False,
        )
        embed.add_field(
            name=":ring_buoy: *Stack:*",
            value=f"\n{payload.stack_name}",
            inline=False,
        )

        # Add a message field
        embed.add_field(
            name=":email: *Message:*", value=f"\n{message}", inline=False
        )
        return embed

    def start_client(self, client: Client) -> None:
        """Helper function to start discord client.

        Runs `client.start()` to completion on a private event loop, bounded
        by a 60 second timeout. A timeout is logged, not raised; any other
        exception raised by `client.start()` propagates to the caller.

        Args:
            client: discord client object

        """
        timeout_seconds = 60

        async def _run_client() -> None:
            try:
                await asyncio.wait_for(
                    client.start(self.config.discord_token),
                    timeout=timeout_seconds,
                )
            finally:
                # On timeout or login failure nobody else closes the client;
                # close it here so its connections are released.
                if not client.is_closed():
                    await client.close()

        # Use a dedicated loop instead of the thread's current one: this
        # works in threads without a loop and never closes a loop that is
        # owned by the caller.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(_run_client())
        except asyncio.TimeoutError:
            logger.error(
                "Client connection timed out. please verify the credentials."
            )
        finally:
            # Close the event loop
            loop.close()

    def post(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True

        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        message_sent = False

        @client.event
        async def on_ready() -> None:
            nonlocal message_sent
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore
                    message_sent = True
                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )

            except DiscordException as error:
                logger.error(f"DiscordAlerter.post() failed: {error}")
            finally:
                # Closing the client ends `client.start()` in start_client().
                await client.close()

        self.start_client(client)
        return message_sent

    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel and wait for approval.

        The wait for a reply happens inside `start_client()` and is therefore
        bounded by its timeout; if no matching reply arrives in time the
        operation counts as not approved.

        Args:
            message: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True

        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        approved = False  # will be modified by check()

        # The accepted replies do not change while waiting, so resolve them
        # once instead of on every incoming message.
        approve_options = self._get_approve_msg_options(params)
        disapprove_options = self._get_disapprove_msg_options(params)

        @client.event
        async def on_ready() -> None:
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore

                    def check(reply: Message) -> bool:
                        """Return True once a decisive reply was received."""
                        nonlocal approved
                        if reply.channel != channel:
                            return False
                        if reply.content in approve_options:
                            approved = True
                            return True
                        return reply.content in disapprove_options

                    await client.wait_for("message", check=check)

                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )

            except DiscordException as error:
                logger.error(f"DiscordAlerter.ask() failed: {error}")
            finally:
                # Closing the client ends `client.start()` in start_client().
                await client.close()

        self.start_client(client)
        return approved
config: DiscordAlerterConfig property readonly

Returns the DiscordAlerterConfig config.

Returns:

Type Description
DiscordAlerterConfig

The configuration.

ask(self, message, params=None)

Post a message to a Discord channel and wait for approval.

Parameters:

Name Type Description Default
message str

Initial message to be posted.

required
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in zenml/integrations/discord/alerters/discord_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Send a message to a Discord channel and wait for a user decision.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user replied with an approval message, else False.
    """
    discord_channel_id = self._get_channel_id(params=params)

    bot_intents = Intents.default()
    bot_intents.message_content = True
    bot = Client(intents=bot_intents)

    embed = self._create_blocks(message, params)
    approved = False  # flipped by is_decisive() on an approving reply

    async def on_ready() -> None:
        try:
            target = bot.get_channel(int(discord_channel_id))
            if not target:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
                return

            # Prefer the rich embed when one could be built.
            if embed:
                await target.send(embed=embed)  # type: ignore
            else:
                await target.send(content=message)  # type: ignore

            def is_decisive(reply: Message) -> bool:
                nonlocal approved
                if reply.channel != target:
                    return False
                if reply.content in self._get_approve_msg_options(params):
                    approved = True
                    return True
                return reply.content in self._get_disapprove_msg_options(
                    params
                )

            await bot.wait_for("message", check=is_decisive)
        except DiscordException as error:
            logger.error(f"DiscordAlerter.ask() failed: {error}")
        finally:
            await bot.close()

    # Same registration the `@client.event` decorator performs.
    bot.event(on_ready)

    self.start_client(bot)
    return approved
post(self, message, params=None)

Post a message to a Discord channel.

Parameters:

Name Type Description Default
message str

Message to be posted.

required
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in zenml/integrations/discord/alerters/discord_alerter.py
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Send a message to a Discord channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if the message was sent, else False.
    """
    discord_channel_id = self._get_channel_id(params=params)

    bot_intents = Intents.default()
    bot_intents.message_content = True
    bot = Client(intents=bot_intents)

    embed = self._create_blocks(message, params)
    delivered = False

    async def on_ready() -> None:
        nonlocal delivered
        try:
            target = bot.get_channel(int(discord_channel_id))
            if not target:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
                return

            # Prefer the rich embed when one could be built.
            if embed:
                await target.send(embed=embed)  # type: ignore
            else:
                await target.send(content=message)  # type: ignore
            delivered = True
        except DiscordException as error:
            logger.error(f"DiscordAlerter.post() failed: {error}")
        finally:
            await bot.close()

    # Same registration the `@client.event` decorator performs.
    bot.event(on_ready)

    self.start_client(bot)
    return delivered
start_client(self, client)

Helper function to start discord client.

Parameters:

Name Type Description Default
client Client

discord client object

required
Source code in zenml/integrations/discord/alerters/discord_alerter.py
def start_client(self, client: "Client") -> None:
    """Helper function to start discord client.

    Runs `client.start()` to completion on a private event loop, bounded by
    a 60 second timeout. A timeout is logged, not raised; any other
    exception raised by `client.start()` propagates to the caller.

    Args:
        client: discord client object

    """
    timeout_seconds = 60

    async def _run_client() -> None:
        try:
            await asyncio.wait_for(
                client.start(self.config.discord_token),
                timeout=timeout_seconds,
            )
        finally:
            # On timeout or login failure nobody else closes the client;
            # close it here so its connections are released.
            if not client.is_closed():
                await client.close()

    # Use a dedicated loop instead of the thread's current one: this works
    # in threads without a loop and never closes a loop that is owned by
    # the caller.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_run_client())
    except asyncio.TimeoutError:
        logger.error(
            "Client connection timed out. please verify the credentials."
        )
    finally:
        # Close the event loop
        loop.close()
DiscordAlerterParameters (BaseAlerterStepParameters) pydantic-model

Discord alerter parameters.

Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerterParameters(BaseAlerterStepParameters):
    """Discord alerter parameters.

    All fields are optional and default to `None`, except
    `include_format_blocks`, which defaults to `True`.
    """

    # The ID of the Discord channel to use for communication.
    discord_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None

    # Pipeline/step/stack names; when set, the alerter renders the message
    # as a Discord embed that includes these fields.
    payload: Optional[DiscordAlerterPayload] = None

    # NOTE(review): not read by the alerter code shown alongside this class;
    # presumably consumed by the alerter steps - confirm.
    include_format_blocks: Optional[bool] = True
DiscordAlerterPayload (BaseModel) pydantic-model

Discord alerter payload implementation.

Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerterPayload(BaseModel):
    """Discord alerter payload implementation.

    Carries the optional names that the alerter adds as embed fields.
    """

    # Shown in the embed's "Pipeline" field.
    pipeline_name: Optional[str] = None
    # Shown in the embed's "Step" field.
    step_name: Optional[str] = None
    # Shown in the embed's "Stack" field.
    stack_name: Optional[str] = None

flavors special

Discord integration flavors.

discord_alerter_flavor

Discord alerter flavor.

DiscordAlerterConfig (BaseAlerterConfig) pydantic-model

Discord alerter config.

Attributes:

Name Type Description
discord_token str

The Discord token tied to the Discord account to be used.

default_discord_channel_id Optional[str]

The ID of the Discord channel to use for communication if no channel ID is provided in the step config.

Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
class DiscordAlerterConfig(BaseAlerterConfig):
    """Discord alerter config.

    Attributes:
        discord_token: The Discord token tied to the Discord account to be used.
        default_discord_channel_id: The ID of the Discord channel to use for
            communication if no channel ID is provided in the step config.

    """

    discord_token: str = SecretField()
    default_discord_channel_id: Optional[str] = None  # TODO: Potential setting

    @property
    def is_valid(self) -> bool:
        """Check if the stack component is valid.

        Logs in with the configured token and, if a default channel ID is
        set, checks that the client can see that channel.

        Returns:
            True if the stack component is valid, False otherwise. Also True
            if the check cannot run because `discord` is not installed.
        """
        try:
            from discord import Client, DiscordException, Intents
        except ImportError:
            logger.warning(
                "Unable to validate Discord alerter credentials because the Discord integration is not installed."
            )
            return True

        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        valid = False
        try:
            # Check discord token validity
            @client.event
            async def on_ready() -> None:
                nonlocal valid
                try:
                    if self.default_discord_channel_id:
                        channel = client.get_channel(
                            int(self.default_discord_channel_id)
                        )
                        if channel:
                            valid = True
                    else:
                        # Reaching on_ready means the token was accepted.
                        valid = True
                finally:
                    # Closing the client makes `client.run()` return.
                    await client.close()

            client.run(self.discord_token)
        except DiscordException as e:
            # Interpolate the exception into the message: passing it as an
            # extra positional argument without a placeholder makes the
            # logging call fail to format the record.
            logger.error(f"Discord API Error: {e}")
        except ValueError as ve:
            logger.error(f"Value Error: {ve}")
        return valid
is_valid: bool property readonly

Check if the stack component is valid.

Returns:

Type Description
bool

True if the stack component is valid, False otherwise.

DiscordAlerterFlavor (BaseAlerterFlavor)

Discord alerter flavor.

Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
class DiscordAlerterFlavor(BaseAlerterFlavor):
    """Flavor that registers the Discord alerter with ZenML."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The flavor name.
        """
        flavor_name = DISCORD_ALERTER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The docs URL produced by `generate_default_docs_url()`.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The SDK docs URL produced by `generate_default_sdk_docs_url()`.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Link to the logo that represents the flavor in the dashboard.

        Returns:
            The logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/discord.png"
        return logo

    @property
    def config_class(self) -> Type[DiscordAlerterConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `DiscordAlerterConfig` class.
        """
        config_cls = DiscordAlerterConfig
        return config_cls

    @property
    def implementation_class(self) -> Type["DiscordAlerter"]:
        """Stack component class implementing this flavor.

        Returns:
            The `DiscordAlerter` class.
        """
        # Imported inside the property; presumably to avoid loading the
        # alerter implementation before it is needed.
        from zenml.integrations.discord.alerters import DiscordAlerter

        implementation_cls = DiscordAlerter
        return implementation_cls
config_class: Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig] property readonly

Returns DiscordAlerterConfig config class.

Returns:

Type Description
Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[DiscordAlerter] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[DiscordAlerter]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

steps special

Built-in steps for the Discord integration.

discord_alerter_ask_step

Step that allows you to send messages to Discord and wait for a response.

discord_alerter_post_step

Step that allows you to post messages to Discord.