Skip to content

Logger

zenml.logger

Logger implementation.

Attributes

ENABLE_RICH_TRACEBACK = handle_bool_env_var(ENV_ZENML_ENABLE_RICH_TRACEBACK, True) module-attribute

ENV_ZENML_LOGGING_COLORS_DISABLED = 'ZENML_LOGGING_COLORS_DISABLED' module-attribute

ENV_ZENML_SUPPRESS_LOGS = 'ZENML_SUPPRESS_LOGS' module-attribute

ZENML_LOGGING_COLORS_DISABLED = handle_bool_env_var(ENV_ZENML_LOGGING_COLORS_DISABLED, False) module-attribute

ZENML_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_LOGGING_VERBOSITY, default='DEBUG').upper() module-attribute

ZENML_STORAGE_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default=None) module-attribute

_original_stderr_write: Optional[Any] = None module-attribute

_original_stdout_write: Optional[Any] = None module-attribute

_stderr_wrapped: bool = False module-attribute

_stdout_wrapped: bool = False module-attribute

step_names_in_console: ContextVar[bool] = ContextVar('step_names_in_console', default=False) module-attribute

Classes

ConsoleFormatter

Bases: Formatter

Formats logs according to custom specifications.

Functions
format(record: logging.LogRecord) -> str

Converts a log record to a (colored) string.

Parameters:

Name Type Description Default
record LogRecord

LogRecord generated by the code.

required

Returns:

Type Description
str

A string formatted according to specifications.

Source code in src/zenml/logger.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def format(self, record: logging.LogRecord) -> str:
    """Converts a log record to a (colored) string.

    The incoming record is never mutated: formatting is done on a shallow
    copy, so other handlers attached to the same logger still see the
    original message and arguments.

    When colors are enabled, the whole line is wrapped in the color of the
    record's level, text between backticks is shown in purple and URLs are
    shown in blue.

    Args:
        record: LogRecord generated by the code.

    Returns:
        A string formatted according to specifications.
    """
    import copy

    format_template = self._get_format_template(record)

    # The helper is a no-op unless step names are enabled for the console
    # and it swallows its own errors, so no extra guard is needed here.
    message = _add_step_name_to_message(record.getMessage())

    # Copy the record instead of building a fresh one so that the creation
    # time, function name, stack info and any `extra` attributes survive.
    modified_record = copy.copy(record)
    modified_record.msg = message
    modified_record.args = ()

    if ZENML_LOGGING_COLORS_DISABLED:
        return logging.Formatter(format_template).format(modified_record)

    level_color = self.COLORS[LoggingLevels(record.levelno)]
    formatted_message = logging.Formatter(
        level_color + format_template + self.reset
    ).format(modified_record)

    # Highlight `quoted` spans. The backticks themselves are dropped and the
    # level color is restored right after the span.
    formatted_message = re.sub(
        "`([^`]*)`",
        lambda match: (
            self.reset + self.purple + match.group(1) + level_color
        ),
        formatted_message,
    )

    # Highlight URLs in a single pass. Substituting match-by-match (rather
    # than `str.replace` per found URL) keeps a URL that appears several
    # times, or that is a prefix of another URL, from being wrapped twice
    # or only partially colored.
    url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    return re.sub(
        url_pattern,
        lambda match: self.reset + self.blue + match.group(0) + level_color,
        formatted_message,
    )

LoggingLevels

Bases: Enum

Enum for logging levels.

ZenMLLoggingHandler

Bases: Handler

Custom handler that routes logs through LoggingContext.

Functions
emit(record: logging.LogRecord) -> None

Emit a log record through LoggingContext.

Parameters:

Name Type Description Default
record LogRecord

The log record to emit.

