Skip to content

Hooks

zenml.hooks special

The hooks package exposes some standard hooks that can be used in ZenML.

Hooks are functions that run after a step has exited.

alerter_hooks

Functionality for standard hooks.

alerter_failure_hook(exception)

Standard failure hook that executes after a step fails.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Parameters:

Name Type Description Default
exception BaseException

Original exception that led to the step failing.

required
Source code in zenml/hooks/alerter_hooks.py
def alerter_failure_hook(exception: BaseException) -> None:
    """Standard failure hook that executes after a step fails.

    This hook uses any `BaseAlerter` that is configured within the active stack to post a message.

    Args:
        exception: Original exception that led to the step failing.
    """
    context = get_step_context()
    alerter = Client().active_stack.alerter
    if alerter:
        # `Console()` is created without an explicit output target, so its
        # output is captured by temporarily pointing `sys.stdout` at an
        # in-memory buffer.
        output_captured = io.StringIO()
        original_stdout = sys.stdout
        sys.stdout = output_captured
        try:
            console = Console()
            # NOTE(review): `exception` is not passed here; presumably this
            # renders the exception currently being handled -- confirm.
            console.print_exception(show_locals=False)
        finally:
            # Always restore stdout, even if rendering the traceback raises;
            # otherwise every later write to stdout would land in the buffer.
            sys.stdout = original_stdout
        rich_traceback = output_captured.getvalue()

        message = "*Failure Hook Notification! Step failed!*" + "\n\n"
        message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
        message += f"Run name: `{context.pipeline_run.name}`" + "\n"
        message += f"Step name: `{context.step_run.name}`" + "\n"
        message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
        message += (
            f"Exception: `({type(exception)}) {rich_traceback}`" + "\n\n"
        )
        alerter.post(message)
    else:
        logger.warning(
            "Specified standard failure hook but no alerter configured in the stack. Skipping.."
        )

alerter_success_hook()

Standard success hook that executes after step finishes successfully.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Source code in zenml/hooks/alerter_hooks.py
def alerter_success_hook() -> None:
    """Standard success hook that executes after step finishes successfully.

    Posts a summary of the finished step (pipeline, run, step name and step
    parameters) through the alerter of the active stack. If the active stack
    has no alerter, a warning is logged and nothing is posted.
    """
    step_context = get_step_context()
    stack_alerter = Client().active_stack.alerter

    # Guard clause: without an alerter there is nothing to post to.
    if not stack_alerter:
        logger.warning(
            "Specified standard success hook but no alerter configured in the stack. Skipping.."
        )
        return

    # The empty entry yields the blank line that separates the headline from
    # the details; the trailing newline is appended after joining.
    lines = [
        "*Success Hook Notification! Step completed successfully*",
        "",
        f"Pipeline name: `{step_context.pipeline.name}`",
        f"Run name: `{step_context.pipeline_run.name}`",
        f"Step name: `{step_context.step_run.name}`",
        f"Parameters: `{step_context.step_run.config.parameters}`",
    ]
    stack_alerter.post("\n".join(lines) + "\n")

hook_validators

Validation functions for hooks.

resolve_and_validate_hook(hook)

Resolves and validates a hook callback.

Parameters:

Name Type Description Default
hook HookSpecification

Hook function or source.

required

Returns:

Type Description
Source

Hook source.

Exceptions:

Type Description
ValueError

If hook is not a valid callable.

Source code in zenml/hooks/hook_validators.py
def resolve_and_validate_hook(hook: "HookSpecification") -> Source:
    """Resolves and validates a hook callback.

    The annotations of the hook's arguments may only be `BaseException`,
    `BaseParameters` or `StepContext`, and each of them may appear at most
    once.

    Args:
        hook: Hook function or source.

    Returns:
        Hook source.

    Raises:
        ValueError: If `hook` is not a valid callable, or if its argument
            annotations do not satisfy the rules described above.
    """
    # Strings and `Source` objects are loaded first; anything else is
    # treated as the hook function itself.
    if isinstance(hook, (str, Source)):
        func = source_utils.load(hook)
    else:
        func = hook

    if not callable(func):
        raise ValueError(f"{func} is not a valid function.")

    from zenml.new.steps.step_context import StepContext
    from zenml.steps.base_parameters import BaseParameters

    # Inspect the innermost function so decorators do not hide the signature.
    sig = inspect.getfullargspec(inspect.unwrap(func))
    sig_annotations = sig.annotations
    # Only argument annotations are validated; drop the return annotation.
    sig_annotations.pop("return", None)

    # A count mismatch means at least one positional argument is unannotated.
    if sig.args and len(sig.args) != len(sig_annotations):
        raise ValueError(
            "You can only pass arguments to a hook that are annotated with a "
            "`BaseException` type."
        )

    allowed_annotations = (BaseException, BaseParameters, StepContext)
    seen_annotations = set()
    for annotation in sig_annotations.values():
        # Falsy annotations are skipped, as before.
        if not annotation:
            continue

        if annotation not in allowed_annotations:
            raise ValueError(
                "Hook arguments must be of type `BaseException`, not "
                f"`{annotation}`."
            )

        if annotation in seen_annotations:
            # The message fragments are separated by explicit spaces so the
            # concatenated sentences do not run together.
            raise ValueError(
                "You can only pass one `BaseException` type to a hook. "
                "Currently your function has the following "
                f"annotations: {sig_annotations}"
            )
        seen_annotations.add(annotation)

    return source_utils.resolve(func)