Skip to content

Slack

zenml.integrations.slack special

Slack integration for alerter components.

SlackIntegration (Integration)

Definition of a Slack integration for ZenML.

Implemented using Slack SDK.

Source code in zenml/integrations/slack/__init__.py
class SlackIntegration(Integration):
    """ZenML integration exposing Slack-based alerter components.

    Built on top of the [Slack SDK](https://pypi.org/project/slack-sdk/).
    """

    NAME = SLACK
    REQUIREMENTS = ["slack-sdk==3.30.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped with this integration.

        Returns:
            The flavor classes contributed by the Slack integration.
        """
        # Function-scope import, kept from the original implementation
        # (presumably to defer loading the flavor module — confirm).
        from zenml.integrations.slack.flavors import SlackAlerterFlavor

        slack_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
        return slack_flavors

flavors() classmethod

Declare the stack component flavors for the Slack integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new flavors defined by the Slack integration.

Source code in zenml/integrations/slack/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped with the Slack integration.

    Returns:
        The flavor classes contributed by the Slack integration.
    """
    # Function-scope import, kept from the original implementation
    # (presumably to defer loading the flavor module — confirm).
    from zenml.integrations.slack.flavors import SlackAlerterFlavor

    slack_flavors: List[Type[Flavor]] = [SlackAlerterFlavor]
    return slack_flavors

alerters special

Alerter components defined by the Slack integration.

slack_alerter

Implementation for slack flavor of alerter component.

SlackAlerter (BaseAlerter)

Send messages to Slack channels.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerter(BaseAlerter):
    """Send messages to Slack channels.

    `post()` sends a single message; `ask()` sends a message and then polls
    the channel for an approving or disapproving reply until a timeout
    elapses.
    """

    @property
    def config(self) -> SlackAlerterConfig:
        """Returns the `SlackAlerterConfig` config.

        Returns:
            The configuration.
        """
        return cast(SlackAlerterConfig, self._config)

    @property
    def settings_class(self) -> Type[SlackAlerterSettings]:
        """Settings class for the Slack alerter.

        Returns:
            The settings class.
        """
        return SlackAlerterSettings

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters] = None
    ) -> str:
        """Get the Slack channel ID to be used by post/ask.

        The channel is looked up in this order: the `slack_channel_id` of
        `params` (only for `SlackAlerterParameters`), the runtime settings
        of the current step, and finally the component configuration.

        Args:
            params: Optional parameters.

        Returns:
            ID of the Slack channel to be used.

        Raises:
            RuntimeError: if `params` is given but is not of type
                `BaseAlerterStepParameters`.
            ValueError: if a Slack channel was defined neither in the
                params, nor in the runtime settings, nor in the slack
                alerter component.
        """
        if params and not isinstance(params, BaseAlerterStepParameters):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )

        # `slack_channel_id` is a declared field of `SlackAlerterParameters`,
        # so the isinstance check is sufficient; no `hasattr` needed.
        if (
            isinstance(params, SlackAlerterParameters)
            and params.slack_channel_id is not None
        ):
            return params.slack_channel_id

        # A RuntimeError while resolving the step settings is treated as
        # "no runtime settings available" and we fall through to the config.
        try:
            settings = cast(
                SlackAlerterSettings,
                self.get_settings(get_step_context().step_run),
            )
        except RuntimeError:
            settings = None

        if settings is not None and settings.slack_channel_id is not None:
            return settings.slack_channel_id

        if self.config.slack_channel_id is not None:
            return self.config.slack_channel_id

        raise ValueError(
            "The `slack_channel_id` is not set either in the runtime settings, "
            "or the component configuration of the alerter. Please specify at "
            "least one."
        )

    def _get_timeout_duration(self) -> int:
        """Gets the timeout duration used by the ask method.

        The runtime settings of the current step take precedence; if they
        cannot be resolved (a `RuntimeError` is raised while fetching them),
        the component configuration is used.

        Returns:
            number of seconds for the timeout to happen.
        """
        try:
            settings = cast(
                SlackAlerterSettings,
                self.get_settings(get_step_context().step_run),
            )
        except RuntimeError:
            settings = None

        if settings is not None:
            return settings.timeout

        return self.config.timeout

    @staticmethod
    def _get_approve_msg_options(
        params: Optional[BaseAlerterStepParameters],
    ) -> List[str]:
        """Define which messages will lead to approval during ask().

        Args:
            params: Optional parameters.

        Returns:
            The options from `params` if it is a `SlackAlerterParameters`
            with `approve_msg_options` set, else
            `DEFAULT_APPROVE_MSG_OPTIONS`.
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS

    @staticmethod
    def _get_disapprove_msg_options(
        params: Optional[BaseAlerterStepParameters],
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().

        Args:
            params: Optional parameters.

        Returns:
            The options from `params` if it is a `SlackAlerterParameters`
            with `disapprove_msg_options` set, else
            `DEFAULT_DISAPPROVE_MSG_OPTIONS`.
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS

    @staticmethod
    def _create_blocks(
        message: Optional[str],
        params: Optional[BaseAlerterStepParameters],
    ) -> List[Dict]:  # type: ignore
        """Helper function to create slack blocks.

        Custom `params.blocks` are returned unchanged when set. Otherwise,
        if `params.payload` is set, a default layout showing the pipeline,
        step and stack names plus the message is built. In every other case
        an empty list is returned.

        Args:
            message: message
            params: Optional parameters.

        Returns:
            List of slack blocks.
        """
        if not isinstance(params, SlackAlerterParameters):
            logger.info(
                "params is not of type SlackAlerterParameters. Returning empty "
                "blocks."
            )
            return []

        if params.blocks is not None:
            logger.info("Using custom blocks")
            return params.blocks

        if params.payload is None:
            logger.info("No custom blocks or payload set for Slack alerter.")
            return []

        logger.info(
            "No custom blocks set. Using default blocks for Slack alerter."
        )
        payload = params.payload
        return [
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f":star: *Pipeline:*\n{payload.pipeline_name}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f":arrow_forward: *Step:*\n{payload.step_name}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f":ring_buoy: *Stack:*\n{payload.stack_name}",
                    },
                ],
                "accessory": {
                    "type": "image",
                    "image_url": "https://zenml-strapi-media.s3.eu-central-1.amazonaws.com/03_Zen_ML_Logo_Square_White_efefc24ae7.png",
                    "alt_text": "zenml logo",
                },
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f":email: *Message:*\n{message}",
                    },
                ],
            },
        ]

    def post(
        self,
        message: Optional[str] = None,
        params: Optional[BaseAlerterStepParameters] = None,
    ) -> bool:
        """Post a message to a Slack channel.

        Errors reported by Slack, as well as any unexpected exception raised
        while sending, are logged and reported as `False` rather than raised.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        slack_channel_id = self._get_channel_id(params=params)
        client = WebClient(token=self.config.slack_token)
        blocks = self._create_blocks(message, params)
        try:
            response = client.chat_postMessage(
                channel=slack_channel_id, text=message, blocks=blocks
            )
            if not response.get("ok", False):
                error_details = response.get("error", "Unknown error")
                logger.error(
                    f"Failed to send message to Slack channel. "
                    f"Error: {error_details}. Full response: {response}"
                )
                return False
            return True
        except SlackApiError as error:
            error_message = error.response.get("error", "Unknown error")
            logger.error(
                "SlackAlerter.post() failed with Slack API error: "
                f"{error_message}. Full response: {error.response}"
            )
            return False
        except Exception as e:
            logger.error(f"Unexpected error in SlackAlerter.post(): {str(e)}")
            return False

    def ask(
        self, question: str, params: Optional[BaseAlerterStepParameters] = None
    ) -> bool:
        """Post a message to a Slack channel and wait for approval.

        After posting, the channel history newer than the posted message is
        polled once per second. Replies are compared case-insensitively and
        with surrounding whitespace stripped against the approve/disapprove
        options. Errors are logged and reported as `False`.

        Args:
            question: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False (disapproval,
            timeout, or any error).
        """
        slack_channel_id = self._get_channel_id(params=params)

        client = WebClient(token=self.config.slack_token)
        approve_options = self._get_approve_msg_options(params)
        disapprove_options = self._get_disapprove_msg_options(params)

        try:
            # Send message to the Slack channel
            response = client.chat_postMessage(
                channel=slack_channel_id,
                text=question,
                blocks=self._create_blocks(question, params),
            )

            if not response.get("ok", False):
                error_details = response.get("error", "Unknown error")
                logger.error(
                    f"Failed to send the initial message to the Slack channel. "
                    f"Error: {error_details}. Full response: {response}"
                )
                return False

            # Retrieve timestamp of sent message
            timestamp = response["ts"]

            # Loop invariants: resolve them once instead of on every poll.
            approve_set = {opt.lower() for opt in approve_options}
            disapprove_set = {opt.lower() for opt in disapprove_options}
            timeout = self._get_timeout_duration()

            # Wait for a response
            start_time = time.time()

            while time.time() - start_time < timeout:
                history = client.conversations_history(
                    channel=slack_channel_id, oldest=timestamp
                )
                for msg in history["messages"]:
                    # Only entries carrying both a timestamp and a user are
                    # considered replies.
                    if "ts" not in msg or "user" not in msg:
                        continue

                    # A reply without text must not abort the wait; treat it
                    # as an empty message that matches no option.
                    user_message = (msg.get("text") or "").strip().lower()
                    if user_message in approve_set:
                        logger.info("User approved the operation.")
                        return True
                    if user_message in disapprove_set:
                        logger.info("User disapproved the operation.")
                        return False

                time.sleep(1)  # Polling interval

            logger.warning("No response received within the timeout period.")
            return False

        except SlackApiError as error:
            error_message = error.response.get("error", "Unknown error")
            logger.error(
                f"SlackAlerter.ask() failed with Slack API error: "
                f"{error_message}. Full response: {error.response}"
            )
            return False
        except Exception as e:
            logger.error(f"Unexpected error in SlackAlerter.ask(): {str(e)}")
            return False
