
Logging

zenml.logging

Logging utilities.

Attributes

STEP_LOGS_STORAGE_INTERVAL_SECONDS: int = 15 module-attribute

STEP_LOGS_STORAGE_MAX_MESSAGES: int = 100 module-attribute

STEP_LOGS_STORAGE_MERGE_INTERVAL_SECONDS: int = 10 * 60 module-attribute

Modules

step_logging

ZenML logging handler.

Classes
PipelineLogsStorage(logs_uri: str, artifact_store: BaseArtifactStore, max_messages: int = STEP_LOGS_STORAGE_MAX_MESSAGES, time_interval: int = STEP_LOGS_STORAGE_INTERVAL_SECONDS, merge_files_interval: int = STEP_LOGS_STORAGE_MERGE_INTERVAL_SECONDS)

Helper class which buffers and stores logs to a given URI.

Initialization.

Parameters:

logs_uri (str, required):
    The URI of the log file or folder.
artifact_store (BaseArtifactStore, required):
    Artifact Store from the current step context.
max_messages (int, default: STEP_LOGS_STORAGE_MAX_MESSAGES):
    The maximum number of messages to save in the buffer.
time_interval (int, default: STEP_LOGS_STORAGE_INTERVAL_SECONDS):
    The number of seconds before the buffer gets saved automatically.
merge_files_interval (int, default: STEP_LOGS_STORAGE_MERGE_INTERVAL_SECONDS):
    The number of seconds before the created files get merged into a single file.

Source code in src/zenml/logging/step_logging.py
def __init__(
    self,
    logs_uri: str,
    artifact_store: "BaseArtifactStore",
    max_messages: int = STEP_LOGS_STORAGE_MAX_MESSAGES,
    time_interval: int = STEP_LOGS_STORAGE_INTERVAL_SECONDS,
    merge_files_interval: int = STEP_LOGS_STORAGE_MERGE_INTERVAL_SECONDS,
) -> None:
    """Initialization.

    Args:
        logs_uri: the URI of the log file or folder.
        artifact_store: Artifact Store from the current step context.
        max_messages: the maximum number of messages to save in the buffer.
        time_interval: the number of seconds before the buffer gets saved
            automatically.
        merge_files_interval: the number of seconds before the created files
            get merged into a single file.
    """
    # Parameters
    self.logs_uri = logs_uri
    self.max_messages = max_messages
    self.time_interval = time_interval
    self.merge_files_interval = merge_files_interval

    # State
    self.buffer: List[str] = []
    self.disabled_buffer: List[str] = []
    self.last_save_time = time.time()
    self.disabled = False
    self.artifact_store = artifact_store

    # Immutable filesystems state
    self.last_merge_time = time.time()
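
A minimal usage sketch (hedged: obtaining the artifact store via Client() and all parameter values are illustrative assumptions, not part of this class):

# Hypothetical sketch: buffer log lines and flush them to the active
# artifact store. All values below are illustrative.
from zenml.client import Client
from zenml.logging.step_logging import PipelineLogsStorage, prepare_logs_uri

artifact_store = Client().active_stack.artifact_store
logs_uri = prepare_logs_uri(artifact_store, step_name="trainer")

storage = PipelineLogsStorage(
    logs_uri=logs_uri,
    artifact_store=artifact_store,
    max_messages=50,   # flush once 50 messages are buffered...
    time_interval=5,   # ...or once 5 seconds have passed, whichever is first
)
storage.write("Training started")  # buffered, flushed per the limits above
storage.save_to_file(force=True)   # force a flush regardless of the limits
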
Functions
merge_log_files(merge_all_files: bool = False) -> None

Merges all log files into one in the given URI.

Called on the logging context exit.

Parameters:

Name Type Description Default
merge_all_files bool

whether to merge all files or only raw files

False
Source code in src/zenml/logging/step_logging.py
def merge_log_files(self, merge_all_files: bool = False) -> None:
    """Merges all log files into one in the given URI.

    Called on the logging context exit.

    Args:
        merge_all_files: whether to merge all files or only raw files
    """
    if self.artifact_store.config.IS_IMMUTABLE_FILESYSTEM:
        merged_file_suffix = "_merged"
        files_ = self.artifact_store.listdir(self.logs_uri)
        if not merge_all_files:
            # already merged files will not be merged again
            files_ = [f for f in files_ if merged_file_suffix not in f]
        file_name_ = self._get_timestamped_filename(
            suffix=merged_file_suffix
        )
        if len(files_) > 1:
            files_.sort()
            logger.debug("Log files count: %s", len(files_))

            missing_files = set()
            # concatenate all part-files into the merged file
            with self.artifact_store.open(
                os.path.join(self.logs_uri, file_name_), "w"
            ) as merged_file:
                for file in files_:
                    try:
                        merged_file.write(
                            str(
                                _load_file_from_artifact_store(
                                    os.path.join(self.logs_uri, str(file)),
                                    artifact_store=self.artifact_store,
                                    mode="r",
                                )
                            )
                        )
                    except DoesNotExistException:
                        missing_files.add(file)

            # clean up left over files
            for file in files_:
                if file not in missing_files:
                    self.artifact_store.remove(
                        os.path.join(self.logs_uri, str(file))
                    )
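
A brief sketch of a final merge pass, continuing the storage object from the example above (hedged: calling this manually is illustrative; the class normally does it on context exit):

# On immutable filesystems many timestamped part-files accumulate; a
# final pass with merge_all_files=True also folds in earlier "_merged"
# files, leaving a single consolidated log file.
if storage.artifact_store.config.IS_IMMUTABLE_FILESYSTEM:
    storage.merge_log_files(merge_all_files=True)
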
save_to_file(force: bool = False) -> None

Method to save the buffer to the given URI.

Parameters:

Name Type Description Default
force bool

whether to force a save even if the write conditions not met.

False
Source code in src/zenml/logging/step_logging.py
def save_to_file(self, force: bool = False) -> None:
    """Method to save the buffer to the given URI.

    Args:
        force: whether to force a save even if the write conditions are not met.
    """
    import asyncio
    import threading

    # Most artifact stores are based on fsspec, which converts between
    # sync and async operations by using a separate AIO thread.
    # It may happen that the fsspec call itself will log something,
    # which will trigger this method, which may then use fsspec again,
    # causing a "Calling sync() from within a running loop" error, because
    # the fsspec library does not expect sync calls being made as a result
    # of a logging call made by itself.
    # To avoid this, we simply check if we're running in the fsspec AIO
    # thread and skip the save if that's the case.
    try:
        if (
            asyncio.events.get_running_loop() is not None
            and threading.current_thread().name == "fsspecIO"
        ):
            return
    except RuntimeError:
        # No running loop
        pass

    if not self.disabled and (self._is_write_needed or force):
        # IMPORTANT: keep this as the first code line in this method! The
        # code that follows might still emit logging messages, which will
        # end up triggering this method again, causing an infinite loop.
        self.disabled = True

        try:
            # The configured logging handler uses a lock to ensure that
            # logs generated by different threads are not interleaved.
            # Given that most artifact stores are based on fsspec, which
            # use a separate thread for async operations, it may happen that
            # the fsspec library itself will log something, which will end
            # up in a deadlock.
            # To avoid this, we temporarily disable the lock in the logging
            # handler while writing to the file.
            logging_handler = logging.getLogger().handlers[0]
            logging_lock = logging_handler.lock
            logging_handler.lock = None

            if self.buffer:
                if self.artifact_store.config.IS_IMMUTABLE_FILESYSTEM:
                    _logs_uri = self._get_timestamped_filename()
                    with self.artifact_store.open(
                        os.path.join(
                            self.logs_uri,
                            _logs_uri,
                        ),
                        "w",
                    ) as file:
                        for message in self.buffer:
                            file.write(f"{message}\n")
                else:
                    with self.artifact_store.open(
                        self.logs_uri, "a"
                    ) as file:
                        for message in self.buffer:
                            file.write(f"{message}\n")
                    self.artifact_store._remove_previous_file_versions(
                        self.logs_uri
                    )

        except (OSError, IOError) as e:
            # This exception can be raised if there are issues with the
            # underlying system calls, such as reaching the maximum number
            # of open files, permission issues, file corruption, or other
            # I/O errors.
            logger.error(f"Error while trying to write logs: {e}")
        finally:
            # Restore the original logging handler lock
            logging_handler.lock = logging_lock

            self.buffer = []
            self.last_save_time = time.time()

            self.disabled = False
    # merge created files on a given interval (defaults to 10 minutes)
    # only runs on Immutable Filesystems
    if (
        self.artifact_store.config.IS_IMMUTABLE_FILESYSTEM
        and time.time() - self.last_merge_time > self.merge_files_interval
    ):
        try:
            self.merge_log_files()
        except (OSError, IOError) as e:
            logger.error(f"Error while trying to roll up logs: {e}")
        finally:
            self.last_merge_time = time.time()
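
The _is_write_needed property referenced above is not shown in this section; a plausible reconstruction based on the constructor parameters (an assumption, not the actual implementation):

import time

# Hypothetical reconstruction of the write condition used by
# save_to_file: flush when the buffer is full or the save interval
# has elapsed.
def _is_write_needed(storage: "PipelineLogsStorage") -> bool:
    return (
        len(storage.buffer) >= storage.max_messages
        or time.time() - storage.last_save_time >= storage.time_interval
    )
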
write(text: str) -> None

Main write method.

Parameters:

Name Type Description Default
text str

the incoming string.

required
Source code in src/zenml/logging/step_logging.py
def write(self, text: str) -> None:
    """Main write method.

    Args:
        text: the incoming string.
    """
    if text == "\n":
        return

    if not self.disabled:
        # Add timestamp to the message when it's received
        timestamp = utc_now().strftime("%Y-%m-%d %H:%M:%S")
        formatted_message = (
            f"[{timestamp} UTC] {remove_ansi_escape_codes(text)}"
        )
        self.buffer.append(formatted_message.rstrip())
        self.save_to_file()
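
For example (timestamp illustrative), write() drops bare newlines, strips ANSI colors, and prepends a UTC timestamp before buffering:

storage.write("\n")                        # ignored
storage.write("\x1b[32mstep done\x1b[0m")  # buffered as:
# "[2024-01-01 12:00:00 UTC] step done"
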
PipelineLogsStorageContext(logs_uri: str, artifact_store: BaseArtifactStore, prepend_step_name: bool = True)

Context manager which patches stdout and stderr during pipeline run execution.

Initializes and prepares a storage object.

Parameters:

Name Type Description Default
logs_uri str

the URI of the logs file.

required
artifact_store BaseArtifactStore

Artifact Store from the current pipeline run context.

required
prepend_step_name bool

Whether to prepend the step name to the logs.

True
Source code in src/zenml/logging/step_logging.py
def __init__(
    self,
    logs_uri: str,
    artifact_store: "BaseArtifactStore",
    prepend_step_name: bool = True,
) -> None:
    """Initializes and prepares a storage object.

    Args:
        logs_uri: the URI of the logs file.
        artifact_store: Artifact Store from the current pipeline run context.
        prepend_step_name: Whether to prepend the step name to the logs.
    """
    self.storage = PipelineLogsStorage(
        logs_uri=logs_uri, artifact_store=artifact_store
    )
    self.prepend_step_name = prepend_step_name
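
A hedged usage sketch: the __enter__/__exit__ methods that actually patch sys.stdout/sys.stderr are not shown in this section, so this illustrates the intended pattern only.

from zenml.logging.step_logging import PipelineLogsStorageContext

with PipelineLogsStorageContext(
    logs_uri=logs_uri,
    artifact_store=artifact_store,
    prepend_step_name=False,
):
    # Inside the context, stdout/stderr are patched, so print() output
    # is captured into the underlying PipelineLogsStorage and persisted.
    print("this line is captured and stored in the artifact store")
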
Functions
fetch_logs(zen_store: BaseZenStore, artifact_store_id: Union[str, UUID], logs_uri: str, offset: int = 0, length: int = 1024 * 1024 * 16) -> str

Fetches the logs from the artifact store.

Parameters:

Name Type Description Default
zen_store BaseZenStore

The store in which the artifact is stored.

required
artifact_store_id Union[str, UUID]

The ID of the artifact store.

required
logs_uri str

The URI of the artifact.

required
offset int

The offset from which to start reading.

0
length int

The amount of bytes that should be read.

1024 * 1024 * 16

Returns:

str:
    The logs as a string.

Raises:

DoesNotExistException:
    If the artifact does not exist in the artifact store.

Source code in src/zenml/logging/step_logging.py
def fetch_logs(
    zen_store: "BaseZenStore",
    artifact_store_id: Union[str, UUID],
    logs_uri: str,
    offset: int = 0,
    length: int = 1024 * 1024 * 16,  # Default to 16MiB of data
) -> str:
    """Fetches the logs from the artifact store.

    Args:
        zen_store: The store in which the artifact is stored.
        artifact_store_id: The ID of the artifact store.
        logs_uri: The URI of the artifact.
        offset: The offset from which to start reading.
        length: The number of bytes that should be read.

    Returns:
        The logs as a string.

    Raises:
        DoesNotExistException: If the artifact does not exist in the artifact
            store.
    """

    def _read_file(
        uri: str, offset: int = 0, length: Optional[int] = None
    ) -> str:
        return str(
            _load_file_from_artifact_store(
                uri,
                artifact_store=artifact_store,
                mode="rb",
                offset=offset,
                length=length,
            ).decode()
        )

    artifact_store = _load_artifact_store(artifact_store_id, zen_store)
    try:
        if not artifact_store.isdir(logs_uri):
            return _read_file(logs_uri, offset, length)
        else:
            files = artifact_store.listdir(logs_uri)
            if len(files) == 1:
                return _read_file(
                    os.path.join(logs_uri, str(files[0])), offset, length
                )
            else:
                is_negative_offset = offset < 0
                files.sort(reverse=is_negative_offset)

                # search for the first file we need to read
                latest_file_id = 0
                for i, file in enumerate(files):
                    file_size: int = artifact_store.size(
                        os.path.join(logs_uri, str(file))
                    )  # type: ignore[assignment]

                    if is_negative_offset:
                        if file_size >= -offset:
                            latest_file_id = -(i + 1)
                            break
                        else:
                            offset += file_size
                    else:
                        if file_size > offset:
                            latest_file_id = i
                            break
                        else:
                            offset -= file_size

                # read the files according to pre-filtering
                files.sort()
                ret = []
                for file in files[latest_file_id:]:
                    ret.append(
                        _read_file(
                            os.path.join(logs_uri, str(file)),
                            offset,
                            length,
                        )
                    )
                    offset = 0
                    length -= len(ret[-1])
                    if length <= 0:
                        # stop further reading, if the whole length is already read
                        break

                if not ret:
                    raise DoesNotExistException(
                        f"Folder '{logs_uri}' is empty in artifact store "
                        f"'{artifact_store.name}'."
                    )
                return "".join(ret)
    finally:
        artifact_store.cleanup()
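
A usage sketch (hedged: the client attributes used to obtain zen_store and the artifact store ID are assumptions for illustration):

from zenml.client import Client

client = Client()
# Reading the tail of a run's logs: judging from the file-walking logic
# above, a negative offset counts back from the end of the logs.
tail = fetch_logs(
    zen_store=client.zen_store,
    artifact_store_id=artifact_store.id,
    logs_uri=logs_uri,
    offset=-4096,
    length=4096,
)
print(tail)
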
prepare_logs_uri(artifact_store: BaseArtifactStore, step_name: Optional[str] = None, log_key: Optional[str] = None) -> str

Generates and prepares a URI for the log file or folder for a step.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store on which the artifact will be stored.

required
step_name Optional[str]

Name of the step. Skipped for global pipeline run logs.

None
log_key Optional[str]

The unique identification key of the log file.

None

Returns:

str:
    The URI of the log storage (file or folder).

Source code in src/zenml/logging/step_logging.py
def prepare_logs_uri(
    artifact_store: "BaseArtifactStore",
    step_name: Optional[str] = None,
    log_key: Optional[str] = None,
) -> str:
    """Generates and prepares a URI for the log file or folder for a step.

    Args:
        artifact_store: The artifact store on which the artifact will be stored.
        step_name: Name of the step. Skipped for global pipeline run logs.
        log_key: The unique identification key of the log file.

    Returns:
        The URI of the log storage (file or folder).
    """
    if log_key is None:
        log_key = str(uuid4())

    subfolder = step_name or PIPELINE_RUN_LOGS_FOLDER
    logs_base_uri = os.path.join(artifact_store.path, subfolder, "logs")

    # Create the dir
    if not artifact_store.exists(logs_base_uri):
        artifact_store.makedirs(logs_base_uri)

    # Delete the file if it already exists
    if artifact_store.config.IS_IMMUTABLE_FILESYSTEM:
        logs_uri_folder = os.path.join(logs_base_uri, log_key)
        if artifact_store.exists(logs_uri_folder):
            logger.warning(
                f"Logs directory {logs_uri_folder} already exists! Removing old log directory..."
            )
            artifact_store.rmtree(logs_uri_folder)

        artifact_store.makedirs(logs_uri_folder)
        return logs_uri_folder
    else:
        logs_uri = os.path.join(logs_base_uri, f"{log_key}{LOGS_EXTENSION}")
        if artifact_store.exists(logs_uri):
            logger.warning(
                f"Logs file {logs_uri} already exists! Removing old log file..."
            )
            artifact_store.remove(logs_uri)
        return logs_uri
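
A short sketch of the two layouts this produces (paths illustrative):

uri = prepare_logs_uri(artifact_store, step_name="trainer")
# Immutable filesystem: a folder that collects timestamped part-files,
#   e.g. "<artifact_store.path>/trainer/logs/<uuid>"
# Mutable filesystem: a single appendable file,
#   e.g. "<artifact_store.path>/trainer/logs/<uuid><LOGS_EXTENSION>"
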
remove_ansi_escape_codes(text: str) -> str

Auxiliary function to remove ANSI escape codes from a given string.

Parameters:

text (str, required):
    The input string.

Returns:

str:
    The version of the input string where the escape codes are removed.

Source code in src/zenml/logging/step_logging.py
def remove_ansi_escape_codes(text: str) -> str:
    """Auxiliary function to remove ANSI escape codes from a given string.

    Args:
        text: the input string

    Returns:
        the version of the input string where the escape codes are removed.
    """
    ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
    return ansi_escape.sub("", text)
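
For example:

>>> remove_ansi_escape_codes("\x1b[1;31merror:\x1b[0m disk full")
'error: disk full'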