Discord
        zenml.integrations.discord
  
      special
  
    Discord integration for alerter components.
        
DiscordIntegration            (Integration)
        
    Definition of a Discord integration for ZenML.
Implemented using Discord API Wrapper.
Source code in zenml/integrations/discord/__init__.py
          class DiscordIntegration(Integration):
    """Definition of a Discord integration for ZenML.
    Implemented using [Discord API Wrapper](https://pypi.org/project/discord.py/).
    """
    NAME = DISCORD
    REQUIREMENTS = ["discord.py>=2.3.2", "aiohttp>=3.8.1", "asyncio"]
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["aiohttp","asyncio"]
    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Discord integration.
        Returns:
            List of new flavors defined by the Discord integration.
        """
        from zenml.integrations.discord.flavors import DiscordAlerterFlavor
        return [DiscordAlerterFlavor]
flavors()
  
      classmethod
  
    Declare the stack component flavors for the Discord integration.
Returns:
| Type | Description | 
|---|---|
| List[Type[zenml.stack.flavor.Flavor]] | List of new flavors defined by the Discord integration. | 
Source code in zenml/integrations/discord/__init__.py
          @classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Discord integration.
    Returns:
        List of new flavors defined by the Discord integration.
    """
    from zenml.integrations.discord.flavors import DiscordAlerterFlavor
    return [DiscordAlerterFlavor]
        alerters
  
      special
  
    Alerter components defined by the Discord integration.
        discord_alerter
    Implementation for discord flavor of alerter component.
        
DiscordAlerter            (BaseAlerter)
        
    Send messages to Discord channels.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          class DiscordAlerter(BaseAlerter):
    """Send messages to Discord channels."""
    @property
    def config(self) -> DiscordAlerterConfig:
        """Returns the `DiscordAlerterConfig` config.
        Returns:
            The configuration.
        """
        return cast(DiscordAlerterConfig, self._config)
    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters] = None
    ) -> str:
        """Get the Discord channel ID to be used by post/ask.
        Args:
            params: Optional parameters.
        Returns:
            ID of the Discord channel to be used.
        Raises:
            RuntimeError: if config is not of type `BaseAlerterStepConfig`.
            ValueError: if a discord channel was neither defined in the config
                nor in the discord alerter component.
        """
        if params and not isinstance(params, BaseAlerterStepParameters):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )
        if (
            params
            and isinstance(params, DiscordAlerterParameters)
            and hasattr(params, "discord_channel_id")
            and params.discord_channel_id is not None
        ):
            return params.discord_channel_id
        if self.config.default_discord_channel_id is not None:
            return self.config.default_discord_channel_id
        raise ValueError(
            "Neither the `DiscordAlerterConfig.discord_channel_id` in the runtime "
            "configuration, nor the `default_discord_channel_id` in the alerter "
            "stack component is specified. Please specify at least one."
        )
    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to approval during ask().
        Args:
            params: Optional parameters.
        Returns:
            Set of messages that lead to approval in alerter.ask().
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and hasattr(params, "approve_msg_options")
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS
    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().
        Args:
            params: Optional parameters.
        Returns:
            Set of messages that lead to disapproval in alerter.ask().
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and hasattr(params, "disapprove_msg_options")
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS
    def _create_blocks(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> Optional[Embed]:
        """Helper function to create discord blocks.
        Args:
            message: message
            params: Optional parameters.
        Returns:
            Discord embed object.
        """
        blocks_response = None
        if (
            isinstance(params, DiscordAlerterParameters)
            and hasattr(params, "payload")
            and params.payload is not None
        ):
            payload = params.payload
            embed = Embed()
            embed.set_thumbnail(
                url="https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png"
            )
            # Add fields to the embed
            embed.add_field(
                name=":star: *Pipeline:*",
                value=f"\n{payload.pipeline_name}",
                inline=False,
            )
            embed.add_field(
                name=":arrow_forward: *Step:*",
                value=f"\n{payload.step_name}",
                inline=False,
            )
            embed.add_field(
                name=":ring_buoy: *Stack:*",
                value=f"\n{payload.stack_name}",
                inline=False,
            )
            # Add a message field
            embed.add_field(
                name=":email: *Message:*", value=f"\n{message}", inline=False
            )
            blocks_response = embed
        return blocks_response
    def start_client(self, client: Client) -> None:
        """Helper function to start discord client.
        Args:
            client: discord client object
        """
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop = asyncio.get_event_loop()
        timeout_seconds = 60
        # Run the bot with a timeout
        try:
            loop.run_until_complete(
                asyncio.wait_for(
                    client.start(self.config.discord_token),
                    timeout=timeout_seconds,
                )
            )
        except asyncio.TimeoutError:
            logger.error(
                "Client connection timed out. please verify the credentials."
            )
        finally:
            # Close the event loop
            loop.close()
    def post(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel.
        Args:
            message: Message to be posted.
            params: Optional parameters.
        Returns:
            True if operation succeeded, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        message_sent = False
        @client.event
        async def on_ready() -> None:
            nonlocal message_sent
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore
                    message_sent = True
                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )
            except DiscordException as error:
                logger.error(f"DiscordAlerter.post() failed: {error}")
            finally:
                await client.close()
        self.start_client(client)
        return message_sent
    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel and wait for approval.
        Args:
            message: Initial message to be posted.
            params: Optional parameters.
        Returns:
            True if a user approved the operation, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        approved = False  # will be modified by check()
        @client.event
        async def on_ready() -> None:
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore
                    def check(message: Message) -> bool:
                        if message.channel == channel:
                            if (
                                message.content
                                in self._get_approve_msg_options(params)
                            ):
                                nonlocal approved
                                approved = True
                                return True
                            elif (
                                message.content
                                in self._get_disapprove_msg_options(params)
                            ):
                                return True
                        return False
                    await client.wait_for("message", check=check)
                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )
            except DiscordException as error:
                logger.error(f"DiscordAlerter.ask() failed: {error}")
            finally:
                await client.close()
        self.start_client(client)
        return approved
