Skip to content

Logging

zenml.logging special

step_logging

ZenML logging handler.

StepLogsStorage

Helper class which buffers and stores logs to a given URI.

Source code in zenml/logging/step_logging.py
class StepLogsStorage:
    """Helper class which buffers and stores logs to a given URI."""

    def __init__(
        self,
        logs_uri: str,
        max_messages: int = STEP_LOGS_STORAGE_MAX_MESSAGES,
        time_interval: int = STEP_LOGS_STORAGE_INTERVAL_SECONDS,
    ) -> None:
        """Initialization.

        Args:
            logs_uri: the target URI to store the logs.
            max_messages: the maximum number of messages to save in the buffer.
            time_interval: the amount of seconds before the buffer gets saved
                automatically.
        """
        # Parameters
        self.logs_uri = logs_uri
        self.max_messages = max_messages
        self.time_interval = time_interval

        # State
        self.buffer: List[str] = []
        self.disabled_buffer: List[str] = []
        self.last_save_time = time.time()
        self.disabled = False

    def write(self, text: str) -> None:
        """Main write method.

        Args:
            text: the incoming string.
        """
        if text == "\n":
            return

        if not self.disabled:
            self.buffer.append(text)

            if (
                len(self.buffer) >= self.max_messages
                or time.time() - self.last_save_time >= self.time_interval
            ):
                self.save_to_file()

    def save_to_file(self) -> None:
        """Method to save the buffer to the given URI."""
        artifact_store = Client().active_stack.artifact_store
        if not self.disabled:
            try:
                self.disabled = True

                if self.buffer:
                    with artifact_store.open(self.logs_uri, "a") as file:
                        for message in self.buffer:
                            file.write(
                                remove_ansi_escape_codes(message) + "\n"
                            )

            except (OSError, IOError) as e:
                # This exception can be raised if there are issues with the
                # underlying system calls, such as reaching the maximum number
                # of open files, permission issues, file corruption, or other
                # I/O errors.
                logger.error(f"Error while trying to write logs: {e}")
            finally:
                self.buffer = []
                self.last_save_time = time.time()

                self.disabled = False
__init__(self, logs_uri, max_messages=100, time_interval=15) special

Initialization.

Parameters:

Name Type Description Default
logs_uri str

the target URI to store the logs.

required
max_messages int

the maximum number of messages to save in the buffer.

100
time_interval int

the amount of seconds before the buffer gets saved automatically.

15
Source code in zenml/logging/step_logging.py
def __init__(
    self,
    logs_uri: str,
    max_messages: int = STEP_LOGS_STORAGE_MAX_MESSAGES,
    time_interval: int = STEP_LOGS_STORAGE_INTERVAL_SECONDS,
) -> None:
    """Initialization.

    Args:
        logs_uri: the target URI to store the logs.
        max_messages: the maximum number of messages to save in the buffer.
        time_interval: the amount of seconds before the buffer gets saved
            automatically.
    """
    # Parameters
    self.logs_uri = logs_uri
    self.max_messages = max_messages
    self.time_interval = time_interval

    # State
    self.buffer: List[str] = []
    self.disabled_buffer: List[str] = []
    self.last_save_time = time.time()
    self.disabled = False
save_to_file(self)

Method to save the buffer to the given URI.

Source code in zenml/logging/step_logging.py
def save_to_file(self) -> None:
    """Method to save the buffer to the given URI."""
    artifact_store = Client().active_stack.artifact_store
    if not self.disabled:
        try:
            self.disabled = True

            if self.buffer:
                with artifact_store.open(self.logs_uri, "a") as file:
                    for message in self.buffer:
                        file.write(
                            remove_ansi_escape_codes(message) + "\n"
                        )

        except (OSError, IOError) as e:
            # This exception can be raised if there are issues with the
            # underlying system calls, such as reaching the maximum number
            # of open files, permission issues, file corruption, or other
            # I/O errors.
            logger.error(f"Error while trying to write logs: {e}")
        finally:
            self.buffer = []
            self.last_save_time = time.time()

            self.disabled = False
write(self, text)

Main write method.

Parameters:

Name Type Description Default
text str

the incoming string.

required
Source code in zenml/logging/step_logging.py
def write(self, text: str) -> None:
    """Main write method.

    Args:
        text: the incoming string.
    """
    if text == "\n":
        return

    if not self.disabled:
        self.buffer.append(text)

        if (
            len(self.buffer) >= self.max_messages
            or time.time() - self.last_save_time >= self.time_interval
        ):
            self.save_to_file()

StepLogsStorageContext

Context manager which patches stdout and stderr during step execution.

