Skip to content

Logger

zenml.logger

Logger Implementation.

Attributes

ENABLE_RICH_TRACEBACK = handle_bool_env_var(ENV_ZENML_ENABLE_RICH_TRACEBACK, True) module-attribute

ENV_ZENML_CONSOLE_LOGGING_FORMAT = 'ZENML_CONSOLE_LOGGING_FORMAT' module-attribute

ENV_ZENML_LOGGING_COLORS_DISABLED = 'ZENML_LOGGING_COLORS_DISABLED' module-attribute

ENV_ZENML_LOGGING_FORMAT = 'ZENML_LOGGING_FORMAT' module-attribute

ENV_ZENML_SERVER = 'ZENML_SERVER' module-attribute

ENV_ZENML_SUPPRESS_LOGS = 'ZENML_SUPPRESS_LOGS' module-attribute

ZENML_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_LOGGING_VERBOSITY, default='DEBUG').upper() module-attribute

ZENML_STORAGE_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default=None) module-attribute

_RESERVED_LOG_RECORD_ATTRS: frozenset[str] = frozenset({'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', 'funcName', 'levelname', 'levelno', 'lineno', 'message', 'module', 'msecs', 'msg', 'name', 'pathname', 'process', 'processName', 'relativeCreated', 'stack_info', 'taskName', 'thread', 'threadName'}) module-attribute

_original_stderr_write: Optional[Any] = None module-attribute

_original_stdout_write: Optional[Any] = None module-attribute

_stderr_wrapped: bool = False module-attribute

_stdout_wrapped: bool = False module-attribute

step_names_in_console: ContextVar[bool] = ContextVar('step_names_in_console', default=False) module-attribute

Classes

LoggingLevels

Bases: Enum

Enum for logging levels.

ZenMLConsoleFormatter(custom_log_format: Optional[str] = None)

Bases: Formatter

Stdlib console formatter for all ZenML console output.

Custom format
  • User-provided Python %-style format string.
Client Side
  • INFO+ default: bare log message with extras and highlights.
  • DEBUG default: structured log format with full context.
Server Side
  • Always: structured log format. The server side is detected by checking whether ENV_ZENML_SERVER is set to True.

Initialize the formatter.

Source code in src/zenml/logger.py
243
244
245
246
247
248
249
250
251
252
253
254
255
def __init__(self, custom_log_format: Optional[str] = None) -> None:
    """Set up the console formatter.

    Args:
        custom_log_format: Optional %-style format string. When it is
            omitted or empty, the formatter's ``_LOG_FORMAT`` is passed
            to the base ``Formatter`` instead.
    """
    base_format = custom_log_format or self._LOG_FORMAT
    super().__init__(fmt=base_format)
    self._custom_log_format = custom_log_format

    # Flag used to decide whether a record is formatted for the ZenML
    # server or for the client; initialised to None here.
    self._is_zenml_server: Optional[bool] = None

    # Colors are disabled when the env var is set to a true value.
    self._colors_disabled = handle_bool_env_var(
        ENV_ZENML_LOGGING_COLORS_DISABLED, False
    )
Functions
format(record: logging.LogRecord) -> str

Render a log record.

Parameters:

Name Type Description Default
record LogRecord

The log record to render.

required

Returns:

Type Description
str

The fully formatted line.

Source code in src/zenml/logger.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def format(self, record: logging.LogRecord) -> str:
    """Render a log record.

    The layout is picked in this order:

    1. A user-supplied custom format always wins.
    2. Otherwise the structured layout is used for ZenML server logs and
       for clients running at DEBUG verbosity.
    3. Everything else (client, INFO and above) gets the compact layout.

    Args:
        record: The log record to render.

    Returns:
        The fully formatted line.
    """
    if self._custom_log_format:
        return self._format_custom_log_format(record)

    # The verbosity lookup is only reached when the server layout does not
    # apply, thanks to the short-circuiting ``or``.
    wants_structured_layout = (
        self._use_zenml_server_layout()
        or get_logging_level() == LoggingLevels.DEBUG
    )
    if wants_structured_layout:
        return self._format_structured_console_log(record)

    return self._format_compact_client_log(record)

ZenMLConsoleHandler(stream: Optional[Any] = None)

Bases: StreamHandler

Console handler owned by the ZenML logging setup.

Default stream is _ZenMLStdoutStream() which writes to the original stdout, bypassing the ZenML wrapper.