config: SlackAlerterConfig property readonly

Returns the SlackAlerterConfig config.

Returns:

Type Description
SlackAlerterConfig

The configuration.

settings_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterSettings] property readonly

Settings class for the Slack alerter.

Returns:

Type Description
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterSettings]

The settings class.

ask(self, question, params=None)

Post a message to a Slack channel and wait for approval.

Parameters:

Name Type Description Default
question str

Initial message to be posted.

required
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def ask(
    self, question: str, params: Optional[BaseAlerterStepParameters] = None
) -> bool:
    """Post a message to a Slack channel and wait for approval.

    After posting, the channel history newer than the posted message is
    polled once per second. Replies are compared case-insensitively and with
    surrounding whitespace stripped against the approve/disapprove options.
    Errors are logged and reported as `False`.

    Args:
        question: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False (disapproval,
        timeout, or any error).
    """
    slack_channel_id = self._get_channel_id(params=params)

    client = WebClient(token=self.config.slack_token)
    approve_options = self._get_approve_msg_options(params)
    disapprove_options = self._get_disapprove_msg_options(params)

    try:
        # Send message to the Slack channel
        response = client.chat_postMessage(
            channel=slack_channel_id,
            text=question,
            blocks=self._create_blocks(question, params),
        )

        if not response.get("ok", False):
            error_details = response.get("error", "Unknown error")
            logger.error(
                f"Failed to send the initial message to the Slack channel. "
                f"Error: {error_details}. Full response: {response}"
            )
            return False

        # Retrieve timestamp of sent message
        timestamp = response["ts"]

        # Loop invariants: resolve them once instead of on every poll.
        approve_set = {opt.lower() for opt in approve_options}
        disapprove_set = {opt.lower() for opt in disapprove_options}
        timeout = self._get_timeout_duration()

        # Wait for a response
        start_time = time.time()

        while time.time() - start_time < timeout:
            history = client.conversations_history(
                channel=slack_channel_id, oldest=timestamp
            )
            for msg in history["messages"]:
                # Only entries carrying both a timestamp and a user are
                # considered replies.
                if "ts" not in msg or "user" not in msg:
                    continue

                # A reply without text must not abort the wait; treat it as
                # an empty message that matches no option.
                user_message = (msg.get("text") or "").strip().lower()
                if user_message in approve_set:
                    logger.info("User approved the operation.")
                    return True
                if user_message in disapprove_set:
                    logger.info("User disapproved the operation.")
                    return False

            time.sleep(1)  # Polling interval

        logger.warning("No response received within the timeout period.")
        return False

    except SlackApiError as error:
        error_message = error.response.get("error", "Unknown error")
        logger.error(
            f"SlackAlerter.ask() failed with Slack API error: "
            f"{error_message}. Full response: {error.response}"
        )
        return False
    except Exception as e:
        logger.error(f"Unexpected error in SlackAlerter.ask(): {str(e)}")
        return False
