Skip to content

Slack

zenml.integrations.slack special

Slack integration for alerter components.

SlackIntegration (Integration)

Definition of a Slack integration for ZenML.

Implemented using Slack SDK.

Source code in zenml/integrations/slack/__init__.py
class SlackIntegration(Integration):
    """ZenML integration that exposes Slack as an alerter backend.

    Built on top of the [Slack SDK](https://pypi.org/project/slack-sdk/).
    """

    NAME = SLACK
    REQUIREMENTS = ["slack-sdk>=3.16.1", "aiohttp>=3.8.1"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors contributed by this integration.

        Returns:
            The flavor classes provided by the Slack integration.
        """
        # Imported inside the method rather than at module import time.
        from zenml.integrations.slack.flavors import SlackAlerterFlavor

        provided_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
        return provided_flavors

flavors() classmethod

Declare the stack component flavors for the Slack integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new flavors defined by the Slack integration.

Source code in zenml/integrations/slack/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors contributed by this integration.

    Returns:
        The flavor classes provided by the Slack integration.
    """
    # Imported inside the method rather than at module import time.
    from zenml.integrations.slack.flavors import SlackAlerterFlavor

    provided_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
    return provided_flavors

alerters special

Alerter components defined by the Slack integration.

slack_alerter

Implementation for slack flavor of alerter component.

SlackAlerter (BaseAlerter)

Send messages to Slack channels.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerter(BaseAlerter):
    """Send messages to Slack channels."""

    @property
    def config(self) -> SlackAlerterConfig:
        """Expose the component configuration typed as `SlackAlerterConfig`.

        Returns:
            The configuration.
        """
        # `cast` only informs the type checker; the object is returned as-is.
        slack_config = cast(SlackAlerterConfig, self._config)
        return slack_config

    @property
    def settings_class(self) -> Type[SlackAlerterSettings]:
        """Tell callers which settings class belongs to the Slack alerter.

        Returns:
            The settings class.
        """
        alerter_settings_cls: Type[SlackAlerterSettings] = SlackAlerterSettings
        return alerter_settings_cls

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters] = None
    ) -> str:
        """Resolve the Slack channel ID that post/ask should use.

        The first source that provides a non-None value wins, in this order:
        `slack_channel_id` on `SlackAlerterParameters`, `slack_channel_id` in
        the settings of the current step run, and finally
        `default_slack_channel_id` from the alerter config.

        Args:
            params: Optional parameters.

        Returns:
            ID of the Slack channel to be used.

        Raises:
            RuntimeError: if `params` is given but is not of type
                `BaseAlerterStepParameters`.
            ValueError: if a slack channel was neither defined in the
                parameters or settings nor in the slack alerter component.
        """
        if params:
            if not isinstance(params, BaseAlerterStepParameters):
                raise RuntimeError(
                    "The config object must be of type `BaseAlerterStepParameters`."
                )
            if isinstance(params, SlackAlerterParameters):
                explicit_channel: Optional[str] = getattr(
                    params, "slack_channel_id", None
                )
                if explicit_channel is not None:
                    return explicit_channel

        step_settings = cast(
            SlackAlerterSettings,
            self.get_settings(get_step_context().step_run),
        )
        settings_channel = step_settings.slack_channel_id
        if settings_channel is not None:
            return settings_channel

        fallback_channel = self.config.default_slack_channel_id
        if fallback_channel is not None:
            return fallback_channel

        raise ValueError(
            "Neither the `slack_channel_id` in the runtime "
            "configuration, nor the `default_slack_channel_id` in the alerter "
            "stack component is specified. Please specify at least one."
        )

    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Pick the replies that count as an approval during ask().

        Custom options are only honoured when `params` is a
        `SlackAlerterParameters` with `approve_msg_options` set; otherwise
        `DEFAULT_APPROVE_MSG_OPTIONS` is used.

        Args:
            params: Optional parameters.

        Returns:
            Messages that lead to approval in alerter.ask().
        """
        custom_options: Optional[List[str]] = None
        if isinstance(params, SlackAlerterParameters):
            custom_options = getattr(params, "approve_msg_options", None)

        if custom_options is None:
            return DEFAULT_APPROVE_MSG_OPTIONS
        return custom_options

    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Pick the replies that count as a disapproval during ask().

        Custom options are only honoured when `params` is a
        `SlackAlerterParameters` with `disapprove_msg_options` set; otherwise
        `DEFAULT_DISAPPROVE_MSG_OPTIONS` is used.

        Args:
            params: Optional parameters.

        Returns:
            Messages that lead to disapproval in alerter.ask().
        """
        custom_options: Optional[List[str]] = None
        if isinstance(params, SlackAlerterParameters):
            custom_options = getattr(params, "disapprove_msg_options", None)

        if custom_options is None:
            return DEFAULT_DISAPPROVE_MSG_OPTIONS
        return custom_options

    def _create_blocks(
        self,
        message: Optional[str],
        params: Optional[BaseAlerterStepParameters],
    ) -> List[Dict]:  # type: ignore
        """Build the Slack blocks that accompany a message.

        Custom `blocks` on the parameters take precedence. Otherwise, if a
        `payload` is present, a default layout is produced: one section with
        pipeline, step and stack names plus the ZenML logo, and one section
        with the message. Without either, or when `params` is not a
        `SlackAlerterParameters`, no blocks are returned.

        Args:
            message: message
            params: Optional parameters.

        Returns:
            List of slack blocks.
        """
        if not isinstance(params, SlackAlerterParameters):
            logger.info(
                "params is not of type SlackAlerterParameters. Returning empty blocks."
            )
            return []

        custom_blocks = getattr(params, "blocks", None)
        if custom_blocks is not None:
            logger.info("Using custom blocks")
            return custom_blocks

        payload = getattr(params, "payload", None)
        if payload is None:
            logger.info("No custom blocks or payload set for Slack alerter.")
            return []

        logger.info(
            "No custom blocks set. Using default blocks for Slack alerter"
        )

        def _mrkdwn(text: str) -> Dict[str, str]:
            # One markdown-formatted text field of a section block.
            return {"type": "mrkdwn", "text": text}

        summary_section = {
            "type": "section",
            "fields": [
                _mrkdwn(f":star: *Pipeline:*\n{payload.pipeline_name}"),
                _mrkdwn(f":arrow_forward: *Step:*\n{payload.step_name}"),
                _mrkdwn(f":ring_buoy: *Stack:*\n{payload.stack_name}"),
            ],
            "accessory": {
                "type": "image",
                "image_url": "https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png",
                "alt_text": "zenml logo",
            },
        }
        message_section = {
            "type": "section",
            "fields": [_mrkdwn(f":email: *Message:*\n{message}")],
        }
        return [summary_section, message_section]

    def post(
        self,
        message: Optional[str] = None,
        params: Optional[BaseAlerterStepParameters] = None,
    ) -> bool:
        """Send a message to the resolved Slack channel.

        A `SlackApiError` is logged and reported through the return value
        rather than raised.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        target_channel = self._get_channel_id(params=params)
        web_client = WebClient(token=self.config.slack_token)
        message_blocks = self._create_blocks(message, params)
        try:
            web_client.chat_postMessage(
                channel=target_channel, text=message, blocks=message_blocks
            )
        except SlackApiError as error:
            api_error = error.response["error"]
            logger.error(f"SlackAlerter.post() failed: {api_error}")
            return False
        return True

    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Slack channel and wait for approval.

        The message is posted once the RTM connection reports `hello`. Every
        subsequent message in the target channel is compared against the
        approve / disapprove options; the first match stops the RTM client
        and determines the result. Messages from other channels, or events
        that carry no text, are ignored.

        Args:
            message: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False
        """
        rtm = RTMClient(token=self.config.slack_token)
        slack_channel_id = self._get_channel_id(params=params)

        approved = False  # will be modified by handle()

        # NOTE(review): `run_on` is invoked on the `RTMClient` class, not on
        # the `rtm` instance -- confirm whether handlers registered by
        # earlier ask() calls stay registered.
        @RTMClient.run_on(event="hello")  # type: ignore
        def post_initial_message(**payload: Any) -> None:
            """Post an initial message in a channel and start listening.

            Args:
                payload: payload of the received Slack event.
            """
            web_client = payload["web_client"]
            blocks = self._create_blocks(message, params)
            web_client.chat_postMessage(
                channel=slack_channel_id, text=message, blocks=blocks
            )

        @RTMClient.run_on(event="message")  # type: ignore
        def handle(**payload: Any) -> None:
            """Listen / handle messages posted in the channel.

            Args:
                payload: payload of the received Slack event.
            """
            nonlocal approved

            event = payload["data"]
            # Use `.get()` throughout: an event missing one of these keys
            # must be skipped, not crash the listener with a KeyError.
            if event.get("channel") != slack_channel_id:
                return

            text = event.get("text")
            if text is None:
                return
            user = event.get("user")

            # approve request (return True)
            if text in self._get_approve_msg_options(params):
                print(f"User {user} approved on slack.")
                approved = True
                rtm.stop()  # type: ignore[no-untyped-call]

            # disapprove request (return False)
            elif text in self._get_disapprove_msg_options(params):
                print(f"User {user} disapproved on slack.")
                rtm.stop()  # type: ignore[no-untyped-call]

        # The return value below is only meaningful if `rtm.start()` does not
        # return before `rtm.stop()` is called in handle().
        rtm.start()

        return approved