Source code in zenml/logging/step_logging.py
class StepLogsStorageContext:
    """Context manager which patches stdout and stderr during step execution."""

    def __init__(self, logs_uri: str) -> None:
        """Initializes and prepares a storage object.

        Args:
            logs_uri: the URI of the logs file.
        """
        self.storage = StepLogsStorage(logs_uri=logs_uri)

    def __enter__(self) -> "StepLogsStorageContext":
        """Enter condition of the context manager.

        Wraps the `write` method of both stderr and stdout, so each incoming
        message gets stored in the step logs storage.

        Returns:
            self
        """
        self.stdout_write = getattr(sys.stdout, "write")
        self.stdout_flush = getattr(sys.stdout, "flush")

        self.stderr_write = getattr(sys.stderr, "write")
        self.stderr_flush = getattr(sys.stderr, "flush")

        setattr(sys.stdout, "write", self._wrap_write(self.stdout_write))
        setattr(sys.stdout, "flush", self._wrap_flush(self.stdout_flush))

        setattr(sys.stderr, "write", self._wrap_write(self.stdout_write))
        setattr(sys.stderr, "flush", self._wrap_flush(self.stdout_flush))

        redirected.set(True)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Exit condition of the context manager.

        Args:
            exc_type: The class of the exception
            exc_val: The instance of the exception
            exc_tb: The traceback of the exception

        Restores the `write` method of both stderr and stdout.
        """
        self.storage.save_to_file()

        setattr(sys.stdout, "write", self.stdout_write)
        setattr(sys.stdout, "flush", self.stdout_flush)

        setattr(sys.stderr, "write", self.stderr_write)
        setattr(sys.stderr, "flush", self.stderr_flush)

        redirected.set(False)

    def _wrap_write(self, method: Callable[..., Any]) -> Callable[..., Any]:
        """Wrapper function that utilizes the storage object to store logs.

        Args:
            method: the original write method

        Returns:
            the wrapped write method.
        """

        def wrapped_write(*args: Any, **kwargs: Any) -> Any:
            output = method(*args, **kwargs)
            if args:
                self.storage.write(args[0])
            return output

        return wrapped_write

    def _wrap_flush(self, method: Callable[..., Any]) -> Callable[..., Any]:
        """Wrapper function that flushes the buffer of the storage object.

        Args:
            method: the original flush method

        Returns:
            the wrapped flush method.
        """

        def wrapped_flush(*args: Any, **kwargs: Any) -> Any:
            output = method(*args, **kwargs)
            self.storage.save_to_file()
            return output

        return wrapped_flush
__enter__(self) special

Enter condition of the context manager.

Wraps the write method of both stderr and stdout, so each incoming message gets stored in the step logs storage.

Returns:

Type Description
StepLogsStorageContext

self

Source code in zenml/logging/step_logging.py
def __enter__(self) -> "StepLogsStorageContext":
    """Enter condition of the context manager.

    Wraps the `write` method of both stderr and stdout, so each incoming
    message gets stored in the step logs storage.

    Returns:
        self
    """
    self.stdout_write = getattr(sys.stdout, "write")
    self.stdout_flush = getattr(sys.stdout, "flush")

    self.stderr_write = getattr(sys.stderr, "write")
    self.stderr_flush = getattr(sys.stderr, "flush")

    setattr(sys.stdout, "write", self._wrap_write(self.stdout_write))
    setattr(sys.stdout, "flush", self._wrap_flush(self.stdout_flush))

    setattr(sys.stderr, "write", self._wrap_write(self.stdout_write))
    setattr(sys.stderr, "flush", self._wrap_flush(self.stdout_flush))

    redirected.set(True)
    return self
__exit__(self, exc_type, exc_val, exc_tb) special

Exit condition of the context manager.

Parameters:

Name Type Description Default
exc_type Optional[Type[BaseException]]

The class of the exception

required
exc_val Optional[BaseException]

The instance of the exception

required
exc_tb Optional[traceback]

The traceback of the exception

required

Restores the write method of both stderr and stdout.

Source code in zenml/logging/step_logging.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Exit condition of the context manager.

    Args:
        exc_type: The class of the exception
        exc_val: The instance of the exception
        exc_tb: The traceback of the exception

    Restores the `write` method of both stderr and stdout.
    """
    self.storage.save_to_file()

    setattr(sys.stdout, "write", self.stdout_write)
    setattr(sys.stdout, "flush", self.stdout_flush)

    setattr(sys.stderr, "write", self.stderr_write)
    setattr(sys.stderr, "flush", self.stderr_flush)

    redirected.set(False)
__init__(self, logs_uri) special

Initializes and prepares a storage object.

Parameters:

Name Type Description Default
logs_uri str

the URI of the logs file.

required
Source code in zenml/logging/step_logging.py
def __init__(self, logs_uri: str) -> None:
    """Initializes and prepares a storage object.

    Args:
        logs_uri: the URI of the logs file.
    """
    self.storage = StepLogsStorage(logs_uri=logs_uri)

prepare_logs_uri(artifact_store, step_name, log_key=None)

Generates and prepares a URI for the log file for a step.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store on which the artifact will be stored.

required
step_name str

Name of the step.

required
log_key Optional[str]

The unique identification key of the log file.

None

Returns:

Type Description
str

The URI of the logs file.

Source code in zenml/logging/step_logging.py
def prepare_logs_uri(
    artifact_store: "BaseArtifactStore",
    step_name: str,
    log_key: Optional[str] = None,
) -> str:
    """Generates and prepares a URI for the log file for a step.

    Args:
        artifact_store: The artifact store on which the artifact will be stored.
        step_name: Name of the step.
        log_key: The unique identification key of the log file.

    Returns:
        The URI of the logs file.
    """
    artifact_store = Client().active_stack.artifact_store
    if log_key is None:
        log_key = str(uuid4())

    logs_base_uri = os.path.join(
        artifact_store.path,
        step_name,
        "logs",
    )

    # Create the dir
    if not artifact_store.exists(logs_base_uri):
        artifact_store.makedirs(logs_base_uri)

    # Delete the file if it already exists
    logs_uri = os.path.join(logs_base_uri, f"{log_key}.log")
    if artifact_store.exists(logs_uri):
        logger.warning(
            f"Logs file {logs_uri} already exists! Removing old log file..."
        )
        artifact_store.remove(logs_uri)
    return logs_uri

remove_ansi_escape_codes(text)

Auxiliary function to remove ANSI escape codes from a given string.

Parameters:

Name Type Description Default
text str

the input string

required

Returns:

Type Description
str

the version of the input string where the escape codes are removed.

Source code in zenml/logging/step_logging.py
def remove_ansi_escape_codes(text: str) -> str:
    """Auxiliary function to remove ANSI escape codes from a given string.

    Args:
        text: the input string

    Returns:
        the version of the input string where the escape codes are removed.
    """
    ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
    return ansi_escape.sub("", text)