post(self, message=None, params=None)

Post a message to a Slack channel.

Parameters:

Name Type Description Default
message Optional[str]

Message to be posted.

None
params Optional[zenml.alerter.base_alerter.BaseAlerterStepParameters]

Optional parameters.

None

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def post(
    self,
    message: Optional[str] = None,
    params: Optional[BaseAlerterStepParameters] = None,
) -> bool:
    """Send a single message to a Slack channel.

    Failures reported by Slack, and any unexpected exception raised while
    sending, are logged and turned into a `False` result.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    target_channel = self._get_channel_id(params=params)
    slack_client = WebClient(token=self.config.slack_token)
    message_blocks = self._create_blocks(message, params)

    try:
        result = slack_client.chat_postMessage(
            channel=target_channel, text=message, blocks=message_blocks
        )
        if result.get("ok", False):
            return True

        # Slack answered, but flagged the request as unsuccessful.
        reason = result.get("error", "Unknown error")
        logger.error(
            f"Failed to send message to Slack channel. "
            f"Error: {reason}. Full response: {result}"
        )
        return False
    except SlackApiError as api_error:
        reason = api_error.response.get("error", "Unknown error")
        logger.error(
            "SlackAlerter.post() failed with Slack API error: "
            f"{reason}. Full response: {api_error.response}"
        )
        return False
    except Exception as unexpected:
        logger.error(
            f"Unexpected error in SlackAlerter.post(): {str(unexpected)}"
        )
        return False
SlackAlerterParameters (BaseAlerterStepParameters)

Slack alerter parameters.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters.

    Per-call options for the Slack alerter. Every field is optional and
    defaults to "not set" (`None`), except `include_format_blocks`.
    """

    # The ID of the Slack channel to use for communication.
    slack_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None
    # Pipeline / step / stack names shown in the default message blocks when
    # no custom `blocks` are supplied.
    payload: Optional[SlackAlerterPayload] = None
    # NOTE(review): not read by the alerter code visible alongside this
    # class — confirm where (or whether) this flag is consumed.
    include_format_blocks: Optional[bool] = True

    # Allowing user to use their own custom blocks in the Slack post message
    blocks: Optional[List[Dict]] = None  # type: ignore