config: SlackAlerterConfig property readonly

Returns the SlackAlerterConfig config.

Returns:

Type Description
SlackAlerterConfig

The configuration.

settings_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterSettings] property readonly

Settings class for the Slack alerter.

Returns:

Type Description
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterSettings]

The settings class.

ask(self, message, params=None)

Post a message to a Slack channel and wait for approval.

Parameters:

Name Type Description Default
message str

Initial message to be posted.

required
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Slack channel and wait for approval.

    The message is posted once the RTM connection reports `hello`. Every
    subsequent message in the target channel is compared against the
    approve / disapprove options; the first match stops the RTM client
    and determines the result. Messages from other channels, or events
    that carry no text, are ignored.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    rtm = RTMClient(token=self.config.slack_token)
    slack_channel_id = self._get_channel_id(params=params)

    approved = False  # will be modified by handle()

    # NOTE(review): `run_on` is invoked on the `RTMClient` class, not on
    # the `rtm` instance -- confirm whether handlers registered by
    # earlier ask() calls stay registered.
    @RTMClient.run_on(event="hello")  # type: ignore
    def post_initial_message(**payload: Any) -> None:
        """Post an initial message in a channel and start listening.

        Args:
            payload: payload of the received Slack event.
        """
        web_client = payload["web_client"]
        blocks = self._create_blocks(message, params)
        web_client.chat_postMessage(
            channel=slack_channel_id, text=message, blocks=blocks
        )

    @RTMClient.run_on(event="message")  # type: ignore
    def handle(**payload: Any) -> None:
        """Listen / handle messages posted in the channel.

        Args:
            payload: payload of the received Slack event.
        """
        nonlocal approved

        event = payload["data"]
        # Use `.get()` throughout: an event missing one of these keys
        # must be skipped, not crash the listener with a KeyError.
        if event.get("channel") != slack_channel_id:
            return

        text = event.get("text")
        if text is None:
            return
        user = event.get("user")

        # approve request (return True)
        if text in self._get_approve_msg_options(params):
            print(f"User {user} approved on slack.")
            approved = True
            rtm.stop()  # type: ignore[no-untyped-call]

        # disapprove request (return False)
        elif text in self._get_disapprove_msg_options(params):
            print(f"User {user} disapproved on slack.")
            rtm.stop()  # type: ignore[no-untyped-call]

    # The return value below is only meaningful if `rtm.start()` does not
    # return before `rtm.stop()` is called in handle().
    rtm.start()

    return approved