required
Source code in src/zenml/logger.py
325
326
327
328
329
330
331
332
333
def emit(self, record: logging.LogRecord) -> None:
    """Forward a log record to ``LoggingContext``.

    Args:
        record: The log record to forward.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid a circular import -- confirm).
    from zenml.utils.logging_utils import LoggingContext

    dispatch = LoggingContext.emit
    dispatch(record)

_ZenMLStdoutStream

Stream that writes to the original stdout, bypassing the ZenML wrapper.

This ensures console logging doesn't trigger the LoggingContext wrapper, preventing duplicate log entries in stored logs.

Functions
flush() -> None

Flush the stdout buffer.

Source code in src/zenml/logger.py
69
70
71
def flush(self) -> None:
    """Flush the stream currently bound to ``sys.stdout``."""
    stream = sys.stdout
    stream.flush()
write(text: str) -> Any

Write text to the original stdout.

Parameters:

Name Type Description Default
text str

The text to write.

required

Returns:

Type Description
Any

The number of characters written.

Source code in src/zenml/logger.py
56
57
58
59
60
61
62
63
64
65
66
67
def write(self, text: str) -> Any:
    """Write text to stdout without going through the ZenML wrapper.

    Args:
        text: The text to write.

    Returns:
        Whatever the underlying write call returns (the number of
        characters written for a regular text stream).
    """
    # Prefer the write method saved before stdout was wrapped; fall back to
    # the current ``sys.stdout.write`` when nothing has been saved.
    writer = _original_stdout_write or sys.stdout.write
    return writer(text)

Functions

_add_step_name_to_message(message: str) -> str

Adds the step name to the message.

Parameters:

Name Type Description Default
message str

The message to add the step name to.

required

Returns:

Type Description
str

The message with the step name added.

Source code in src/zenml/logger.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def _add_step_name_to_message(message: str) -> str:
    """Prefix a message with the name of the currently running step.

    Nothing is changed when step names are disabled for the console, when
    no step context is available, when the message is empty or a lone
    newline, or when anything goes wrong while looking up the step.

    Args:
        message: The message to add the step name to.

    Returns:
        The message with the step name added, or the unchanged message.
    """
    try:
        if not step_names_in_console.get():
            return message

        from zenml.steps import get_step_context

        context = get_step_context()
        if not context or message in ("\n", ""):
            return message

        prefix = f"[{context.step_name}] "
        if "\r" in message:
            # Progress-bar style updates: the step name has to follow the
            # carriage return, otherwise it would be overwritten.
            return message.replace("\r", "\r" + prefix)
        return prefix + message
    except Exception:
        # Best effort only: fall back to the untouched message.
        return message

_wrapped_write(original_write: Any, stream_name: str) -> Any

Wrap stdout/stderr write method to route logs to LoggingContext.

Parameters:

Name Type Description Default
original_write Any

The original write method.

required
stream_name str

The name of the stream.

required

Returns:

Type Description
Any

The wrapped write method.

Source code in src/zenml/logger.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def _wrapped_write(original_write: Any, stream_name: str) -> Any:
    """Build a replacement ``write`` that also feeds LoggingContext.

    Args:
        original_write: The original write method.
        stream_name: The name of the stream.

    Returns:
        The wrapped write method.
    """
    # Text written to stdout is recorded as INFO; any other stream name is
    # recorded as ERROR. This is fixed per wrapper, so compute it once.
    level = logging.INFO if stream_name == "stdout" else logging.ERROR

    def wrapped_write(text: str) -> Any:
        """Mirror the text to LoggingContext, then write it for real.

        Args:
            text: The text to write.

        Returns:
            The result of the original write method.
        """
        from zenml.utils.logging_utils import LoggingContext

        if logging.root.isEnabledFor(level):
            LoggingContext.emit(
                logging.LogRecord(
                    name=stream_name,
                    level=level,
                    pathname="",
                    lineno=0,
                    msg=text,
                    args=(),
                    exc_info=None,
                    func="",
                )
            )

        return original_write(text)

    return wrapped_write

get_console_handler() -> logging.Handler

Get console handler that writes to original stdout.

Returns:

Type Description
Handler

A console handler.

Source code in src/zenml/logger.py
336
337
338
339
340
341
342
343
344
def get_console_handler() -> logging.Handler:
    """Build the handler used for console output.

    Returns:
        A stream handler that writes through ``_ZenMLStdoutStream`` and
        formats records with ``ConsoleFormatter``.
    """
    console_handler = logging.StreamHandler(stream=_ZenMLStdoutStream())
    console_handler.setFormatter(ConsoleFormatter())
    return console_handler

get_logger(logger_name: str) -> logging.Logger

Main function to get a logger by name.

Parameters:

Name Type Description Default
logger_name str

Name of logger to initialize.

required

Returns:

Type Description
Logger

A logger object.

Source code in src/zenml/logger.py
74
75
76
77
78
79
80
81
82
83
def get_logger(logger_name: str) -> logging.Logger:
    """Return the logger registered under the given name.

    Args:
        logger_name: Name of logger to initialize.

    Returns:
        A logger object.
    """
    logger = logging.getLogger(logger_name)
    return logger

get_logging_level() -> LoggingLevels

Get logging level from the env variable.

Returns:

Type Description
LoggingLevels

The logging level.

Raises:

Type Description
KeyError

If the logging level is not found.

Source code in src/zenml/logger.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def get_logging_level() -> LoggingLevels:
    """Resolve the configured verbosity into a ``LoggingLevels`` member.

    Returns:
        The logging level.

    Raises:
        KeyError: If the logging level is not found.
    """
    level_name = ZENML_LOGGING_VERBOSITY.upper()
    known_levels = LoggingLevels.__members__

    if level_name not in known_levels:
        raise KeyError(
            f"Verbosity must be one of {list(known_levels.keys())}"
        )

    # The storage-specific verbosity setting is deprecated; tell the user
    # whenever it is still set.
    if ZENML_STORAGE_LOGGING_VERBOSITY is not None:
        get_logger(__name__).warning(
            "The ZENML_STORAGE_LOGGING_VERBOSITY is no longer supported. "
            "Please use the ZENML_LOGGING_VERBOSITY instead."
        )

    return known_levels[level_name]

get_zenml_handler() -> logging.Handler

Get ZenML handler that routes logs through LoggingContext.

Returns:

Type Description
Handler

A ZenML handler.

Source code in src/zenml/logger.py
347
348
349
350
351
352
353
def get_zenml_handler() -> logging.Handler:
    """Build the handler that forwards records to LoggingContext.

    Returns:
        A new ``ZenMLLoggingHandler`` instance.
    """
    zenml_handler: logging.Handler = ZenMLLoggingHandler()
    return zenml_handler

handle_bool_env_var(var: str, default: bool = False) -> bool

Converts normal env var to boolean.

Parameters:

Name Type Description Default
var str

The environment variable to convert.

required
default bool

The default value to return if the env var is not set.

False

Returns:

Type Description
bool

The converted value.

Source code in src/zenml/constants.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def handle_bool_env_var(var: str, default: bool = False) -> bool:
    """Read an environment variable and interpret it as a boolean.

    Args:
        var: The environment variable to convert.
        default: The value to return if the env var is unset or its value
            is recognised as neither true nor false.

    Returns:
        The converted value.
    """
    raw_value = os.getenv(var)
    if is_true_string_value(raw_value):
        return True
    return False if is_false_string_value(raw_value) else default

init_logging() -> None

Initialize the logging system.

Source code in src/zenml/logger.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def init_logging() -> None:
    """Initialize the logging system.

    Sets the root verbosity, wraps stdout/stderr, attaches the console and
    ZenML handlers to the root logger, captures warnings and quiets a set
    of noisy third-party loggers.

    Calling this more than once is safe: a handler is only attached when
    the root logger does not already have one of the same kind, so repeated
    initialization does not duplicate log output.
    """
    set_root_verbosity()
    wrap_stdout_stderr()

    root_logger = logging.getLogger()

    # Console handler - writes to original stdout. A fresh handler instance
    # is created on every call, so check for an existing one explicitly.
    console_attached = any(
        isinstance(getattr(handler, "stream", None), _ZenMLStdoutStream)
        for handler in root_logger.handlers
    )
    if not console_attached:
        root_logger.addHandler(get_console_handler())

    # ZenML handler - routes through LoggingContext
    zenml_attached = any(
        isinstance(handler, ZenMLLoggingHandler)
        for handler in root_logger.handlers
    )
    if not zenml_attached:
        root_logger.addHandler(get_zenml_handler())

    # Mute tensorflow cuda warnings
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

    # Route messages from the `warnings` module through logging
    logging.captureWarnings(True)

    # Third-party loggers are quietened by default; this is skipped only
    # when ZENML_SUPPRESS_LOGS is explicitly set to a false value.
    suppress_zenml_logs: bool = handle_bool_env_var(
        ENV_ZENML_SUPPRESS_LOGS, True
    )
    if suppress_zenml_logs:
        # Only warnings and above are let through for these loggers
        suppressed_logger_names = [
            "urllib3",
            "azure.core.pipeline.policies.http_logging_policy",
            "grpc",
            "requests",
            "kfp",
            "tensorflow",
        ]
        for logger_name in suppressed_logger_names:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

        # These loggers are switched off entirely
        disabled_logger_names = [
            "rdbms_metadata_access_object",
            "backoff",
            "segment",
        ]
        for logger_name in disabled_logger_names:
            disabled_logger = logging.getLogger(logger_name)
            disabled_logger.setLevel(logging.WARNING)
            disabled_logger.disabled = True

set_root_verbosity() -> None

Set the root verbosity.

Source code in src/zenml/logger.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def set_root_verbosity() -> None:
    """Apply the configured logging level to the root logger."""
    level = get_logging_level()

    if level == LoggingLevels.NOTSET:
        # NOTSET means "no logging at all": disable every level globally
        # and switch the root logger off.
        logging.disable(sys.maxsize)
        logging.getLogger().disabled = True
        get_logger(__name__).debug("Logging NOTSET")
        return

    if ENABLE_RICH_TRACEBACK:
        # Local variables are only shown in tracebacks at DEBUG level.
        rich_tb_install(show_locals=(level == LoggingLevels.DEBUG))

    logging.root.setLevel(level=level.value)
    get_logger(__name__).debug(
        f"Logging set to level: {logging.getLevelName(level.value)}"
    )

wrap_stdout_stderr() -> None

Wrap stdout and stderr write methods to route through LoggingContext.

Source code in src/zenml/logger.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def wrap_stdout_stderr() -> None:
    """Route stdout and stderr writes through LoggingContext.

    Each stream is wrapped at most once. The original ``write`` method is
    kept in a module-level variable before it is replaced.
    """
    global _stdout_wrapped, _stderr_wrapped
    global _original_stdout_write, _original_stderr_write

    if not _stdout_wrapped:
        stdout_write = getattr(sys.stdout, "write")
        _original_stdout_write = stdout_write
        setattr(sys.stdout, "write", _wrapped_write(stdout_write, "stdout"))
        _stdout_wrapped = True

    if not _stderr_wrapped:
        stderr_write = getattr(sys.stderr, "write")
        _original_stderr_write = stderr_write
        setattr(sys.stderr, "write", _wrapped_write(stderr_write, "stderr"))
        _stderr_wrapped = True