SlackAlerterPayload (BaseModel)

Slack alerter payload implementation.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterPayload(BaseModel):
    """Slack alerter payload implementation.

    Holds the names that are interpolated into the default Slack message
    blocks. All fields are optional and default to `None`.
    """

    # Name of the pipeline, shown under "Pipeline" in the default blocks.
    pipeline_name: Optional[str] = None
    # Name of the step, shown under "Step" in the default blocks.
    step_name: Optional[str] = None
    # Name of the stack, shown under "Stack" in the default blocks.
    stack_name: Optional[str] = None

flavors special

Slack integration flavors.

slack_alerter_flavor

Slack alerter flavor.

SlackAlerterConfig (BaseAlerterConfig, SlackAlerterSettings)

Slack alerter config.

Attributes:

Name Type Description
slack_token str

The Slack token tied to the Slack account to be used.

default_slack_channel_id Optional[str]

(deprecated) The ID of the default Slack channel to use for communication.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterConfig(BaseAlerterConfig, SlackAlerterSettings):
    """Slack alerter config.

    Attributes:
        slack_token: The Slack token tied to the Slack account to be used.
        default_slack_channel_id: (deprecated) The ID of the default Slack
            channel to use for communication. Use `slack_channel_id`
            instead.
    """

    slack_token: str = SecretField()

    default_slack_channel_id: Optional[str] = None
    _deprecation_validator = deprecate_pydantic_attributes(
        ("default_slack_channel_id", "slack_channel_id")
    )

    @property
    def is_valid(self) -> bool:
        """Check if the stack component is valid.

        The token is verified with an `auth_test` call; if a
        `slack_channel_id` is configured, the channel is additionally looked
        up with `conversations_info`. When the Slack SDK cannot be imported
        the check is skipped and the component is reported as valid.

        Returns:
            True if the stack component is valid, False otherwise.
        """
        try:
            from slack_sdk import WebClient
            from slack_sdk.errors import SlackApiError
        except ImportError:
            logger.warning(
                "Unable to validate the slack alerter, because the Slack "
                "integration is not installed."
            )
            return True

        client = WebClient(token=self.slack_token)
        try:
            # Check slack token validity
            response = client.auth_test()
            if not response["ok"]:
                return False

            if self.slack_channel_id:
                # Check channel validity
                response = client.conversations_info(
                    channel=self.slack_channel_id
                )
            # `response` is the channel lookup if one was made, otherwise
            # the (successful) auth test.
            valid: bool = response["ok"]
            return valid

        except SlackApiError as e:
            # The error text must be part of the message itself: passing it
            # as an extra positional argument without a placeholder makes
            # logging fail to format the record.
            error_message = e.response.get("error", "Unknown error")
            logger.error(f"Slack API Error: {error_message}")
            return False
