Discord
zenml.integrations.discord
special
Discord integration for alerter components.
DiscordIntegration (Integration)
Definition of a Discord integration for ZenML.
Implemented using Discord API Wrapper.
Source code in zenml/integrations/discord/__init__.py
class DiscordIntegration(Integration):
    """ZenML integration that plugs Discord in as an alerter backend.

    Built on top of the
    [Discord API Wrapper](https://pypi.org/project/discord.py/).
    """

    NAME = DISCORD
    # NOTE(review): `asyncio` ships with the Python standard library; confirm
    # that listing it as a pip requirement is still intended.
    REQUIREMENTS = ["discord.py>=2.3.2", "aiohttp>=3.8.1", "asyncio"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors this integration contributes.

        Returns:
            The flavor classes defined by the Discord integration.
        """
        # The import is kept local to this method, presumably so that the
        # flavor module is only loaded when flavors are actually requested.
        from zenml.integrations.discord.flavors import DiscordAlerterFlavor

        discord_flavors: List[Type[Flavor]] = [DiscordAlerterFlavor]
        return discord_flavors
flavors()
classmethod
Declare the stack component flavors for the Discord integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new flavors defined by the Discord integration. |
Source code in zenml/integrations/discord/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors this integration contributes.

    Returns:
        The flavor classes defined by the Discord integration.
    """
    # The import is kept local to this method, presumably so that the flavor
    # module is only loaded when flavors are actually requested.
    from zenml.integrations.discord.flavors import DiscordAlerterFlavor

    discord_flavors: List[Type[Flavor]] = [DiscordAlerterFlavor]
    return discord_flavors
alerters
special
Alerter components defined by the Discord integration.
discord_alerter
Implementation for discord flavor of alerter component.
DiscordAlerter (BaseAlerter)
Send messages to Discord channels.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerter(BaseAlerter):
    """Send messages to Discord channels.

    Every call to `post()` or `ask()` creates a fresh Discord client, does its
    work inside the client's `on_ready` handler, and closes the client again.
    `start_client()` bounds each client session to 60 seconds.
    """

    @property
    def config(self) -> DiscordAlerterConfig:
        """Returns the `DiscordAlerterConfig` config.

        Returns:
            The configuration.
        """
        return cast(DiscordAlerterConfig, self._config)

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters] = None
    ) -> str:
        """Get the Discord channel ID to be used by post/ask.

        A channel ID given in `params` takes precedence over the
        `default_discord_channel_id` of the alerter stack component.

        Args:
            params: Optional parameters.

        Returns:
            ID of the Discord channel to be used.

        Raises:
            RuntimeError: if params is not of type
                `BaseAlerterStepParameters`.
            ValueError: if a discord channel was neither defined in the
                params nor in the discord alerter component.
        """
        if params and not isinstance(params, BaseAlerterStepParameters):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )
        if (
            params
            and isinstance(params, DiscordAlerterParameters)
            and params.discord_channel_id is not None
        ):
            return params.discord_channel_id
        if self.config.default_discord_channel_id is not None:
            return self.config.default_discord_channel_id
        # The runtime field lives on DiscordAlerterParameters; the config
        # class only declares `default_discord_channel_id`.
        raise ValueError(
            "Neither the `DiscordAlerterParameters.discord_channel_id` in the runtime "
            "configuration, nor the `default_discord_channel_id` in the alerter "
            "stack component is specified. Please specify at least one."
        )

    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to approval during ask().

        Args:
            params: Optional parameters.

        Returns:
            Set of messages that lead to approval in alerter.ask().
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS

    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().

        Args:
            params: Optional parameters.

        Returns:
            Set of messages that lead to disapproval in alerter.ask().
        """
        if (
            isinstance(params, DiscordAlerterParameters)
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS

    def _create_blocks(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> Optional[Embed]:
        """Helper function to create discord blocks.

        An embed is only built when `params` is a `DiscordAlerterParameters`
        instance that carries a payload; otherwise `None` is returned and the
        caller sends the plain message text instead.

        Args:
            message: message
            params: Optional parameters.

        Returns:
            Discord embed object, or `None` if no payload was supplied.
        """
        if not (
            isinstance(params, DiscordAlerterParameters)
            and params.payload is not None
        ):
            return None

        payload = params.payload
        embed = Embed()
        embed.set_thumbnail(
            url="https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png"
        )
        # Add fields to the embed
        embed.add_field(
            name=":star: *Pipeline:*",
            value=f"\n{payload.pipeline_name}",
            inline=False,
        )
        embed.add_field(
            name=":arrow_forward: *Step:*",
            value=f"\n{payload.step_name}",
            inline=False,
        )
        embed.add_field(
            name=":ring_buoy: *Stack:*",
            value=f"\n{payload.stack_name}",
            inline=False,
        )
        # Add a message field
        embed.add_field(
            name=":email: *Message:*", value=f"\n{message}", inline=False
        )
        return embed

    def start_client(self, client: Client) -> None:
        """Helper function to start discord client.

        Runs `client.start()` to completion on an event loop, giving up after
        60 seconds. The event loop is closed before returning.

        Args:
            client: discord client object
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop in this thread (e.g. a worker thread, or
            # newer Python versions that no longer create one implicitly).
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        timeout_seconds = 60
        # Run the bot with a timeout
        try:
            loop.run_until_complete(
                asyncio.wait_for(
                    client.start(self.config.discord_token),
                    timeout=timeout_seconds,
                )
            )
        except asyncio.TimeoutError:
            logger.error(
                "Client connection timed out. please verify the credentials."
            )
            # wait_for() only cancels start(); the client itself is still
            # open at this point, so shut it down before the loop goes away.
            if not client.is_closed():
                loop.run_until_complete(client.close())
        finally:
            # Close the event loop
            loop.close()

    def post(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        message_sent = False

        @client.event
        async def on_ready() -> None:
            nonlocal message_sent
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore
                    message_sent = True
                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )
            except DiscordException as error:
                logger.error(f"DiscordAlerter.post() failed: {error}")
            finally:
                # Closing the client ends the session started by
                # start_client().
                await client.close()

        self.start_client(client)
        return message_sent

    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Discord channel and wait for approval.

        The reply has to arrive while the client session started by
        `start_client()` is still running (it is bounded to 60 seconds);
        otherwise the result stays False.

        Args:
            message: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False
        """
        discord_channel_id = self._get_channel_id(params=params)
        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        embed_blocks = self._create_blocks(message, params)
        # The option lists do not change while waiting, so resolve them once
        # instead of on every incoming message.
        approve_options = self._get_approve_msg_options(params)
        disapprove_options = self._get_disapprove_msg_options(params)
        approved = False  # will be modified by check()

        @client.event
        async def on_ready() -> None:
            try:
                channel = client.get_channel(int(discord_channel_id))
                if channel:
                    # Send the message
                    if embed_blocks:
                        await channel.send(embed=embed_blocks)  # type: ignore
                    else:
                        await channel.send(content=message)  # type: ignore

                    def check(reply: Message) -> bool:
                        """Stop waiting on the first (dis)approval reply."""
                        nonlocal approved
                        if reply.channel == channel:
                            if reply.content in approve_options:
                                approved = True
                                return True
                            if reply.content in disapprove_options:
                                return True
                        return False

                    await client.wait_for("message", check=check)
                else:
                    logger.error(
                        f"Channel with ID {discord_channel_id} not found."
                    )
            except DiscordException as error:
                logger.error(f"DiscordAlerter.ask() failed: {error}")
            finally:
                # Closing the client ends the session started by
                # start_client().
                await client.close()

        self.start_client(client)
        return approved
config: DiscordAlerterConfig
property
readonly
Returns the `DiscordAlerterConfig` config.
Returns:
Type | Description |
---|---|
DiscordAlerterConfig |
The configuration. |
ask(self, message, params=None)
Post a message to a Discord channel and wait for approval.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Initial message to be posted. |
required |
params |
Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] |
Optional parameters. |
None |
Returns:
Type | Description |
---|---|
bool |
True if a user approved the operation, else False |
Source code in zenml/integrations/discord/alerters/discord_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Send a message to a Discord channel and wait for a yes/no reply.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    gateway_intents = Intents.default()
    gateway_intents.message_content = True
    client = Client(intents=gateway_intents)
    embed = self._create_blocks(message, params)
    approved = False  # flipped by check() when an approval reply arrives

    @client.event
    async def on_ready() -> None:
        try:
            channel = client.get_channel(int(discord_channel_id))
            if not channel:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
                return

            # Prefer the rich embed when one was built, else plain text.
            if embed:
                await channel.send(embed=embed)  # type: ignore
            else:
                await channel.send(content=message)  # type: ignore

            def check(reply: Message) -> bool:
                """Accept only approval/disapproval replies in the channel."""
                nonlocal approved
                if reply.channel != channel:
                    return False
                if reply.content in self._get_approve_msg_options(params):
                    approved = True
                    return True
                return reply.content in self._get_disapprove_msg_options(
                    params
                )

            await client.wait_for("message", check=check)
        except DiscordException as error:
            logger.error(f"DiscordAlerter.ask() failed: {error}")
        finally:
            # Runs on every path, including the early return above.
            await client.close()

    self.start_client(client)
    return approved
post(self, message, params=None)
Post a message to a Discord channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Message to be posted. |
required |
params |
Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] |
Optional parameters. |
None |
Returns:
Type | Description |
---|---|
bool |
True if operation succeeded, else False |
Source code in zenml/integrations/discord/alerters/discord_alerter.py
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Send a message to a Discord channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    discord_channel_id = self._get_channel_id(params=params)
    gateway_intents = Intents.default()
    gateway_intents.message_content = True
    client = Client(intents=gateway_intents)
    embed = self._create_blocks(message, params)
    message_sent = False

    @client.event
    async def on_ready() -> None:
        nonlocal message_sent
        try:
            channel = client.get_channel(int(discord_channel_id))
            if not channel:
                logger.error(
                    f"Channel with ID {discord_channel_id} not found."
                )
                return

            # Prefer the rich embed when one was built, else plain text.
            if embed:
                await channel.send(embed=embed)  # type: ignore
            else:
                await channel.send(content=message)  # type: ignore
            message_sent = True
        except DiscordException as error:
            logger.error(f"DiscordAlerter.post() failed: {error}")
        finally:
            # Runs on every path, including the early return above.
            await client.close()

    self.start_client(client)
    return message_sent
start_client(self, client)
Helper function to start discord client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client |
Client |
discord client object |
required |
Source code in zenml/integrations/discord/alerters/discord_alerter.py
def start_client(self, client: Client) -> None:
    """Helper function to start discord client.

    Runs `client.start()` to completion on an event loop, giving up after
    60 seconds. The event loop is closed before returning.

    Args:
        client: discord client object
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop in this thread (e.g. a worker thread, or
        # newer Python versions that no longer create one implicitly).
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    timeout_seconds = 60
    # Run the bot with a timeout
    try:
        loop.run_until_complete(
            asyncio.wait_for(
                client.start(self.config.discord_token),
                timeout=timeout_seconds,
            )
        )
    except asyncio.TimeoutError:
        logger.error(
            "Client connection timed out. please verify the credentials."
        )
        # wait_for() only cancels start(); the client itself is still open
        # at this point, so shut it down before the loop goes away.
        if not client.is_closed():
            loop.run_until_complete(client.close())
    finally:
        # Close the event loop
        loop.close()
DiscordAlerterParameters (BaseAlerterStepParameters)
pydantic-model
Discord alerter parameters.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerterParameters(BaseAlerterStepParameters):
    """Discord alerter parameters.

    Per-call settings for the Discord alerter. Every field is optional and
    defaults to `None`, except `include_format_blocks`, which defaults to
    `True`.
    """

    # The ID of the Discord channel to use for communication.
    discord_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None

    # Pipeline / step / stack details. When set, the alerter sends an embed
    # built from these values instead of the plain message text.
    payload: Optional[DiscordAlerterPayload] = None

    # NOTE(review): no alerter code visible in this file reads this flag;
    # confirm whether it is meant to switch the embed formatting on and off.
    include_format_blocks: Optional[bool] = True
DiscordAlerterPayload (BaseModel)
pydantic-model
Discord alerter payload implementation.
Source code in zenml/integrations/discord/alerters/discord_alerter.py
class DiscordAlerterPayload(BaseModel):
    """Discord alerter payload implementation.

    Holds the names that the alerter renders as separate fields of the
    Discord embed. All three are optional and default to `None`.
    """

    # Name shown in the embed's "Pipeline" field.
    pipeline_name: Optional[str] = None
    # Name shown in the embed's "Step" field.
    step_name: Optional[str] = None
    # Name shown in the embed's "Stack" field.
    stack_name: Optional[str] = None
flavors
special
Discord integration flavors.
discord_alerter_flavor
Discord alerter flavor.
DiscordAlerterConfig (BaseAlerterConfig)
pydantic-model
Discord alerter config.
Attributes:
Name | Type | Description |
---|---|---|
discord_token |
str |
The Discord token tied to the Discord account to be used. |
default_discord_channel_id |
Optional[str] |
The ID of the Discord channel to use for communication if no channel ID is provided in the step config. |
Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
class DiscordAlerterConfig(BaseAlerterConfig):
    """Discord alerter config.

    Attributes:
        discord_token: The Discord token tied to the Discord account to be used.
        default_discord_channel_id: The ID of the Discord channel to use for
            communication if no channel ID is provided in the step config.
    """

    discord_token: str = SecretField()
    default_discord_channel_id: Optional[str] = None  # TODO: Potential setting

    @property
    def is_valid(self) -> bool:
        """Check if the stack component is valid.

        Logs in with the configured token and, if a default channel ID is
        set, checks that the channel can be looked up. When the `discord`
        package cannot be imported the check is skipped and the component is
        reported as valid.

        Returns:
            True if the stack component is valid, False otherwise.
        """
        try:
            from discord import Client, DiscordException, Intents
        except ImportError:
            logger.warning(
                "Unable to validate Discord alerter credentials because the Discord integration is not installed."
            )
            return True

        intents = Intents.default()
        intents.message_content = True
        client = Client(intents=intents)
        valid = False

        # Check discord token validity: on_ready only fires after a
        # successful login.
        @client.event
        async def on_ready() -> None:
            nonlocal valid
            try:
                if self.default_discord_channel_id:
                    channel = client.get_channel(
                        int(self.default_discord_channel_id)
                    )
                    if channel:
                        valid = True
                else:
                    valid = True
            finally:
                # Closing the client lets the blocking client.run() return.
                await client.close()

        try:
            client.run(self.discord_token)
        except DiscordException as e:
            # Interpolate the exception into the message: passing it as a
            # bare extra argument without a placeholder breaks log formatting.
            logger.error(f"Discord API Error: {e}")
        except ValueError as ve:
            logger.error(f"Value Error: {ve}")
        return valid
is_valid: bool
property
readonly
Check if the stack component is valid.
Returns:
Type | Description |
---|---|
bool |
True if the stack component is valid, False otherwise. |
DiscordAlerterFlavor (BaseAlerterFlavor)
Discord alerter flavor.
Source code in zenml/integrations/discord/flavors/discord_alerter_flavor.py
class DiscordAlerterFlavor(BaseAlerterFlavor):
    """Flavor definition for the Discord alerter stack component."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name = DISCORD_ALERTER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user docs for this flavor.

        Returns:
            A flavor docs url.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK docs for this flavor.

        Returns:
            A flavor SDK docs url.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Link to the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/discord.png"

    @property
    def config_class(self) -> Type[DiscordAlerterConfig]:
        """Configuration class used by this flavor.

        Returns:
            The config class.
        """
        return DiscordAlerterConfig

    @property
    def implementation_class(self) -> Type["DiscordAlerter"]:
        """Class that implements this flavor.

        Returns:
            The implementation class.
        """
        # The import is kept local to this property, presumably so that the
        # alerter module is only loaded when the implementation is needed.
        from zenml.integrations.discord.alerters import DiscordAlerter

        return DiscordAlerter
config_class: Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig]
property
readonly
Returns the `DiscordAlerterConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.discord.flavors.discord_alerter_flavor.DiscordAlerterConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[DiscordAlerter]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[DiscordAlerter] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
steps
special
Built-in steps for the Discord integration.
discord_alerter_ask_step
Step that allows you to send messages to Discord and wait for a response.
discord_alerter_post_step
Step that allows you to post messages to Discord.