Initialize the console handler.

Source code in src/zenml/logger.py
827
828
829
830
831
832
833
834
835
836
def __init__(self, stream: Optional[Any] = None) -> None:
    """Set up the console handler.

    Args:
        stream: Stream to write to. When omitted (or falsy), a fresh
            ``_ZenMLStdoutStream`` is used.
    """
    target_stream = stream or _ZenMLStdoutStream()
    super().__init__(target_stream)

    # Pick the formatter according to the configured console format.
    console_formatter = _select_console_formatter()
    self.setFormatter(console_formatter)

    # Attach the filters that copy structlog contextvars and the step name
    # onto each log record.
    add_zenml_filters(self)
Functions

ZenMLJsonFormatter

Bases: Formatter

Format a log record as a single-line JSON object.

Functions
format(record: logging.LogRecord) -> str

Render record as a single-line JSON object.

Parameters:

Name Type Description Default
record LogRecord

The log record to render.

required

Returns:

Type Description
str

A JSON string (no trailing newline; the handler adds one).

Source code in src/zenml/logger.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
def format(self, record: logging.LogRecord) -> str:
    """Serialize ``record`` into one compact JSON line.

    The payload always carries the timestamp, level, logger name, function,
    line number and rendered message. Extra fields collected from the
    record are merged in next (never overriding those core keys), followed
    by exception details and stack info when the record has them.

    Args:
        record: The log record to render.

    Returns:
        A JSON string (no trailing newline; the handler adds one).
    """
    payload: dict[str, Any] = {
        "timestamp": self.formatTime(record),
        "level": record.levelname,
        "logger": record.name,
        "function": record.funcName,
        "line": record.lineno,
        "message": record.getMessage(),
    }

    # Core keys are excluded so that extras cannot overwrite them.
    core_keys = set(payload)
    extra_fields = _collect_extra_fields(record, exclude_attrs=core_keys)
    payload.update(extra_fields)

    exc_info = record.exc_info
    if exc_info:
        exc_type, exc_value, exc_traceback = exc_info
        error_type = exc_type.__name__ if exc_type else None
        error_message = str(exc_value) if exc_value else None
        stacktrace = "".join(
            traceback.format_exception(exc_type, exc_value, exc_traceback)
        )
        payload["exception"] = {
            "type": error_type,
            "message": error_message,
            "stacktrace": stacktrace,
        }

    stack_info = record.stack_info
    if stack_info:
        payload["stack_info"] = stack_info

    # ``default=str`` stringifies anything json cannot serialize natively.
    return json.dumps(payload, default=str, separators=(",", ":"))

ZenMLLoggingHandler()

Bases: Handler

Custom handler that routes logs through LoggingContext.

Initialize the logging handler.

Source code in src/zenml/logger.py
804
805
806
807
def __init__(self) -> None:
    """Initialize the logging handler.

    Runs the base handler initialization and then attaches the ZenML
    filters to this handler via ``add_zenml_filters``.
    """
    super().__init__()
    add_zenml_filters(self)
Functions
emit(record: logging.LogRecord) -> None

Emit a log record through LoggingContext.

Parameters:

Name Type Description Default
record LogRecord

The log record to emit.

