Skip to content

Logger

zenml.logger

Logger implementation.

Attributes

ENABLE_RICH_TRACEBACK = handle_bool_env_var(ENV_ZENML_ENABLE_RICH_TRACEBACK, True) module-attribute

ENV_ZENML_CAPTURE_PRINTS = 'ZENML_CAPTURE_PRINTS' module-attribute

ENV_ZENML_LOGGING_COLORS_DISABLED = 'ZENML_LOGGING_COLORS_DISABLED' module-attribute

ENV_ZENML_LOGGING_FORMAT = 'ZENML_LOGGING_FORMAT' module-attribute

ENV_ZENML_SUPPRESS_LOGS = 'ZENML_SUPPRESS_LOGS' module-attribute

ZENML_LOGGING_COLORS_DISABLED = handle_bool_env_var(ENV_ZENML_LOGGING_COLORS_DISABLED, False) module-attribute

ZENML_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_LOGGING_VERBOSITY, default='DEBUG').upper() module-attribute

ZENML_STORAGE_LOGGING_VERBOSITY = os.getenv(ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default='DEBUG').upper() module-attribute

logging_handlers: ContextVarList[ArtifactStoreHandler] = ContextVarList('logging_handlers') module-attribute

step_names_in_console: ContextVar[bool] = ContextVar('step_names_in_console', default=False) module-attribute

Classes

ArtifactStoreHandler(storage: PipelineLogsStorage)

Bases: Handler

Handler that writes log messages to artifact store storage.

Initialize the handler with a storage instance.

Parameters:

Name Type Description Default
storage PipelineLogsStorage

The PipelineLogsStorage instance to write to.

required
Source code in src/zenml/logging/step_logging.py
128
129
130
131
132
133
134
135
136
137
138
def __init__(self, storage: "PipelineLogsStorage"):
    """Initialize the handler with a storage instance.

    Args:
        storage: The PipelineLogsStorage instance to write to.
    """
    super().__init__()
    self.storage = storage

    # Get storage log level from environment
    self.setLevel(get_storage_log_level().value)
Functions
emit(record: logging.LogRecord) -> None

Emit a log record to the storage.

Parameters:

Name Type Description Default
record LogRecord

The log record to emit.

required
Source code in src/zenml/logging/step_logging.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def emit(self, record: logging.LogRecord) -> None:
    """Emit a log record to the storage.

    Args:
        record: The log record to emit.
    """
    try:
        # Get level enum
        level = LoggingLevels.__members__.get(record.levelname.upper())

        # Get the message
        message = self.format(record)
        message = remove_ansi_escape_codes(message).rstrip()

        # Check if message needs to be chunked
        message_bytes = message.encode("utf-8")
        if len(message_bytes) <= DEFAULT_MESSAGE_SIZE:
            # Message is small enough, emit as-is
            log_record = LogEntry.model_construct(
                message=message,
                name=record.name,
                level=level,
                timestamp=utc_now(),
                module=record.module,
                filename=record.filename,
                lineno=record.lineno,
            )
            json_line = log_record.model_dump_json(exclude_none=True)
            self.storage.write(json_line)
        else:
            # Message is too large, split into chunks and emit each one
            chunks = self._split_to_chunks(message)
            entry_id = uuid4()
            for i, chunk in enumerate(chunks):
                log_record = LogEntry.model_construct(
                    message=chunk,
                    name=record.name,
                    level=level,
                    module=record.module,
                    filename=record.filename,
                    lineno=record.lineno,
                    timestamp=utc_now(),
                    chunk_index=i,
                    total_chunks=len(chunks),
                    id=entry_id,
                )

                json_line = log_record.model_dump_json(exclude_none=True)
                self.storage.write(json_line)
    except Exception:
        pass

ContextVarList(name: str)

Bases: Generic[T]

Thread-safe wrapper around ContextVar[List] with atomic add/remove operations.

Initialize the context variable list.

Parameters:

Name Type Description Default
name str

The name for the underlying ContextVar.

