Hooks
zenml.hooks
special
The hooks package exposes some standard hooks that can be used in ZenML.
Hooks are functions that run after a step has exited.
alerter_hooks
Functionality for standard hooks.
alerter_failure_hook(exception)
Standard failure hook that executes after step fails.
This hook uses any BaseAlerter
that is configured within the active stack to post a message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exception |
BaseException |
Original exception that lead to step failing. |
required |
Source code in zenml/hooks/alerter_hooks.py
def alerter_failure_hook(exception: BaseException) -> None:
"""Standard failure hook that executes after step fails.
This hook uses any `BaseAlerter` that is configured within the active stack to post a message.
Args:
exception: Original exception that lead to step failing.
"""
context = get_step_context()
alerter = Client().active_stack.alerter
if alerter:
output_captured = io.StringIO()
original_stdout = sys.stdout
sys.stdout = output_captured
console = Console()
console.print_exception(show_locals=False)
sys.stdout = original_stdout
rich_traceback = output_captured.getvalue()
message = "*Failure Hook Notification! Step failed!*" + "\n\n"
message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
message += f"Run name: `{context.pipeline_run.name}`" + "\n"
message += f"Step name: `{context.step_run.name}`" + "\n"
message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
message += (
f"Exception: `({type(exception)}) {rich_traceback}`" + "\n\n"
)
alerter.post(message)
else:
logger.warning(
"Specified standard failure hook but no alerter configured in the stack. Skipping.."
)
alerter_success_hook()
Standard success hook that executes after step finishes successfully.
This hook uses any BaseAlerter
that is configured within the active stack to post a message.
Source code in zenml/hooks/alerter_hooks.py
def alerter_success_hook() -> None:
"""Standard success hook that executes after step finishes successfully.
This hook uses any `BaseAlerter` that is configured within the active stack to post a message.
"""
context = get_step_context()
alerter = Client().active_stack.alerter
if alerter:
message = (
"*Success Hook Notification! Step completed successfully*" + "\n\n"
)
message += f"Pipeline name: `{context.pipeline.name}`" + "\n"
message += f"Run name: `{context.pipeline_run.name}`" + "\n"
message += f"Step name: `{context.step_run.name}`" + "\n"
message += f"Parameters: `{context.step_run.config.parameters}`" + "\n"
alerter.post(message)
else:
logger.warning(
"Specified standard success hook but no alerter configured in the stack. Skipping.."
)
hook_validators
Validation functions for hooks.
resolve_and_validate_hook(hook)
Resolves and validates a hook callback.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hook |
HookSpecification |
Hook function or source. |
required |
Returns:
Type | Description |
---|---|
Source |
Hook source. |
Exceptions:
Type | Description |
---|---|
ValueError |
If |
Source code in zenml/hooks/hook_validators.py
def resolve_and_validate_hook(hook: "HookSpecification") -> Source:
"""Resolves and validates a hook callback.
Args:
hook: Hook function or source.
Returns:
Hook source.
Raises:
ValueError: If `hook_func` is not a valid callable.
"""
if isinstance(hook, (str, Source)):
func = source_utils.load(hook)
else:
func = hook
if not callable(func):
raise ValueError(f"{func} is not a valid function.")
from zenml.new.steps.step_context import StepContext
from zenml.steps.base_parameters import BaseParameters
sig = inspect.getfullargspec(inspect.unwrap(func))
sig_annotations = sig.annotations
if "return" in sig_annotations:
sig_annotations.pop("return")
if sig.args and len(sig.args) != len(sig_annotations):
raise ValueError(
"You can only pass arguments to a hook that are annotated with a "
"`BaseException` type."
)
if sig_annotations:
annotations = sig_annotations.values()
seen_annotations = set()
for annotation in annotations:
if annotation:
if annotation not in (
BaseException,
BaseParameters,
StepContext,
):
raise ValueError(
"Hook arguments must be of type `BaseException`, not "
f"`{annotation}`."
)
if annotation in seen_annotations:
raise ValueError(
"You can only pass one `BaseException` type to a hook."
"Currently your function has the following"
f"annotations: {sig_annotations}"
)
seen_annotations.add(annotation)
return source_utils.resolve(func)