config: DiscordAlerterConfig
  
      property
      readonly
  
    Returns the DiscordAlerterConfig config.
Returns:
| Type | Description | 
|---|---|
| DiscordAlerterConfig | The configuration. | 
ask(self, message, params=None)
    Post a message to a Discord channel and wait for approval.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| message | str | Initial message to be posted. | required | 
| params | Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] | Optional parameters. | None | 
Returns:
| Type | Description | 
|---|---|
| bool | True if a user approved the operation, else False | 
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel and wait for approval.
    Args:
        message: Initial message to be posted.
        params: Optional parameters.
    Returns:
        True if a user approved the operation, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True
    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    approved = False  # will be modified by check()
    @client.event
    async def on_ready() -> None:
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore
                def check(message: Message) -> bool:
                    if message.channel == channel:
                        if (
                            message.content
                            in self._get_approve_msg_options(params)
                        ):
                            nonlocal approved
                            approved = True
                            return True
                        elif (
                            message.content
                            in self._get_disapprove_msg_options(params)
                        ):
                            return True
                    return False
                await client.wait_for("message", check=check)
            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
        except DiscordException as error:
            logger.error(f"DiscordAlerter.ask() failed: {error}")
        finally:
            await client.close()
    self.start_client(client)
    return approved
post(self, message, params=None)
    Post a message to a Discord channel.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| message | str | Message to be posted. | required | 