post(self, message=None, params=None)

Post a message to a Slack channel.

Parameters:

Name Type Description Default
message Optional[str]

Message to be posted.

None
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def post(
    self,
    message: Optional[str] = None,
    params: Optional[BaseAlerterStepParameters] = None,
) -> bool:
    """Send a message to the resolved Slack channel.

    A `SlackApiError` is logged and reported through the return value
    rather than raised.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    target_channel = self._get_channel_id(params=params)
    web_client = WebClient(token=self.config.slack_token)
    message_blocks = self._create_blocks(message, params)
    try:
        web_client.chat_postMessage(
            channel=target_channel, text=message, blocks=message_blocks
        )
    except SlackApiError as error:
        api_error = error.response["error"]
        logger.error(f"SlackAlerter.post() failed: {api_error}")
        return False
    return True
SlackAlerterParameters (BaseAlerterStepParameters)

Slack alerter parameters.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters.

    Per-call options consulted by `SlackAlerter` when resolving the target
    channel, the approve / disapprove replies and the message blocks. Every
    field is optional.
    """

    # The ID of the Slack channel to use for communication.
    slack_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None
    # Pipeline / step / stack names rendered into the default blocks when no
    # custom `blocks` are supplied.
    payload: Optional[SlackAlerterPayload] = None
    # NOTE(review): not read by the `SlackAlerter` code in this file;
    # presumably toggles the default block formatting -- confirm against
    # the callers.
    include_format_blocks: Optional[bool] = True

    # Allowing user to use their own custom blocks in the slack post message
    blocks: Optional[List[Dict]] = None  # type: ignore
SlackAlerterPayload (BaseModel)