is_valid: bool property readonly

Check if the stack component is valid.

Returns:

Type Description
bool

True if the stack component is valid, False otherwise.

SlackAlerterFlavor (BaseAlerterFlavor)

Slack alerter flavor.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterFlavor(BaseAlerterFlavor):
    """Flavor describing the Slack alerter stack component."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name = SLACK_ALERTER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            A flavor docs url.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            A flavor SDK docs url.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Link to the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/alerter/slack.png"

    @property
    def config_class(self) -> Type[SlackAlerterConfig]:
        """Configuration class used by this flavor.

        Returns:
            The config class.
        """
        return SlackAlerterConfig

    @property
    def implementation_class(self) -> Type["SlackAlerter"]:
        """Component class that implements this flavor.

        Returns:
            The implementation class.
        """
        # Function-scope import, kept from the original implementation.
        from zenml.integrations.slack.alerters import SlackAlerter

        return SlackAlerter
config_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig] property readonly

Returns SlackAlerterConfig config class.

Returns:

Type Description
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig]

The config class.

docs_url: Optional[str] property readonly

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SlackAlerter] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SlackAlerter]

The implementation class.

logo_url: str property readonly

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

SlackAlerterSettings (BaseSettings)

Settings for the Slack alerter.

Attributes:

Name Type Description
slack_channel_id Optional[str]

The ID of the Slack channel to use for communication.

timeout int

The number of seconds the ask method waits for a reply before timing out.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterSettings(BaseSettings):
    """Settings for the Slack alerter.

    Attributes:
        slack_channel_id: The ID of the Slack channel to use for communication.
        timeout: The number of seconds the ask method waits for a reply
            before timing out.
    """

    # No channel is selected by default.
    slack_channel_id: Optional[str] = None
    # Default wait of 300 seconds.
    timeout: int = 300

steps special

Built-in steps for the Slack integration.

slack_alerter_ask_step

Step that allows you to send messages to Slack and wait for a response.

slack_alerter_post_step

Step that allows you to post messages to Slack.