| params | Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] | Optional parameters. | None | 
Returns:
| Type | Description | 
|---|---|
| bool | True if operation succeeded, else False | 
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          def post(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Discord channel.
    Args:
        message: Message to be posted.
        params: Optional parameters.
    Returns:
        True if operation succeeded, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    intents = Intents.default()
    intents.message_content = True
    client = Client(intents=intents)
    embed_blocks = self._create_blocks(message, params)
    message_sent = False
    @client.event
    async def on_ready() -> None:
        nonlocal message_sent
        try:
            channel = client.get_channel(int(discord_channel_id))
            if channel:
                # Send the message
                if embed_blocks:
                    await channel.send(embed=embed_blocks)  # type: ignore
                else:
                    await channel.send(content=message)  # type: ignore
                message_sent = True
            else:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
        except DiscordException as error:
            logger.error(f"DiscordAlerter.post() failed: {error}")
        finally:
            await client.close()
    self.start_client(client)
    return message_sent
start_client(self, client)
    Helper function to start discord client.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| client | discord.Client | discord client object | required | 
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          def start_client(self, client: Client) -> None:
    """Helper function to start discord client.
    Args:
        client: discord client object
    """
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()
    timeout_seconds = 60
    # Run the bot with a timeout
    try:
        loop.run_until_complete(
            asyncio.wait_for(
                client.start(self.config.discord_token),
                timeout=timeout_seconds,
            )
        )
    except asyncio.TimeoutError:
        logger.error(
            "Client connection timed out. please verify the credentials."
        )
    finally:
        # Close the event loop
        loop.close()
        
DiscordAlerterParameters            (BaseAlerterStepParameters)
        
    Discord alerter parameters.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          class DiscordAlerterParameters(BaseAlerterStepParameters):
    """Discord alerter parameters.

    Per-call settings for the Discord alerter. Every field is optional.
    """
    # The ID of the Discord channel to use for communication.
    discord_channel_id: Optional[str] = None
    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None
    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None
    # Pipeline / step / stack names to present alongside the message.
    payload: Optional[DiscordAlerterPayload] = None
    # NOTE(review): presumably toggles the formatted (embed) output; the
    # consumer of this flag is not visible here - confirm where it is read.
    include_format_blocks: Optional[bool] = True
        
DiscordAlerterPayload            (BaseModel)
        
    Discord alerter payload implementation.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
          class DiscordAlerterPayload(BaseModel):
    """Discord alerter payload implementation.

    Optional names identifying the run a message refers to.
    """
    # Name of the pipeline.
    pipeline_name: Optional[str] = None
    # Name of the step.
    step_name: Optional[str] = None
    # Name of the stack.
    stack_name: Optional[str] = None
        flavors
  
      special
  
    Discord integration flavors.
        discord_alerter_flavor
    Discord alerter flavor.
        
DiscordAlerterConfig            (BaseAlerterConfig)
        
    Discord alerter config.
Attributes:
| Name | Type | Description | 
|---|---|---|
| discord_token | str | The Discord token tied to the Discord account to be used. | 
| default_discord_channel_id | Optional[str] | The ID of the Discord channel to use for communication if no channel ID is provided in the step config. | 
Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
          class DiscordAlerterConfig(BaseAlerterConfig):
    """Discord alerter config.
    Attributes:
        discord_token: The Discord token tied to the Discord account to be used.
        default_discord_channel_id: The ID of the Discord channel to use for
            communication if no channel ID is provided in the step config.
    """
    discord_token: str = SecretField()
    default_discord_channel_id: Optional[str] = None  # TODO: Potential setting
    @property
    def is_valid(self) -> bool:
        """Check if the stack component is valid.
        Returns:
            True if the stack component is valid, False otherwise.
        """
        try:
            from discord import Client, DiscordException, Intents
        except ImportError:
            logger.warning(
                "Unable to validate Discord alerter credentials because the Discord integration is not installed."
            )
            return True
        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        valid = False
        try:
            # Check discord token validity
            @client.event
            async def on_ready() -> None:
                nonlocal valid
                try:
                    if self.default_discord_channel_id:
                        channel = client.get_channel(
                            int(self.default_discord_channel_id)
                        )
                        if channel:
                            valid = True
                    else:
                        valid = True
                finally:
                    await client.close()
            client.run(self.discord_token)
        except DiscordException as e:
            logger.error("Discord API Error:", e)
        except ValueError as ve:
            logger.error("Value Error:", ve)
        return valid
is_valid: bool
  
      property
      readonly
  
    Check if the stack component is valid.
Returns:
| Type | Description | 
|---|---|
| bool | True if the stack component is valid, False otherwise. | 
        
DiscordAlerterFlavor            (BaseAlerterFlavor)
        
    Discord alerter flavor.
Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
          class DiscordAlerterFlavor(BaseAlerterFlavor):
    """Discord alerter flavor."""
    @property
    def name(self) -> str:
        """Name of the flavor.
        Returns:
            The name of the flavor.
        """
        return DISCORD_ALERTER_FLAVOR
    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.
        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()
    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.
        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()
    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.
        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/discord.png"
    @property
    def config_class(self) -> Type[DiscordAlerterConfig]:
        """Returns `DiscordAlerterConfig` config class.
        Returns:
                The config class.
        """
        return DiscordAlerterConfig
    @property
    def implementation_class(self) -> Type["DiscordAlerter"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
        from zenml.integrations.discord.alerters import DiscordAlerter
        return DiscordAlerter
config_class: Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig]
  
      property
      readonly
  
    Returns DiscordAlerterConfig config class.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig] | The config class. | 
docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor docs url. | 
implementation_class: Type[DiscordAlerter]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[DiscordAlerter] | The implementation class. | 
logo_url: str
  
      property
      readonly
  
    A url to represent the flavor in the dashboard.
Returns:
| Type | Description | 
|---|---|
| str | The flavor logo. | 
name: str
  
      property
      readonly
  
    Name of the flavor.
Returns:
| Type | Description | 
|---|---|
| str | The name of the flavor. | 
sdk_docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at SDK docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor SDK docs url. | 
        steps
  
      special
  
    Built-in steps for the Discord integration.
        discord_alerter_ask_step
    Step that allows you to send messages to Discord and wait for a response.
        discord_alerter_post_step
    Step that allows you to post messages to Discord.