Skip to content

Hooks

zenml.hooks

The hooks package exposes some standard hooks that can be used in ZenML.

Hooks are functions that run after a step has exited.

Attributes

__all__ = ['alerter_success_hook', 'alerter_failure_hook', 'resolve_and_validate_hook'] module-attribute

Functions

alerter_failure_hook(exception: BaseException) -> None

Standard failure hook that executes after step fails.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Parameters:

Name Type Description Default
exception BaseException

Original exception that lead to step failing.

required
Source code in src/zenml/hooks/alerter_hooks.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def alerter_failure_hook(exception: BaseException) -> None:
    """Standard failure hook that executes after step fails.

    This hook uses any `BaseAlerter` that is configured within the active stack to post a message.

    Args:
        exception: Original exception that lead to step failing.
    """
    context = get_step_context()
    alerter = Client().active_stack.alerter
    if alerter:
        output_captured = io.StringIO()
        original_stdout = sys.stdout
        sys.stdout = output_captured
        console = Console()
        console.print_exception(show_locals=False)

        sys.stdout = original_stdout
        rich_traceback = output_captured.getvalue()

        message = "*Failure Hook Notification! Step failed!*" + "\n\n"
        message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
        message += f"Run name: `{context.pipeline_run.name}`" + "\n"
        message += f"Step name: `{context.step_run.name}`" + "\n"
        message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
        message += (
            f"Exception: `({type(exception)}) {rich_traceback}`" + "\n\n"
        )
        alerter.post(message)
    else:
        logger.warning(
            "Specified standard failure hook but no alerter configured in the stack. Skipping.."
        )

alerter_success_hook() -> None

Standard success hook that executes after step finishes successfully.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Source code in src/zenml/hooks/alerter_hooks.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def alerter_success_hook() -> None:
    """Standard success hook that executes after step finishes successfully.

    This hook uses any `BaseAlerter` that is configured within the active stack to post a message.
    """
    context = get_step_context()
    alerter = Client().active_stack.alerter
    if alerter:
        message = (
            "*Success Hook Notification! Step completed successfully*" + "\n\n"
        )
        message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
        message += f"Run name: `{context.pipeline_run.name}`" + "\n"
        message += f"Step name: `{context.step_run.name}`" + "\n"
        message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
        alerter.post(message)
    else:
        logger.warning(
            "Specified standard success hook but no alerter configured in the stack. Skipping.."
        )

resolve_and_validate_hook(hook: Union[HookSpecification, InitHookSpecification], hook_kwargs: Optional[Dict[str, Any]] = None, allow_exception_arg: bool = False) -> Tuple[Source, Optional[Dict[str, Any]]]

Resolves and validates a hook callback and its arguments.

Parameters:

Name Type Description Default
hook Union[HookSpecification, InitHookSpecification]

Hook function or source.

required
hook_kwargs Optional[Dict[str, Any]]

The arguments to pass to the hook.

None
allow_exception_arg bool

Whether to allow an implicit exception argument to be passed to the hook.

False

Returns:

Type Description
Source

Tuple of hook source and validated hook arguments converted to JSON-safe

Optional[Dict[str, Any]]

values.

Raises:

Type Description
ValueError

If hook_func is not a valid callable.

Source code in src/zenml/hooks/hook_validators.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def resolve_and_validate_hook(
    hook: Union["HookSpecification", "InitHookSpecification"],
    hook_kwargs: Optional[Dict[str, Any]] = None,
    allow_exception_arg: bool = False,
) -> Tuple[Source, Optional[Dict[str, Any]]]:
    """Resolves and validates a hook callback and its arguments.

    Args:
        hook: Hook function or source.
        hook_kwargs: The arguments to pass to the hook.
        allow_exception_arg: Whether to allow an implicit exception argument
            to be passed to the hook.

    Returns:
        Tuple of hook source and validated hook arguments converted to JSON-safe
        values.

    Raises:
        ValueError: If `hook_func` is not a valid callable.
    """
    # Resolve the hook function
    if isinstance(hook, (str, Source)):
        func = source_utils.load(hook)
    else:
        func = hook

    if not callable(func):
        raise ValueError(f"{func} is not a valid function.")

    # Validate hook arguments
    validated_kwargs = _validate_hook_arguments(
        func, hook_kwargs or {}, allow_exception_arg
    )

    return source_utils.resolve(func), validated_kwargs

Modules

alerter_hooks

Functionality for standard hooks.

Classes
Functions
alerter_failure_hook(exception: BaseException) -> None

Standard failure hook that executes after step fails.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Parameters:

Name Type Description Default
exception BaseException

Original exception that lead to step failing.

required
Source code in src/zenml/hooks/alerter_hooks.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def alerter_failure_hook(exception: BaseException) -> None:
    """Standard failure hook that executes after step fails.

    This hook uses any `BaseAlerter` that is configured within the active stack to post a message.

    Args:
        exception: Original exception that lead to step failing.
    """
    context = get_step_context()
    alerter = Client().active_stack.alerter
    if alerter:
        output_captured = io.StringIO()
        original_stdout = sys.stdout
        sys.stdout = output_captured
        console = Console()
        console.print_exception(show_locals=False)

        sys.stdout = original_stdout
        rich_traceback = output_captured.getvalue()

        message = "*Failure Hook Notification! Step failed!*" + "\n\n"
        message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
        message += f"Run name: `{context.pipeline_run.name}`" + "\n"
        message += f"Step name: `{context.step_run.name}`" + "\n"
        message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
        message += (
            f"Exception: `({type(exception)}) {rich_traceback}`" + "\n\n"
        )
        alerter.post(message)
    else:
        logger.warning(
            "Specified standard failure hook but no alerter configured in the stack. Skipping.."
        )
