Slack
zenml.integrations.slack
special
Slack integration for alerter components.
SlackIntegration (Integration)
Definition of a Slack integration for ZenML.
Implemented using Slack SDK.
Source code in zenml/integrations/slack/__init__.py
class SlackIntegration(Integration):
    """Slack integration for ZenML.

    Built on top of the [Slack SDK](https://pypi.org/project/slack-sdk/).
    """

    NAME = SLACK
    REQUIREMENTS = ["slack-sdk>=3.16.1", "aiohttp>=3.8.1"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors contributed by this integration.

        Returns:
            The flavor classes defined by the Slack integration.
        """
        # The flavor module is imported inside the method, not at module level.
        from zenml.integrations.slack.flavors import SlackAlerterFlavor

        slack_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
        return slack_flavors
flavors()
classmethod
Declare the stack component flavors for the Slack integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new flavors defined by the Slack integration. |
Source code in zenml/integrations/slack/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors contributed by this integration.

    Returns:
        The flavor classes defined by the Slack integration.
    """
    # The flavor module is imported inside the method, not at module level.
    from zenml.integrations.slack.flavors import SlackAlerterFlavor

    slack_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
    return slack_flavors
alerters
special
Alerter components defined by the Slack integration.
slack_alerter
Implementation for slack flavor of alerter component.
SlackAlerter (BaseAlerter)
Send messages to Slack channels.
Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerter(BaseAlerter):
    """Send messages to Slack channels."""

    @property
    def config(self) -> SlackAlerterConfig:
        """Returns the `SlackAlerterConfig` config.

        Returns:
            The configuration.
        """
        return cast(SlackAlerterConfig, self._config)

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> str:
        """Get the Slack channel ID to be used by post/ask.

        A channel ID set on the step parameters takes precedence over the
        `default_slack_channel_id` of the alerter stack component.

        Args:
            params: Optional parameters. When `None`, only the component's
                `default_slack_channel_id` is considered.

        Returns:
            ID of the Slack channel to be used.

        Raises:
            RuntimeError: if `params` is given but is not of type
                `BaseAlerterStepParameters`.
            ValueError: if a slack channel was neither defined in the
                parameters nor in the slack alerter component.
        """
        # `params` is declared Optional: only reject objects of the wrong
        # type, so that `None` can still fall back to the default channel.
        if params is not None and not isinstance(
            params, BaseAlerterStepParameters
        ):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )
        if (
            isinstance(params, SlackAlerterParameters)
            and params.slack_channel_id is not None
        ):
            return params.slack_channel_id
        if self.config.default_slack_channel_id is not None:
            return self.config.default_slack_channel_id
        raise ValueError(
            "Neither the `SlackAlerterParameters.slack_channel_id` in the "
            "runtime configuration, nor the `default_slack_channel_id` in the "
            "alerter stack component is specified. Please specify at least "
            "one."
        )

    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to approval during ask().

        Args:
            params: Optional parameters.

        Returns:
            List of messages that lead to approval in alerter.ask().
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS

    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().

        Args:
            params: Optional parameters.

        Returns:
            List of messages that lead to disapproval in alerter.ask().
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS

    def post(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> bool:
        """Post a message to a Slack channel.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        slack_channel_id = self._get_channel_id(params=params)
        client = WebClient(token=self.config.slack_token)
        try:
            client.chat_postMessage(
                channel=slack_channel_id,
                text=message,
            )
        except SlackApiError as error:
            reason = error.response["error"]
            logger.error(f"SlackAlerter.post() failed: {reason}")
            return False
        return True

    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> bool:
        """Post a message to a Slack channel and wait for approval.

        Args:
            message: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False
        """
        rtm = RTMClient(token=self.config.slack_token)
        slack_channel_id = self._get_channel_id(params=params)
        # Resolved once up front; they do not depend on the incoming event.
        approve_options = self._get_approve_msg_options(params)
        disapprove_options = self._get_disapprove_msg_options(params)
        approved = False  # will be modified by handle()

        @RTMClient.run_on(event="hello")  # type: ignore
        def post_initial_message(**payload: Any) -> None:
            """Post an initial message in a channel and start listening.

            Args:
                payload: payload of the received Slack event.
            """
            web_client = payload["web_client"]
            web_client.chat_postMessage(channel=slack_channel_id, text=message)

        @RTMClient.run_on(event="message")  # type: ignore
        def handle(**payload: Any) -> None:
            """Listen / handle messages posted in the channel.

            Args:
                payload: payload of the received Slack event.
            """
            nonlocal approved
            event = payload["data"]
            # Use `.get()` so that an event lacking `channel`, `text` or
            # `user` is ignored instead of raising KeyError in the callback.
            if event.get("channel") != slack_channel_id:
                return
            text = event.get("text")
            # approve request (return True)
            if text in approve_options:
                print(f"User {event.get('user')} approved on slack.")
                approved = True
                rtm.stop()  # type: ignore
            # disapprove request (return False)
            elif text in disapprove_options:
                print(f"User {event.get('user')} disapproved on slack.")
                rtm.stop()  # type:ignore

        # start another thread until `rtm.stop()` is called in handle()
        rtm.start()
        return approved
config: SlackAlerterConfig
property
readonly
Returns the `SlackAlerterConfig` config.
Returns:
Type | Description |
---|---|
SlackAlerterConfig |
The configuration. |
ask(self, message, params)
Post a message to a Slack channel and wait for approval.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Initial message to be posted. |
required |
params |
Optional[zenml.steps.step_interfaces.base_alerter_step.BaseAlerterStepParameters] |
Optional parameters. |
required |
Returns:
Type | Description |
---|---|
bool |
True if a user approved the operation, else False |
Source code in zenml/integrations/slack/alerters/slack_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters]
) -> bool:
    """Post a message to a Slack channel and wait for approval.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    rtm = RTMClient(token=self.config.slack_token)
    slack_channel_id = self._get_channel_id(params=params)
    # Resolved once up front; they do not depend on the incoming event.
    approve_options = self._get_approve_msg_options(params)
    disapprove_options = self._get_disapprove_msg_options(params)
    approved = False  # will be modified by handle()

    @RTMClient.run_on(event="hello")  # type: ignore
    def post_initial_message(**payload: Any) -> None:
        """Post an initial message in a channel and start listening.

        Args:
            payload: payload of the received Slack event.
        """
        web_client = payload["web_client"]
        web_client.chat_postMessage(channel=slack_channel_id, text=message)

    @RTMClient.run_on(event="message")  # type: ignore
    def handle(**payload: Any) -> None:
        """Listen / handle messages posted in the channel.

        Args:
            payload: payload of the received Slack event.
        """
        nonlocal approved
        event = payload["data"]
        # Use `.get()` so that an event lacking `channel`, `text` or `user`
        # is ignored instead of raising KeyError in the callback.
        if event.get("channel") != slack_channel_id:
            return
        text = event.get("text")
        # approve request (return True)
        if text in approve_options:
            print(f"User {event.get('user')} approved on slack.")
            approved = True
            rtm.stop()  # type: ignore
        # disapprove request (return False)
        elif text in disapprove_options:
            print(f"User {event.get('user')} disapproved on slack.")
            rtm.stop()  # type:ignore

    # start another thread until `rtm.stop()` is called in handle()
    rtm.start()
    return approved
post(self, message, params)
Post a message to a Slack channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
str |
Message to be posted. |
required |
params |
Optional[zenml.steps.step_interfaces.base_alerter_step.BaseAlerterStepParameters] |
Optional parameters. |
required |
Returns:
Type | Description |
---|---|
bool |
True if operation succeeded, else False |
Source code in zenml/integrations/slack/alerters/slack_alerter.py
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters]
) -> bool:
    """Send `message` to the resolved Slack channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    target_channel = self._get_channel_id(params=params)
    web_client = WebClient(token=self.config.slack_token)
    try:
        web_client.chat_postMessage(channel=target_channel, text=message)
    except SlackApiError as error:
        # Slack API failures are logged and reported through the return value.
        reason = error.response["error"]
        logger.error(f"SlackAlerter.post() failed: {reason}")
        return False
    return True
SlackAlerterParameters (BaseAlerterStepParameters)
pydantic-model
Slack alerter parameters.
Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters.

    Attributes:
        slack_channel_id: The ID of the Slack channel to use for
            communication.
        approve_msg_options: Messages that lead to approval in
            alerter.ask().
        disapprove_msg_options: Messages that lead to disapproval in
            alerter.ask().
    """

    slack_channel_id: Optional[str] = None
    approve_msg_options: Optional[List[str]] = None
    disapprove_msg_options: Optional[List[str]] = None
