Slack
zenml.integrations.slack
special
Slack integration for alerter components.
SlackIntegration (Integration)
Definition of a Slack integration for ZenML.
Implemented using Slack SDK.
Source code in zenml/integrations/slack/__init__.py
class SlackIntegration(Integration):
"""Definition of a Slack integration for ZenML.
Implemented using [Slack SDK](https://pypi.org/project/slack-sdk/).
"""
NAME = SLACK
REQUIREMENTS = ["slack-sdk>=3.16.1", "aiohttp>=3.8.1"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Slack integration.
Returns:
List of new flavors defined by the Slack integration.
"""
from zenml.integrations.slack.flavors import SlackAlerterFlavor
return [SlackAlerterFlavor]
flavors()
classmethod
Declare the stack component flavors for the Slack integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new flavors defined by the Slack integration. |
Source code in zenml/integrations/slack/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Slack integration.
Returns:
List of new flavors defined by the Slack integration.
"""
from zenml.integrations.slack.flavors import SlackAlerterFlavor
return [SlackAlerterFlavor]
alerters
special
Alerter components defined by the Slack integration.
slack_alerter
Implementation for slack flavor of alerter component.
SlackAlerter (BaseAlerter)
Send messages to Slack channels.
Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerter(BaseAlerter):
"""Send messages to Slack channels."""
@property
def config(self) -> SlackAlerterConfig:
"""Returns the `SlackAlerterConfig` config.
Returns:
The configuration.
"""
return cast(SlackAlerterConfig, self._config)
def _get_channel_id(
self, params: Optional[BaseAlerterStepParameters] = None
) -> str:
"""Get the Slack channel ID to be used by post/ask.
Args:
params: Optional parameters.
Returns:
ID of the Slack channel to be used.
Raises:
RuntimeError: if config is not of type `BaseAlerterStepConfig`.
ValueError: if a slack channel was neither defined in the config
nor in the slack alerter component.
"""
if params and not isinstance(params, BaseAlerterStepParameters):
raise RuntimeError(
"The config object must be of type `BaseAlerterStepParameters`."
)
if (
params
and isinstance(params, SlackAlerterParameters)
and hasattr(params, "slack_channel_id")
and params.slack_channel_id is not None
):
return params.slack_channel_id
if self.config.default_slack_channel_id is not None:
return self.config.default_slack_channel_id
raise ValueError(
"Neither the `SlackAlerterConfig.slack_channel_id` in the runtime "
"configuration, nor the `default_slack_channel_id` in the alerter "
"stack component is specified. Please specify at least one."
)
def _get_approve_msg_options(
self, params: Optional[BaseAlerterStepParameters]
) -> List[str]:
"""Define which messages will lead to approval during ask().
Args:
params: Optional parameters.
Returns:
Set of messages that lead to approval in alerter.ask().
"""
if (
isinstance(params, SlackAlerterParameters)
and hasattr(params, "approve_msg_options")
and params.approve_msg_options is not None
):
return params.approve_msg_options
return DEFAULT_APPROVE_MSG_OPTIONS
def _get_disapprove_msg_options(
self, params: Optional[BaseAlerterStepParameters]
) -> List[str]:
"""Define which messages will lead to disapproval during ask().
Args:
params: Optional parameters.
Returns:
Set of messages that lead to disapproval in alerter.ask().
"""
if (
isinstance(params, SlackAlerterParameters)
and hasattr(params, "disapprove_msg_options")
and params.disapprove_msg_options is not None
):
return params.disapprove_msg_options
return DEFAULT_DISAPPROVE_MSG_OPTIONS
def _create_blocks(
self, message: str, params: Optional[BaseAlerterStepParameters]
) -> List[Dict]: # type: ignore
"""Helper function to create slack blocks.
Args:
message: message
params: Optional parameters.
Returns:
List of slack blocks.
"""
if (
isinstance(params, SlackAlerterParameters)
and hasattr(params, "payload")
and params.payload is not None
):
payload = params.payload
return [
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f":star: *Pipeline:*\n{payload.pipeline_name}",
},
{
"type": "mrkdwn",
"text": f":arrow_forward: *Step:*\n{payload.step_name}",
},
{
"type": "mrkdwn",
"text": f":ring_buoy: *Stack:*\n{payload.stack_name}",
},
],
"accessory": {
"type": "image",
"image_url": "https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png",
"alt_text": "zenml logo",
},
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f":email: *Message:*\n{message}",
},
],
},
]
return []
def post(
self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
"""Post a message to a Slack channel.
Args:
message: Message to be posted.
params: Optional parameters.
Returns:
True if operation succeeded, else False
"""
slack_channel_id = self._get_channel_id(params=params)
client = WebClient(token=self.config.slack_token)
blocks = self._create_blocks(message, params)
try:
response = client.chat_postMessage(
channel=slack_channel_id, text=message, blocks=blocks
)
return True
except SlackApiError as error:
response = error.response["error"]
logger.error(f"SlackAlerter.post() failed: {response}")
return False
def ask(
self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
"""Post a message to a Slack channel and wait for approval.
Args:
message: Initial message to be posted.
params: Optional parameters.
Returns:
True if a user approved the operation, else False
"""
rtm = RTMClient(token=self.config.slack_token)
slack_channel_id = self._get_channel_id(params=params)
approved = False # will be modified by handle()
@RTMClient.run_on(event="hello") # type: ignore
def post_initial_message(**payload: Any) -> None:
"""Post an initial message in a channel and start listening.
Args:
payload: payload of the received Slack event.
"""
web_client = payload["web_client"]
blocks = self._create_blocks(message, params)
web_client.chat_postMessage(
channel=slack_channel_id, text=message, blocks=blocks
)
@RTMClient.run_on(event="message") # type: ignore
def handle(**payload: Any) -> None:
"""Listen / handle messages posted in the channel.
Args:
payload: payload of the received Slack event.
"""
event = payload["data"]
if event["channel"] == slack_channel_id:
# approve request (return True)
if event["text"] in self._get_approve_msg_options(params):
print(f"User {event['user']} approved on slack.")
nonlocal approved
approved = True
rtm.stop() # type: ignore[no-untyped-call]
# disapprove request (return False)
elif event["text"] in self._get_disapprove_msg_options(params):
print(f"User {event['user']} disapproved on slack.")
rtm.stop() # type: ignore[no-untyped-call]
# start another thread until `rtm.stop()` is called in handle()
rtm.start()
return approved
config: SlackAlerterConfig
property
readonly
Returns the SlackAlerterConfig
config.
Returns:
Type | Description |
---|---|
SlackAlerterConfig |
The configuration. |
ask(self, message, params=None)
Post a message to a Slack channel and wait for approval.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Initial message to be posted. |
required |
params |
Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] |
Optional parameters. |
None |
Returns:
Type | Description |
---|---|
bool |
True if a user approved the operation, else False |
Source code in zenml/integrations/slack/alerters/slack_alerter.py
def ask(
self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
"""Post a message to a Slack channel and wait for approval.
Args:
message: Initial message to be posted.
params: Optional parameters.
Returns:
True if a user approved the operation, else False
"""
rtm = RTMClient(token=self.config.slack_token)
slack_channel_id = self._get_channel_id(params=params)
approved = False # will be modified by handle()
@RTMClient.run_on(event="hello") # type: ignore
def post_initial_message(**payload: Any) -> None:
"""Post an initial message in a channel and start listening.
Args:
payload: payload of the received Slack event.
"""
web_client = payload["web_client"]
blocks = self._create_blocks(message, params)
web_client.chat_postMessage(
channel=slack_channel_id, text=message, blocks=blocks
)
@RTMClient.run_on(event="message") # type: ignore
def handle(**payload: Any) -> None:
"""Listen / handle messages posted in the channel.
Args:
payload: payload of the received Slack event.
"""
event = payload["data"]
if event["channel"] == slack_channel_id:
# approve request (return True)
if event["text"] in self._get_approve_msg_options(params):
print(f"User {event['user']} approved on slack.")
nonlocal approved
approved = True
rtm.stop() # type: ignore[no-untyped-call]
# disapprove request (return False)
elif event["text"] in self._get_disapprove_msg_options(params):
print(f"User {event['user']} disapproved on slack.")
rtm.stop() # type: ignore[no-untyped-call]
# start another thread until `rtm.stop()` is called in handle()
rtm.start()
return approved
post(self, message, params=None)
Post a message to a Slack channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Message to be posted. |
required |
params |
Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters] |
Optional parameters. |
None |
Returns:
Type | Description |
---|---|
bool |
True if operation succeeded, else False |
Source code in zenml/integrations/slack/alerters/slack_alerter.py
def post(
self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
"""Post a message to a Slack channel.
Args:
message: Message to be posted.
params: Optional parameters.
Returns:
True if operation succeeded, else False
"""
slack_channel_id = self._get_channel_id(params=params)
client = WebClient(token=self.config.slack_token)
blocks = self._create_blocks(message, params)
try:
response = client.chat_postMessage(
channel=slack_channel_id, text=message, blocks=blocks
)
return True
except SlackApiError as error:
response = error.response["error"]
logger.error(f"SlackAlerter.post() failed: {response}")
return False
SlackAlerterParameters (BaseAlerterStepParameters)
pydantic-model
Slack alerter parameters.
Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterParameters(BaseAlerterStepParameters):
"""Slack alerter parameters."""
# The ID of the Slack channel to use for communication.
slack_channel_id: Optional[str] = None
# Set of messages that lead to approval in alerter.ask()
approve_msg_options: Optional[List[str]] = None
# Set of messages that lead to disapproval in alerter.ask()
disapprove_msg_options: Optional[List[str]] = None
payload: Optional[SlackAlerterPayload] = None
include_format_blocks: Optional[bool] = True
SlackAlerterPayload (BaseModel)
pydantic-model
Slack alerter payload implementation.
Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterPayload(BaseModel):
"""Slack alerter payload implementation."""
pipeline_name: Optional[str] = None
step_name: Optional[str] = None
stack_name: Optional[str] = None
flavors
special
Slack integration flavors.
slack_alerter_flavor
Slack alerter flavor.
SlackAlerterConfig (BaseAlerterConfig)
pydantic-model
Slack alerter config.
Attributes:
Name | Type | Description |
---|---|---|
slack_token |
str |
The Slack token tied to the Slack account to be used. |
default_slack_channel_id |
Optional[str] |
The ID of the Slack channel to use for communication if no channel ID is provided in the step config. |
Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterConfig(BaseAlerterConfig):
"""Slack alerter config.
Attributes:
slack_token: The Slack token tied to the Slack account to be used.
default_slack_channel_id: The ID of the Slack channel to use for
communication if no channel ID is provided in the step config.
"""
slack_token: str = SecretField()
default_slack_channel_id: Optional[str] = None # TODO: Potential setting
@property
def is_valid(self) -> bool:
"""Check if the stack component is valid.
Returns:
True if the stack component is valid, False otherwise.
"""
try:
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
except ImportError:
logger.warning(
"Unable to validate Slack alerter credentials because the Slack integration is not installed."
)
return True
client = WebClient(token=self.slack_token)
try:
# Check slack token validity
response = client.auth_test()
if not response["ok"]:
return False
if self.default_slack_channel_id:
# Check channel validity
response = client.conversations_info(
channel=self.default_slack_channel_id
)
valid: bool = response["ok"]
return valid
except SlackApiError as e:
logger.error("Slack API Error:", e.response["error"])
return False
is_valid: bool
property
readonly
Check if the stack component is valid.
Returns:
Type | Description |
---|---|
bool |
True if the stack component is valid, False otherwise. |
SlackAlerterFlavor (BaseAlerterFlavor)
Slack alerter flavor.
Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterFlavor(BaseAlerterFlavor):
"""Slack alerter flavor."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return SLACK_ALERTER_FLAVOR
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/slack.png"
@property
def config_class(self) -> Type[SlackAlerterConfig]:
"""Returns `SlackAlerterConfig` config class.
Returns:
The config class.
"""
return SlackAlerterConfig
@property
def implementation_class(self) -> Type["SlackAlerter"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.slack.alerters import SlackAlerter
return SlackAlerter
config_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig]
property
readonly
Returns SlackAlerterConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SlackAlerter]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SlackAlerter] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
steps
special
Built-in steps for the Slack integration.
slack_alerter_ask_step
Step that allows you to send messages to Slack and wait for a response.
slack_alerter_post_step
Step that allows you to post messages to Slack.