alerter_success_hook() -> None

Standard success hook that executes after step finishes successfully.

This hook uses any BaseAlerter that is configured within the active stack to post a message.

Source code in src/zenml/hooks/alerter_hooks.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def alerter_success_hook() -> None:
    """Standard success hook that executes after step finishes successfully.

    This hook uses any `BaseAlerter` that is configured within the active stack to post a message.
    """
    context = get_step_context()
    alerter = Client().active_stack.alerter
    if alerter:
        message = (
            "*Success Hook Notification! Step completed successfully*" + "\n\n"
        )
        message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
        message += f"Run name: `{context.pipeline_run.name}`" + "\n"
        message += f"Step name: `{context.step_run.name}`" + "\n"
        message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
        alerter.post(message)
    else:
        logger.warning(
            "Specified standard success hook but no alerter configured in the stack. Skipping.."
        )

hook_validators

Validation functions for hooks.

Classes
Functions
load_and_run_hook(hook_source: Source, hook_parameters: Optional[Dict[str, Any]] = None, step_exception: Optional[BaseException] = None, raise_on_error: bool = False) -> Any

Loads hook source and runs the hook.

Parameters:

Name Type Description Default
hook_source Source

The source of the hook function.

required
hook_parameters Optional[Dict[str, Any]]

The parameters of the hook function.

None
step_exception Optional[BaseException]

The exception of the original step.

None
raise_on_error bool

Whether to raise an error if the hook fails.

False

Returns:

Type Description
Any

The return value of the hook function.

Raises:

Type Description
HookValidationException

If hook validation fails.

RuntimeError

If the hook fails and raise_on_error is True.

Source code in src/zenml/hooks/hook_validators.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def load_and_run_hook(
    hook_source: "Source",
    hook_parameters: Optional[Dict[str, Any]] = None,
    step_exception: Optional[BaseException] = None,
    raise_on_error: bool = False,
) -> Any:
    """Loads hook source and runs the hook.

    Args:
        hook_source: The source of the hook function.
        hook_parameters: The parameters of the hook function.
        step_exception: The exception of the original step.
        raise_on_error: Whether to raise an error if the hook fails.

    Returns:
        The return value of the hook function.

    Raises:
        HookValidationException: If hook validation fails.
        RuntimeError: If the hook fails and raise_on_error is True.
    """
    try:
        hook = source_utils.load(hook_source)
    except Exception as e:
        msg = f"Failed to load hook source '{hook_source}' with exception: {e}"
        if raise_on_error:
            raise RuntimeError(msg) from e
        else:
            logger.error(msg)
            return None
    try:
        validated_kwargs = _validate_hook_arguments(
            hook, hook_parameters or {}, step_exception or False
        )
    except HookValidationException as e:
        if raise_on_error:
            raise
        else:
            logger.error(e)
            return None

    try:
        logger.debug(f"Running hook {hook} with params: {validated_kwargs}")
        return hook(**validated_kwargs)
    except Exception as e:
        msg = (
            f"Failed to run hook '{hook_source}' with params: "
            f"{validated_kwargs} with exception: '{e}'"
        )
        if raise_on_error:
            raise RuntimeError(msg) from e
        else:
            logger.error(msg)
            return None
resolve_and_validate_hook(hook: Union[HookSpecification, InitHookSpecification], hook_kwargs: Optional[Dict[str, Any]] = None, allow_exception_arg: bool = False) -> Tuple[Source, Optional[Dict[str, Any]]]

Resolves and validates a hook callback and its arguments.

Parameters:

Name Type Description Default
hook Union[HookSpecification, InitHookSpecification]

Hook function or source.

required
hook_kwargs Optional[Dict[str, Any]]

The arguments to pass to the hook.

None
allow_exception_arg bool

Whether to allow an implicit exception argument to be passed to the hook.

False

Returns:

Type Description
Source

Tuple of hook source and validated hook arguments converted to JSON-safe

Optional[Dict[str, Any]]

values.

Raises:

Type Description
ValueError

If hook_func is not a valid callable.

Source code in src/zenml/hooks/hook_validators.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def resolve_and_validate_hook(
    hook: Union["HookSpecification", "InitHookSpecification"],
    hook_kwargs: Optional[Dict[str, Any]] = None,
    allow_exception_arg: bool = False,
) -> Tuple[Source, Optional[Dict[str, Any]]]:
    """Resolves and validates a hook callback and its arguments.

    Args:
        hook: Hook function or source.
        hook_kwargs: The arguments to pass to the hook.
        allow_exception_arg: Whether to allow an implicit exception argument
            to be passed to the hook.

    Returns:
        Tuple of hook source and validated hook arguments converted to JSON-safe
        values.

    Raises:
        ValueError: If `hook_func` is not a valid callable.
    """
    # Resolve the hook function
    if isinstance(hook, (str, Source)):
        func = source_utils.load(hook)
    else:
        func = hook

    if not callable(func):
        raise ValueError(f"{func} is not a valid function.")

    # Validate hook arguments
    validated_kwargs = _validate_hook_arguments(
        func, hook_kwargs or {}, allow_exception_arg
    )

    return source_utils.resolve(func), validated_kwargs
Modules