flavors
special
Slack integration flavors.
slack_alerter_flavor
Slack alerter flavor.
SlackAlerterConfig (BaseAlerterConfig)
pydantic-model
Slack alerter config.
Attributes:
Name | Type | Description |
---|---|---|
slack_token |
str |
The Slack token tied to the Slack account to be used. |
default_slack_channel_id |
Optional[str] |
The ID of the Slack channel to use for communication if no channel ID is provided in the step config. |
Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterConfig(BaseAlerterConfig):
    """Configuration of the Slack alerter stack component."""

    # The Slack token tied to the Slack account to be used.
    slack_token: str = SecretField()

    # The ID of the Slack channel to use for communication if no channel ID
    # is provided in the step config.
    default_slack_channel_id: Optional[str] = None
SlackAlerterFlavor (BaseAlerterFlavor)
Slack alerter flavor.
Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterFlavor(BaseAlerterFlavor):
    """Flavor definition for the Slack alerter."""

    @property
    def name(self) -> str:
        """The flavor's name.

        Returns:
            The `SLACK_ALERTER_FLAVOR` constant.
        """
        return SLACK_ALERTER_FLAVOR

    @property
    def config_class(self) -> Type[SlackAlerterConfig]:
        """The config class used by this flavor.

        Returns:
            The `SlackAlerterConfig` class.
        """
        return SlackAlerterConfig

    @property
    def implementation_class(self) -> Type["SlackAlerter"]:
        """The class implementing this flavor.

        Returns:
            The `SlackAlerter` class.
        """
        # The alerter module is imported inside the property, not at
        # module level.
        from zenml.integrations.slack.alerters import SlackAlerter

        alerter_class: Type["SlackAlerter"] = SlackAlerter
        return alerter_class