Slack alerter payload implementation.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterPayload(BaseModel):
    """Slack alerter payload implementation.

    Carries the names that `SlackAlerter._create_blocks` interpolates into
    the default message blocks. Every field is optional and defaults to None.
    """

    # Shown under the "Pipeline:" heading of the default blocks.
    pipeline_name: Optional[str] = None
    # Shown under the "Step:" heading of the default blocks.
    step_name: Optional[str] = None
    # Shown under the "Stack:" heading of the default blocks.
    stack_name: Optional[str] = None

flavors special

Slack integration flavors.

slack_alerter_flavor

Slack alerter flavor.

SlackAlerterConfig (BaseAlerterConfig)

Slack alerter config.

Attributes:

Name Type Description
slack_token str

The Slack token tied to the Slack account to be used.

default_slack_channel_id Optional[str]

The ID of the default Slack channel to use for communication.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterConfig(BaseAlerterConfig):
    """Slack alerter config.

    Attributes:
        slack_token: The Slack token tied to the Slack account to be used.
        default_slack_channel_id: The ID of the default Slack channel to use for communication.
    """

    slack_token: str = SecretField()
    default_slack_channel_id: Optional[str] = None

    @property
    def is_valid(self) -> bool:
        """Check if the stack component is valid.

        The token is checked with `auth_test`; if a default channel is
        configured, the channel is additionally checked with
        `conversations_info`. When `slack_sdk` cannot be imported the check
        is skipped and the component is reported as valid.

        Returns:
            True if the stack component is valid, False otherwise.
        """
        try:
            from slack_sdk import WebClient
            from slack_sdk.errors import SlackApiError
        except ImportError:
            logger.warning(
                "Unable to validate Slack alerter credentials because the Slack integration is not installed."
            )
            return True
        client = WebClient(token=self.slack_token)
        try:
            # Check slack token validity
            response = client.auth_test()
            if not response["ok"]:
                return False

            if self.default_slack_channel_id:
                # Check channel validity
                response = client.conversations_info(
                    channel=self.default_slack_channel_id
                )
            # `response` is the channel lookup when a default channel is set,
            # otherwise still the (successful) auth test.
            valid: bool = response["ok"]
            return valid

        except SlackApiError as e:
            # The error code needs a `%s` placeholder: logging applies
            # `msg % args`, and an argument without a placeholder makes the
            # record fail to format, so the error is never logged.
            logger.error("Slack API Error: %s", e.response["error"])
            return False
is_valid: bool property readonly

Check if the stack component is valid.

Returns:

Type Description
bool

True if the stack component is valid, False otherwise.

SlackAlerterFlavor (BaseAlerterFlavor)

Slack alerter flavor.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterFlavor(BaseAlerterFlavor):
    """Flavor definition for the Slack alerter stack component."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name: str = SLACK_ALERTER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user docs describing this flavor.

        Returns:
            A flavor docs url.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK docs describing this flavor.

        Returns:
            A flavor SDK docs url.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """Link to the image representing the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        slack_logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/slack.png"
        return slack_logo

    @property
    def config_class(self) -> Type[SlackAlerterConfig]:
        """Config class used by this flavor (`SlackAlerterConfig`).

        Returns:
            The config class.
        """
        flavor_config_cls: Type[SlackAlerterConfig] = SlackAlerterConfig
        return flavor_config_cls

    @property
    def implementation_class(self) -> Type["SlackAlerter"]:
        """Class implementing this flavor.

        Returns:
            The implementation class.
        """
        # Imported inside the property rather than at module import time.
        from zenml.integrations.slack.alerters import SlackAlerter

        alerter_cls: Type["SlackAlerter"] = SlackAlerter
        return alerter_cls
config_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig] property readonly

Returns SlackAlerterConfig config class.

Returns:

Type Description
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SlackAlerter] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SlackAlerter]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

SlackAlerterSettings (BaseSettings)

Settings for the Slack alerter.

Attributes:

Name Type Description
slack_channel_id Optional[str]

The ID of the Slack channel to use for communication.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterSettings(BaseSettings):
    """Settings for the Slack alerter.

    `SlackAlerter._get_channel_id` uses `slack_channel_id` from these
    settings when the step parameters do not supply one, and before falling
    back to `default_slack_channel_id` from the alerter config.

    Attributes:
        slack_channel_id: The ID of the Slack channel to use for communication.
    """

    # None means "not set here": channel resolution continues with the
    # alerter config's default channel.
    slack_channel_id: Optional[str] = None

steps special

Built-in steps for the Slack integration.

slack_alerter_ask_step

Step that allows you to send messages to Slack and wait for a response.

slack_alerter_post_step

Step that allows you to post messages to Slack.