required
Source code in src/zenml/utils/context_utils.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, name: str) -> None:
    """Initialize the context variable list.

    Args:
        name: The name for the underlying ContextVar.
    """
    # Use None as default to avoid mutable default issues
    self._context_var: ContextVar[Optional[List[T]]] = ContextVar(
        name, default=None
    )
    # Lock to ensure atomic operations
    self._lock = threading.Lock()
Functions
add(item: T) -> None

Thread-safely add an item to the list.

Parameters:

Name Type Description Default
item T

The item to add to the list.

required
Source code in src/zenml/utils/context_utils.py
48
49
50
51
52
53
54
55
56
57
58
def add(self, item: T) -> None:
    """Thread-safely add an item to the list.

    Args:
        item: The item to add to the list.
    """
    with self._lock:
        current_list = self.get()
        if not any(x is item for x in current_list):
            new_list = current_list + [item]
            self._context_var.set(new_list)
get() -> List[T]

Get the current list value. Returns empty list if not set.

Returns:

Type Description
List[T]

The current list value.

Source code in src/zenml/utils/context_utils.py
39
40
41
42
43
44
45
46
def get(self) -> List[T]:
    """Get the current list value. Returns empty list if not set.

    Returns:
        The current list value.
    """
    value = self._context_var.get()
    return value if value is not None else []
remove(item: T) -> None

Thread-safely remove an item from the list.

Parameters:

Name Type Description Default
item T

The item to remove from the list.

required
Source code in src/zenml/utils/context_utils.py
60
61
62
63
64
65
66
67
68
69
70
def remove(self, item: T) -> None:
    """Thread-safely remove an item from the list.

    Args:
        item: The item to remove from the list.
    """
    with self._lock:
        current_list = self.get()
        if any(x is item for x in current_list):
            new_list = [x for x in current_list if x is not item]
            self._context_var.set(new_list)

CustomFormatter

Bases: Formatter

Formats logs according to custom specifications.

Functions
format(record: logging.LogRecord) -> str

Converts a log record to a (colored) string.

Parameters:

Name Type Description Default
record LogRecord

LogRecord generated by the code.

required

Returns:

Type Description
str

A string formatted according to specifications.

Source code in src/zenml/logger.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def format(self, record: logging.LogRecord) -> str:
    """Converts a log record to a (colored) string.

    Args:
        record: LogRecord generated by the code.

    Returns:
        A string formatted according to specifications.
    """
    # Get the template
    format_template = self._get_format_template(record)

    # Apply step name prepending if enabled (for console display)
    message = record.getMessage()
    try:
        if step_names_in_console.get():
            message = _add_step_name_to_message(message)
    except Exception:
        # If we can't get step context, just use the original message
        pass

    # Create a new record with the modified message
    modified_record = logging.LogRecord(
        name=record.name,
        level=record.levelno,
        pathname=record.pathname,
        lineno=record.lineno,
        msg=message,
        args=(),
        exc_info=record.exc_info,
    )

    if ZENML_LOGGING_COLORS_DISABLED:
        # If color formatting is disabled, use the default format without colors
        formatter = logging.Formatter(format_template)
        return formatter.format(modified_record)
    else:
        # Use color formatting
        log_fmt = (
            self.COLORS[LoggingLevels(record.levelno)]
            + format_template
            + self.reset
        )
        formatter = logging.Formatter(log_fmt)
        formatted_message = formatter.format(modified_record)
        quoted_groups = re.findall("`([^`]*)`", formatted_message)
        for quoted in quoted_groups:
            formatted_message = formatted_message.replace(
                "`" + quoted + "`",
                self.reset
                + self.purple
                + quoted
                + self.COLORS.get(LoggingLevels(record.levelno)),
            )

        # Format URLs
        url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
        urls = re.findall(url_pattern, formatted_message)
        for url in urls:
            formatted_message = formatted_message.replace(
                url,
                self.reset
                + self.blue
                + url
                + self.COLORS.get(LoggingLevels(record.levelno)),
            )
        return formatted_message

LoggingLevels

Bases: Enum

Enum for logging levels.

Functions

_add_step_name_to_message(message: str) -> str

Adds the step name to the message.

Parameters:

Name Type Description Default
message str

The message to add the step name to.

required

Returns:

Type Description
str

The message with the step name added.