required
Source code in src/zenml/logger.py
809
810
811
812
813
814
815
816
817
def emit(self, record: logging.LogRecord) -> None:
    """Emit a log record through LoggingContext.

    The record is handed over unchanged to ``LoggingContext.emit``.

    Args:
        record: The log record to emit.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import between the logger
    # module and zenml.utils.logging_utils - confirm.
    from zenml.utils.logging_utils import LoggingContext

    LoggingContext.emit(record)

_ContextVarsFilter

Bases: Filter

Copy structlog contextvars onto every log record.

If the attribute already exists, it is left untouched.

This bridges the gap between the structlog contextvars and the log record. Without this filter, the structlog contextvars would not be available to the log record and neither the formatter nor the OTel handler would be able to access them.

Functions
filter(record: logging.LogRecord) -> bool

Enrich a LogRecord with structlog contextvars.

Parameters:

Name Type Description Default
record LogRecord

The log record to enrich.

required

Returns:

Type Description
bool

Always True (never filters out records).

Source code in src/zenml/logger.py
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
def filter(self, record: logging.LogRecord) -> bool:
    """Copy the bound structlog contextvars onto a log record.

    Attributes that are already present on the record are kept as they are.

    Args:
        record: The log record to enrich.

    Returns:
        Always True (never filters out records).
    """
    bound_vars = structlog.contextvars.get_contextvars()
    for name in bound_vars:
        # Never overwrite an attribute the record already carries.
        if hasattr(record, name):
            continue
        setattr(record, name, bound_vars[name])
    return True

_StepNameFilter

Bases: Filter

Inject pipeline step name into the log record when available.

Functions
filter(record: logging.LogRecord) -> bool

Set record.step to the active step name when available.

Parameters:

Name Type Description Default
record LogRecord

The log record to enrich.

required

Returns:

Type Description
bool

Always True (never filters records out).

Source code in src/zenml/logger.py
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
def filter(self, record: logging.LogRecord) -> bool:
    """Attach the active step name to the record as ``record.step``.

    The lookup is best-effort: any failure while importing or resolving the
    step context is swallowed so that logging itself never breaks.

    Args:
        record: The log record to enrich.

    Returns:
        Always True (never filters records out).
    """
    try:
        from zenml.steps import get_step_context

        active_context = get_step_context()
        if active_context is None:
            return True
        record.step = active_context.step_name
    except Exception:
        # Leave the record untouched when no step name can be resolved.
        return True
    return True

_ZenMLStdoutStream

Stream that writes to the original stdout, bypassing the ZenML wrapper.

This ensures console logging doesn't trigger the LoggingContext wrapper, preventing duplicate log entries in stored logs.

Functions
flush() -> None

Flush the stdout buffer.

Source code in src/zenml/logger.py
653
654
655
def flush(self) -> None:
    """Flush whatever ``sys.stdout`` currently points to."""
    current_stdout = sys.stdout
    current_stdout.flush()
write(text: str) -> Any

Write text to the original stdout.

Parameters:

Name Type Description Default
text str

The text to write.

required

Returns:

Type Description
Any

The number of characters written.

Source code in src/zenml/logger.py
640
641
642
643
644
645
646
647
648
649
650
651
def write(self, text: str) -> Any:
    """Write text through the saved original stdout ``write``.

    Falls back to the current ``sys.stdout.write`` when no original write
    function has been saved.

    Args:
        text: The text to write.

    Returns:
        The number of characters written.
    """
    writer = _original_stdout_write or sys.stdout.write
    return writer(text)

Functions

_add_step_name_to_message(message: str) -> str

Adds the step name to the message.

Parameters:

Name Type Description Default
message str

The message to add the step name to.

required

Returns:

Type Description
str

The message with the step name added.

Source code in src/zenml/logger.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
def _add_step_name_to_message(message: str) -> str:
    """Adds the step name to the message.

    Args:
        message: The message to add the step name to.

    Returns:
        The message with the step name added.
    """
    try:
        if step_names_in_console.get():
            from zenml.steps import get_step_context

            step_context = get_step_context()

            if step_context and message not in ["\n", ""]:
                message = _prefix_step_name(message, step_context.step_name)
    except Exception:
        pass

    return message

_collect_extra_fields(record: logging.LogRecord, exclude_attrs: Optional[set[str]] = None) -> dict[str, Any]

Extract structured fields that aren't stdlib LogRecord attrs.

Parameters:

Name Type Description Default
record LogRecord

The log record whose __dict__ we mine.

required
exclude_attrs Optional[set[str]]

Extra attributes to skip, if any.

None

Returns:

Type Description
dict[str, Any]

Mapping of extra-field name to value.

Source code in src/zenml/logger.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def _collect_extra_fields(
    record: logging.LogRecord,
    exclude_attrs: Optional[set[str]] = None,
) -> dict[str, Any]:
    """Gather the non-stdlib attributes stored on a log record.

    An attribute is skipped when it is a reserved ``LogRecord`` attribute,
    when its name starts with an underscore, or when it is listed in
    ``exclude_attrs``.

    Args:
        record: The log record whose ``__dict__`` we mine.
        exclude_attrs: Extra attributes to skip, if any.

    Returns:
        Mapping of extra-field name to value.
    """
    skipped = exclude_attrs or set()
    extras: dict[str, Any] = {}
    for attr_name, attr_value in record.__dict__.items():
        if attr_name in _RESERVED_LOG_RECORD_ATTRS:
            continue
        if attr_name.startswith("_") or attr_name in skipped:
            continue
        extras[attr_name] = attr_value
    return extras

_get_console_logging_format() -> Optional[str]

Get the configured client console logging format.

Source code in src/zenml/logger.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
def _get_console_logging_format() -> Optional[str]:
    """Resolve the console logging format from the environment.

    Returns:
        ``None`` when no format is configured or the configured value is
        not a valid %-style format string, the lowercased keyword for
        ``"json"`` / ``"console"``, and otherwise the format string as given.
    """
    # The newer ZENML_CONSOLE_LOGGING_FORMAT wins over the older, deprecated
    # ZENML_LOGGING_FORMAT.
    preferred = os.environ.get(ENV_ZENML_CONSOLE_LOGGING_FORMAT)
    log_format = preferred or os.environ.get(ENV_ZENML_LOGGING_FORMAT)

    # Nothing configured: the caller falls back to its default layout.
    if not log_format:
        return None

    # The two keywords are matched case-insensitively and returned lowercased.
    keyword = log_format.lower()
    if keyword in {"json", "console"}:
        return keyword

    # Anything else has to be a valid %-style Python logging format string.
    try:
        logging.Formatter(log_format, validate=True)
    except ValueError:
        get_logger(__name__).warning(
            "Invalid console logging format: %s. Defaulting to console formatter. "
            "Please use a valid Python logging format string in %%-style format.",
            log_format,
        )
        return None
    else:
        return log_format

_prefix_step_name(message: str, step_name: str) -> str

Prefix a console/stdout message with the active step name.

Source code in src/zenml/logger.py
613
614
615
616
617
618
619
620
621
622
623
624
625
def _prefix_step_name(message: str, step_name: str) -> str:
    """Prefix a console/stdout message with the active step name."""
    # Console writes can arrive as blank chunks; prefixing those would turn
    # empty lines into visible "[step]" noise.
    if message in ["\n", ""]:
        return message

    # Progress bars and status updates often use carriage returns to redraw the
    # same line, so place the prefix after each redraw marker.
    if "\r" in message:
        return message.replace("\r", f"\r[{step_name}] ")

    return f"[{step_name}] {message}"

_remove_zenml_handlers(root_logger: logging.Logger) -> None

Remove handlers owned by the ZenML logging setup.

Source code in src/zenml/logger.py
867
868
869
870
871
872
def _remove_zenml_handlers(root_logger: logging.Logger) -> None:
    """Remove handlers owned by the ZenML logging setup."""
    for handler in root_logger.handlers[:]:
        if isinstance(handler, (ZenMLConsoleHandler, ZenMLLoggingHandler)):
            root_logger.removeHandler(handler)
            handler.close()

_select_console_formatter() -> logging.Formatter

Return the configured formatter for terminal output.

Returns:

Type Description
Formatter

The formatter to attach to the console handler.

Source code in src/zenml/logger.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def _select_console_formatter() -> logging.Formatter:
    """Build the formatter matching the configured console format.

    Returns:
        The formatter to attach to the console handler.
    """
    fmt = _get_console_logging_format()

    # No format configured, or the explicit "console" keyword: default
    # console formatter.
    if fmt is None or fmt == "console":
        return ZenMLConsoleFormatter()

    if fmt == "json":
        return ZenMLJsonFormatter()

    # Any other value is a validated %-style logging format string.
    return ZenMLConsoleFormatter(custom_log_format=fmt)

_wrapped_write(original_write: Any, stream_name: str) -> Any

Wrap stdout/stderr write method to route logs to LoggingContext.

Parameters:

Name Type Description Default
original_write Any

The original write method.

required
stream_name str

The name of the stream.

required

Returns:

Type Description
Any

The wrapped write method.

Source code in src/zenml/logger.py
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
def _wrapped_write(original_write: Any, stream_name: str) -> Any:
    """Build a replacement ``write`` that also feeds LoggingContext.

    Args:
        original_write: The original write method.
        stream_name: The name of the stream.

    Returns:
        The wrapped write method.
    """
    # Text written to stdout is recorded at INFO; any other stream at ERROR.
    level = logging.INFO if stream_name == "stdout" else logging.ERROR

    def wrapped_write(text: str) -> Any:
        """Route the text through LoggingContext, then write it out.

        Args:
            text: The text to write.

        Returns:
            The result of the original write method.
        """
        from zenml.utils.logging_utils import LoggingContext

        if logging.root.isEnabledFor(level):
            LoggingContext.emit(
                logging.LogRecord(
                    name=stream_name,
                    level=level,
                    pathname="",
                    lineno=0,
                    msg=text,
                    args=(),
                    exc_info=None,
                    func="",
                )
            )

        # The emitted record carries the raw text; only what reaches the
        # real stream gets the optional step-name prefix.
        console_text = _add_step_name_to_message(text)
        return original_write(console_text)

    return wrapped_write

add_zenml_filters(handler: logging.Handler) -> logging.Handler

Add filters to logging handler to attach structlog contextvars and step name to the log record.

Parameters:

Name Type Description Default
handler Handler

The logging handler to enrich.

required

Returns:

Type Description
Handler

The same handler, with the filters attached.

Source code in src/zenml/logger.py
780
781
782
783
784
785
786
787
788
789
790
791
792
793
def add_zenml_filters(handler: logging.Handler) -> logging.Handler:
    """Attach the ZenML record-enriching filters to a logging handler.

    Two filters are added, in this order:

    * ``_ContextVarsFilter`` copies bound contextvars (request_id, method,
      etc.) onto each LogRecord.
    * ``_StepNameFilter`` injects the step name onto LogRecords emitted
      during step execution.

    Args:
        handler: The logging handler to enrich.

    Returns:
        The same handler, with the filters attached.
    """
    for filter_cls in (_ContextVarsFilter, _StepNameFilter):
        handler.addFilter(filter_cls())
    return handler

bind_request_context(**fields: Any) -> None

Bind extra context variables to the current logging context.

Use it to propagate common context variables to every downstream log record. The extra context persists until the current logging context is cleared.

E.g.: This can be used at request boundaries to propagate context like request_id, method, path, etc. to all log records in a request.

Any previously bound contextvars are cleared first, so this sets a fresh set of context variables to the current logging context.

Source code in src/zenml/logger.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def bind_request_context(**fields: Any) -> None:
    """Replace the current logging context with the given fields.

    Everything bound before is cleared first, so after this call the
    logging context holds exactly ``fields``. The bound values are
    propagated to every downstream log record until the logging context is
    cleared again.

    Typical use is at a request boundary, to make values such as
    request_id, method or path available on all log records of that
    request.

    Args:
        **fields: Context variables to bind.
    """
    context_api = structlog.contextvars
    # Start from a clean slate so nothing leaks in from an earlier binding.
    context_api.clear_contextvars()
    context_api.bind_contextvars(**fields)

get_logger(logger_name: str) -> logging.Logger

Returns a stdlib logger by name.

To attach structured fields to a log record, use the stdlib pattern:

``logger.info("event", extra={"key": "value"})``
Source code in src/zenml/logger.py
90
91
92
93
94
95
96
97
98
def get_logger(logger_name: str) -> logging.Logger:
    """Look up a stdlib logger by name.

    Structured fields are attached to a record with the regular stdlib
    pattern:

        ``logger.info("event", extra={"key": "value"})``

    Args:
        logger_name: Name of the logger to fetch.

    Returns:
        The stdlib logger registered under that name.
    """
    named_logger = logging.getLogger(logger_name)
    return named_logger

get_logging_context() -> dict[str, Any]

Return a snapshot of the currently bound logging context.

Returns a fresh dict (safe to mutate) with all the context variables bound via bind_request_context or logging_scope.

Useful at response/error boundaries that need to surface a single field (typically request_id) back to the caller.

Source code in src/zenml/logger.py
164
165
166
167
168
169
170
171
172
173
def get_logging_context() -> dict[str, Any]:
    """Snapshot the context variables currently bound for logging.

    The result is a new dict, so callers may mutate it freely. It contains
    everything bound via ``bind_request_context`` or ``logging_scope``.

    Handy at response/error boundaries that need to hand a single field
    (typically ``request_id``) back to the caller.

    Returns:
        A copy of the currently bound context variables.
    """
    bound_vars = structlog.contextvars.get_contextvars()
    return dict(bound_vars)

get_logging_level() -> LoggingLevels

Get logging level from the env variable.

Returns:

Type Description
LoggingLevels

The logging level.

Raises:

Type Description
KeyError

If the logging level is not found.

Source code in src/zenml/logger.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def get_logging_level() -> LoggingLevels:
    """Get logging level from the env variable.

    If the unsupported ``ZENML_STORAGE_LOGGING_VERBOSITY`` variable is set,
    a warning is logged the first time this function is called; later calls
    stay silent.

    Returns:
        The logging level.

    Raises:
        KeyError: If the logging level is not found.
    """
    verbosity = ZENML_LOGGING_VERBOSITY.upper()
    if verbosity not in LoggingLevels.__members__:
        raise KeyError(
            f"Verbosity must be one of {list(LoggingLevels.__members__.keys())}"
        )

    # Warn only once per process. This function may be called while a log
    # record is being formatted, so warning on every call would flood the
    # output and re-enter the logging pipeline from inside the formatter.
    already_warned = getattr(
        get_logging_level, "_storage_verbosity_warned", False
    )
    if ZENML_STORAGE_LOGGING_VERBOSITY is not None and not already_warned:
        # Set the guard before logging so that a nested call triggered by
        # this very warning does not warn again.
        setattr(get_logging_level, "_storage_verbosity_warned", True)
        get_logger(__name__).warning(
            "The ZENML_STORAGE_LOGGING_VERBOSITY is no longer supported. "
            "Please use the ZENML_LOGGING_VERBOSITY instead."
        )

    return LoggingLevels[verbosity]

handle_bool_env_var(var: str, default: bool = False) -> bool

Converts normal env var to boolean.

Parameters:

Name Type Description Default
var str

The environment variable to convert.

required
default bool

The default value to return if the env var is not set.

False

Returns:

Type Description
bool

The converted value.

Source code in src/zenml/constants.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def handle_bool_env_var(var: str, default: bool = False) -> bool:
    """Read an environment variable and interpret it as a boolean.

    Args:
        var: The environment variable to convert.
        default: The default value to return if the env var is not set,
            or if its value is recognised neither as true nor as false.

    Returns:
        The converted value.
    """
    raw_value = os.getenv(var)

    if is_true_string_value(raw_value):
        return True
    if is_false_string_value(raw_value):
        return False

    # Neither a recognised true nor a recognised false value.
    return default

init_logging() -> None

Initialize the ZenML logging system.

Source code in src/zenml/logger.py
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
def init_logging() -> None:
    """Initialize the ZenML logging system.

    Sets the root verbosity, replaces any previously installed ZenML
    handlers on the root logger with fresh ones, wraps stdout/stderr,
    enables capturing of ``warnings`` and, unless disabled through the
    ``ZENML_SUPPRESS_LOGS`` environment variable, quiets a fixed list of
    third-party loggers.

    When the configured verbosity is NOTSET, nothing beyond
    ``set_root_verbosity()`` is done.
    """
    level = set_root_verbosity()

    # If the root verbosity is NOTSET, return early.
    if level == LoggingLevels.NOTSET:
        return

    root_logger = logging.getLogger()

    # Clear existing ZenML handlers to avoid duplicates
    # This makes repeated init_logging() calls idempotent.
    #
    # We are following, "remove and re-configure" pattern instead of
    # "add once, and skip later" pattern. On repeated init_logging() calls,
    # this allows us to re-configure the handlers with updated env vars (if any)
    # like ZENML_CONSOLE_LOGGING_FORMAT, ZENML_LOGGING_VERBOSITY, etc.
    _remove_zenml_handlers(root_logger)

    # Add new ZenML handlers to the root logger
    root_logger.addHandler(ZenMLConsoleHandler())
    root_logger.addHandler(ZenMLLoggingHandler())

    # Warn about deprecated logging format variables. We have to handle
    # it separately to avoid circular imports.
    # The warning is only emitted when the deprecated variable is present
    # and the new one is absent from the environment.
    if (
        ENV_ZENML_LOGGING_FORMAT in os.environ
        and ENV_ZENML_CONSOLE_LOGGING_FORMAT not in os.environ
    ):
        get_logger(__name__).warning(
            "The `%s` environment variable is deprecated and will be "
            "removed in a future version. Use `%s` instead.",
            ENV_ZENML_LOGGING_FORMAT,
            ENV_ZENML_CONSOLE_LOGGING_FORMAT,
        )

    # Wraps stdout/stderr after handlers are attached so the
    # wrapped writes flow through a fully configured pipeline.
    wrap_stdout_stderr()

    # Mute tensorflow cuda warnings
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

    # Route messages issued through the `warnings` module into logging.
    logging.captureWarnings(True)

    # Third-party loggers are quieted by default; setting the
    # ZENML_SUPPRESS_LOGS environment variable to a false value turns
    # this off.
    suppress_zenml_logs: bool = handle_bool_env_var(
        ENV_ZENML_SUPPRESS_LOGS, True
    )
    if suppress_zenml_logs:
        # Raise these loggers to WARNING so their info messages are dropped.
        suppressed_logger_names = [
            "urllib3",
            "azure.core.pipeline.policies.http_logging_policy",
            "grpc",
            "requests",
            "kfp",
            "tensorflow",
        ]
        for logger_name in suppressed_logger_names:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

        # These loggers are disabled entirely.
        disabled_logger_names = [
            "rdbms_metadata_access_object",
            "backoff",
            "segment",
        ]
        for logger_name in disabled_logger_names:
            logging.getLogger(logger_name).setLevel(logging.WARNING)
            logging.getLogger(logger_name).disabled = True

logging_scope(**fields: Any) -> Generator[None, None, None]

Bind extra context to a specific set of log records.

Use this for narrow scopes inside a request (e.g. a single transaction, a step execution, a retry attempt) where you want a few fields to propagate to certain downstream log records within a scope.

E.g.::

with logging_scope(transaction_id=tx_id, attempt=2):
    logger.info("retrying transaction")
    do_something()
    logger.info("transaction attempt succeeded")

This will propagate extra context (on top of the context bound by
`bind_request_context`) to the log records emitted within the scope.
Source code in src/zenml/logger.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@contextmanager
def logging_scope(**fields: Any) -> Generator[None, None, None]:
    """Temporarily bind extra context for the log records of a scope.

    Meant for narrow scopes inside a request (e.g. a single transaction,
    a step execution, a retry attempt) where a few fields should
    propagate to the log records emitted within that scope.

    Example::

        with logging_scope(transaction_id=tx_id, attempt=2):
            logger.info("retrying transaction")
            do_something()
            logger.info("transaction attempt succeeded")

    The extra context is added on top of the context bound by
    ``bind_request_context`` and applies to the log records emitted within
    the scope.

    Args:
        **fields: Context variables to bind for the duration of the scope.

    Yields:
        Nothing; the fields stay bound while the ``with`` body runs.
    """
    scoped_binding = structlog.contextvars.bound_contextvars(**fields)
    with scoped_binding:
        yield

set_root_verbosity() -> LoggingLevels

Set the root verbosity.

Returns:

Type Description
LoggingLevels

The active logging level.

Source code in src/zenml/logger.py
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
def set_root_verbosity() -> LoggingLevels:
    """Apply the configured verbosity to the root logger.

    With NOTSET verbosity, logging is disabled altogether. Otherwise the
    root logger level is set and, when rich tracebacks are enabled, the
    rich traceback hook is installed (showing locals only at DEBUG).

    Returns:
        The active logging level.
    """
    level = get_logging_level()

    if level == LoggingLevels.NOTSET:
        # Turn logging off globally and on the root logger.
        logging.disable(sys.maxsize)
        logging.getLogger().disabled = True
        logging.getLogger(__name__).debug("Logging NOTSET")
        return level

    if ENABLE_RICH_TRACEBACK:
        show_locals = level == LoggingLevels.DEBUG
        rich_tb_install(show_locals=show_locals)

    logging.root.setLevel(level=level.value)
    logging.getLogger(__name__).debug(
        f"Logging set to level: {logging.getLevelName(level.value)}"
    )
    return level

wrap_stdout_stderr() -> None

Wrap stdout and stderr write methods to route through LoggingContext.

Source code in src/zenml/logger.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
def wrap_stdout_stderr() -> None:
    """Patch ``sys.stdout.write`` / ``sys.stderr.write`` to feed LoggingContext.

    Each stream is wrapped at most once: the module-level ``*_wrapped``
    flags guard against double wrapping, and the original write functions
    are saved in the module-level ``_original_*_write`` variables.
    """
    global _stdout_wrapped, _stderr_wrapped
    global _original_stdout_write, _original_stderr_write

    if not _stdout_wrapped:
        _original_stdout_write = sys.stdout.write
        stdout_replacement = _wrapped_write(_original_stdout_write, "stdout")
        setattr(sys.stdout, "write", stdout_replacement)
        _stdout_wrapped = True

    if not _stderr_wrapped:
        _original_stderr_write = sys.stderr.write
        stderr_replacement = _wrapped_write(_original_stderr_write, "stderr")
        setattr(sys.stderr, "write", stderr_replacement)
        _stderr_wrapped = True