config_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig]
property
readonly
Returns the `SlackAlerterConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig] |
The config class. |
implementation_class: Type[SlackAlerter]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SlackAlerter] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
steps
special
Built-in steps for the Slack integration.
slack_alerter_ask_step
Step that allows you to send messages to Slack and wait for a response.
slack_alerter_ask_step (BaseStep)
Posts a message to the Slack alerter component and waits for approval.
This can be useful, e.g. to easily get a human in the loop before deploying models.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Parameters for the Slack alerter. |
required | |
context |
StepContext of the ZenML repository. |
required | |
message |
Initial message to be posted. |
required |
Returns:
Type | Description |
---|---|
True if a user approved the operation, else False. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If currently active alerter is not a `SlackAlerter`. |
PARAMETERS_CLASS (BaseAlerterStepParameters)
pydantic-model
Slack alerter parameters.
Source code in zenml/integrations/slack/steps/slack_alerter_ask_step.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters.

    Attributes:
        slack_channel_id: The ID of the Slack channel to use for
            communication.
        approve_msg_options: Messages that lead to approval in
            alerter.ask().
        disapprove_msg_options: Messages that lead to disapproval in
            alerter.ask().
    """

    slack_channel_id: Optional[str] = None
    approve_msg_options: Optional[List[str]] = None
    disapprove_msg_options: Optional[List[str]] = None
entrypoint(params, context, message)
staticmethod
Posts a message to the Slack alerter component and waits for approval.
This can be useful, e.g. to easily get a human in the loop before deploying models.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
SlackAlerterParameters |
Parameters for the Slack alerter. |
required |
context |
StepContext |
StepContext of the ZenML repository. |
required |
message |
str |
Initial message to be posted. |
required |
Returns:
Type | Description |
---|---|
bool |
True if a user approved the operation, else False. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If currently active alerter is not a `SlackAlerter`. |
Source code in zenml/integrations/slack/steps/slack_alerter_ask_step.py
@step
def slack_alerter_ask_step(
    params: SlackAlerterParameters, context: StepContext, message: str
) -> bool:
    """Ask for approval through the active stack's Slack alerter.

    Posts `message` via the alerter and returns whatever its `ask()` call
    reports. This can be useful, e.g. to easily get a human in the loop
    before deploying models.

    Args:
        params: Parameters for the Slack alerter.
        context: StepContext of the ZenML repository.
        message: Initial message to be posted.

    Returns:
        True if a user approved the operation, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `SlackAlerter`.
    """
    active_alerter = get_active_alerter(context)
    if isinstance(active_alerter, SlackAlerter):
        return active_alerter.ask(message, params)

    # TODO: potential duplicate code for other components
    # -> generalize to `check_component_flavor()` utility function?
    raise RuntimeError(
        "Step `slack_alerter_ask_step` requires an alerter component of "
        "flavor `slack`, but the currently active alerter is of type "
        f"{type(active_alerter)}, which is not a subclass of `SlackAlerter`."
    )
slack_alerter_post_step
Step that allows you to post messages to Slack.
slack_alerter_post_step (BaseStep)
Post a message to the Slack alerter component of the active stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Parameters for the Slack alerter. |
required | |
context |
StepContext of the ZenML repository. |
required | |
message |
Message to be posted. |
required |
Returns:
Type | Description |
---|---|
True if operation succeeded, else False. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If currently active alerter is not a `SlackAlerter`. |
PARAMETERS_CLASS (BaseAlerterStepParameters)
pydantic-model
Slack alerter parameters.
Source code in zenml/integrations/slack/steps/slack_alerter_post_step.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters.

    Attributes:
        slack_channel_id: The ID of the Slack channel to use for
            communication.
        approve_msg_options: Messages that lead to approval in
            alerter.ask().
        disapprove_msg_options: Messages that lead to disapproval in
            alerter.ask().
    """

    slack_channel_id: Optional[str] = None
    approve_msg_options: Optional[List[str]] = None
    disapprove_msg_options: Optional[List[str]] = None
entrypoint(params, context, message)
staticmethod
Post a message to the Slack alerter component of the active stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
SlackAlerterParameters |
Parameters for the Slack alerter. |
required |
context |
StepContext |
StepContext of the ZenML repository. |
required |
message |
str |
Message to be posted. |
required |
Returns:
Type | Description |
---|---|
bool |
True if operation succeeded, else False. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If currently active alerter is not a `SlackAlerter`. |
Source code in zenml/integrations/slack/steps/slack_alerter_post_step.py
@step
def slack_alerter_post_step(
    params: SlackAlerterParameters, context: StepContext, message: str
) -> bool:
    """Send a message through the active stack's Slack alerter.

    Args:
        params: Parameters for the Slack alerter.
        context: StepContext of the ZenML repository.
        message: Message to be posted.

    Returns:
        True if operation succeeded, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `SlackAlerter`.
    """
    active_alerter = get_active_alerter(context)
    if isinstance(active_alerter, SlackAlerter):
        return active_alerter.post(message, params)

    raise RuntimeError(
        "Step `slack_alerter_post_step` requires an alerter component of "
        "flavor `slack`, but the currently active alerter is of type "
        f"{type(active_alerter)}, which is not a subclass of `SlackAlerter`."
    )