Source code in src/zenml/logger.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def _add_step_name_to_message(message: str) -> str:
    """Adds the step name to the message.

    Args:
        message: The message to add the step name to.

    Returns:
        The message with the step name added.
    """
    try:
        if step_names_in_console.get():
            from zenml.steps import get_step_context

            step_context = get_step_context()

            if step_context and message not in ["\n", ""]:
                # For progress bar updates (with \r), inject the step name after the \r
                if "\r" in message:
                    message = message.replace(
                        "\r", f"\r[{step_context.step_name}] "
                    )
                else:
                    message = f"[{step_context.step_name}] {message}"
    except Exception:
        # If we can't get step context, just use the original message
        pass

    return message

get_console_handler() -> Any

Get console handler for logging.

Returns:

Type Description
Any

A console handler.

Source code in src/zenml/logger.py
314
315
316
317
318
319
320
321
322
323
324
def get_console_handler() -> Any:
    """Get console handler for logging.

    Returns:
        A console handler.
    """
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(get_formatter())
    # Set console handler level explicitly to console verbosity
    console_handler.setLevel(get_logging_level().value)
    return console_handler

get_formatter() -> logging.Formatter

Get a configured logging formatter.

Returns:

Type Description
Formatter

The formatter.

Source code in src/zenml/logger.py
302
303
304
305
306
307
308
309
310
311
def get_formatter() -> logging.Formatter:
    """Get a configured logging formatter.

    Returns:
        The formatter.
    """
    if log_format := os.environ.get(ENV_ZENML_LOGGING_FORMAT, None):
        return logging.Formatter(fmt=log_format)
    else:
        return CustomFormatter()

get_logger(logger_name: str) -> logging.Logger

Main function to get logger name,.

Parameters:

Name Type Description Default
logger_name str

Name of logger to initialize.

required

Returns:

Type Description
Logger

A logger object.

Source code in src/zenml/logger.py
327
328
329
330
331
332
333
334
335
336
def get_logger(logger_name: str) -> logging.Logger:
    """Main function to get logger name,.

    Args:
        logger_name: Name of logger to initialize.

    Returns:
        A logger object.
    """
    return logging.getLogger(logger_name)

get_logging_level() -> LoggingLevels

Get logging level from the env variable.

Returns:

Type Description
LoggingLevels

The logging level.

Raises:

Type Description
KeyError

If the logging level is not found.

Source code in src/zenml/logger.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def get_logging_level() -> LoggingLevels:
    """Get logging level from the env variable.

    Returns:
        The logging level.

    Raises:
        KeyError: If the logging level is not found.
    """
    verbosity = ZENML_LOGGING_VERBOSITY.upper()
    if verbosity not in LoggingLevels.__members__:
        raise KeyError(
            f"Verbosity must be one of {list(LoggingLevels.__members__.keys())}"
        )
    return LoggingLevels[verbosity]

get_storage_log_level() -> LoggingLevels

Get storage logging level from the env variable with safe fallback.

Returns:

Type Description
LoggingLevels

The storage logging level, defaulting to INFO if invalid.

Raises:

Type Description
KeyError

If the storage logging level is not found.

Source code in src/zenml/logger.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def get_storage_log_level() -> LoggingLevels:
    """Get storage logging level from the env variable with safe fallback.

    Returns:
        The storage logging level, defaulting to INFO if invalid.

    Raises:
        KeyError: If the storage logging level is not found.
    """
    verbosity = ZENML_STORAGE_LOGGING_VERBOSITY.upper()
    if verbosity not in LoggingLevels.__members__:
        raise KeyError(
            f"Verbosity must be one of {list(LoggingLevels.__members__.keys())}"
        )
    return LoggingLevels[verbosity]

handle_bool_env_var(var: str, default: bool = False) -> bool

Converts normal env var to boolean.

Parameters:

Name Type Description Default
var str

The environment variable to convert.

required
default bool

The default value to return if the env var is not set.

False

Returns:

Type Description
bool

The converted value.

Source code in src/zenml/constants.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def handle_bool_env_var(var: str, default: bool = False) -> bool:
    """Converts normal env var to boolean.

    Args:
        var: The environment variable to convert.
        default: The default value to return if the env var is not set.

    Returns:
        The converted value.
    """
    value = os.getenv(var)
    if is_true_string_value(value):
        return True
    elif is_false_string_value(value):
        return False
    return default

init_logging() -> None

Initialize logging with default levels.

Source code in src/zenml/logger.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def init_logging() -> None:
    """Initialize logging with default levels."""
    # Mute tensorflow cuda warnings
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    set_root_verbosity()

    # Check if console handler already exists to avoid duplicates
    root_logger = logging.getLogger()
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and handler.stream == sys.stdout
        for handler in root_logger.handlers
    )

    if not has_console_handler:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(get_formatter())
        # Set console handler level explicitly to console verbosity
        console_handler.setLevel(get_logging_level().value)
        root_logger.addHandler(console_handler)

    # Initialize global print wrapping
    setup_global_print_wrapping()

    # Enable logs if environment variable SUPPRESS_ZENML_LOGS is not set to True
    suppress_zenml_logs: bool = handle_bool_env_var(
        ENV_ZENML_SUPPRESS_LOGS, True
    )
    if suppress_zenml_logs:
        # suppress logger info messages
        suppressed_logger_names = [
            "urllib3",
            "azure.core.pipeline.policies.http_logging_policy",
            "grpc",
            "requests",
            "kfp",
            "tensorflow",
        ]
        for logger_name in suppressed_logger_names:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

        # disable logger messages
        disabled_logger_names = [
            "rdbms_metadata_access_object",
            "backoff",
            "segment",
        ]
        for logger_name in disabled_logger_names:
            logging.getLogger(logger_name).setLevel(logging.WARNING)
            logging.getLogger(logger_name).disabled = True

set_root_verbosity() -> None

Set the root verbosity.

Source code in src/zenml/logger.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def set_root_verbosity() -> None:
    """Set the root verbosity."""
    level = get_logging_level()
    if level != LoggingLevels.NOTSET:
        if ENABLE_RICH_TRACEBACK:
            rich_tb_install(show_locals=(level == LoggingLevels.DEBUG))

        logging.root.setLevel(level=level.value)
        get_logger(__name__).debug(
            f"Logging set to level: {logging.getLevelName(level.value)}"
        )
    else:
        logging.disable(sys.maxsize)
        logging.getLogger().disabled = True
        get_logger(__name__).debug("Logging NOTSET")

setup_global_print_wrapping() -> None

Set up global print() wrapping with context-aware handlers.

Source code in src/zenml/logger.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def setup_global_print_wrapping() -> None:
    """Set up global print() wrapping with context-aware handlers."""
    # Check if we should capture prints
    capture_prints = handle_bool_env_var(
        ENV_ZENML_CAPTURE_PRINTS, default=True
    )

    if not capture_prints:
        return

    # Check if already wrapped to avoid double wrapping
    if hasattr(__builtins__, "_zenml_original_print"):
        return

    original_print = builtins.print

    def wrapped_print(*args: Any, **kwargs: Any) -> None:
        # Convert print arguments to message
        message = " ".join(str(arg) for arg in args)

        # Determine if this should go to stderr or stdout based on file argument
        file_arg = kwargs.get("file", sys.stdout)

        # Call active handlers first (for storage)
        if message.strip():
            handlers = logging_handlers.get()

            for handler in handlers:
                try:
                    # Create a LogRecord for the handler
                    record = logging.LogRecord(
                        name="print",
                        level=logging.ERROR
                        if file_arg == sys.stderr
                        else logging.INFO,
                        pathname="",
                        lineno=0,
                        msg=message,
                        args=(),
                        exc_info=None,
                    )
                    # Check if handler's level would accept this record
                    if record.levelno >= handler.level:
                        handler.emit(record)
                except Exception:
                    # Don't let handler errors break print
                    pass

        if step_names_in_console.get():
            message = _add_step_name_to_message(message)

        # Then call original print for console display
        return original_print(message, *args[1:], **kwargs)

    # Store original and replace print
    setattr(builtins, "_zenml_original_print", original_print)
    setattr(builtins, "print", wrapped_print)