Skip to content

Logging

zenml.logging special

step_logging

ZenML logging handler.

StepLoggingFormatter (Formatter)

Specialized formatter changing the level name if step handler is used.

Source code in zenml/logging/step_logging.py
class StepLoggingFormatter(logging.Formatter):
    """Formatter that relabels records from the step stdout/stderr loggers.

    Records whose logger name equals ``STEP_STDOUT_LOGGER_NAME`` are rendered
    with the level name ``STDOUT`` and records whose logger name equals
    ``STEP_STDERR_LOGGER_NAME`` with ``STDERR``. Every other record is
    formatted with its own level name.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Formats a record, relabelling captured stdout/stderr output.

        The level name is only replaced for the duration of this call: the
        same ``LogRecord`` instance is handed to every handler that processes
        the event, so the original value is put back before returning.

        Args:
            record: the incoming log record

        Returns:
            the formatted string
        """
        original_levelname = record.levelname

        if record.name == STEP_STDOUT_LOGGER_NAME:
            record.levelname = "STDOUT"

        if record.name == STEP_STDERR_LOGGER_NAME:
            record.levelname = "STDERR"

        try:
            return super().format(record)
        finally:
            # Undo the relabelling even if formatting raised, so the shared
            # record is left exactly as it was received.
            record.levelname = original_levelname
format(self, record)

Specialized format method to distinguish between logs and stdout.

Parameters:

Name Type Description Default
record LogRecord

the incoming log record

required

Returns:

Type Description
str

the formatted string

Source code in zenml/logging/step_logging.py
def format(self, record: logging.LogRecord) -> str:
    """Formats a record, relabelling captured stdout/stderr output.

    Records from the logger named ``STEP_STDOUT_LOGGER_NAME`` are rendered
    with the level name ``STDOUT`` and records from the logger named
    ``STEP_STDERR_LOGGER_NAME`` with ``STDERR``. The level name is only
    replaced for the duration of this call: the same ``LogRecord`` instance
    is handed to every handler that processes the event, so the original
    value is put back before returning.

    Args:
        record: the incoming log record

    Returns:
        the formatted string
    """
    original_levelname = record.levelname

    if record.name == STEP_STDOUT_LOGGER_NAME:
        record.levelname = "STDOUT"

    if record.name == STEP_STDERR_LOGGER_NAME:
        record.levelname = "STDERR"

    try:
        return super().format(record)
    finally:
        # Undo the relabelling even if formatting raised, so the shared
        # record is left exactly as it was received.
        record.levelname = original_levelname

StepLoggingHandler (TimedRotatingFileHandler)

Specialized handler that stores ZenML step logs in artifact stores.

Source code in zenml/logging/step_logging.py
class StepLoggingHandler(TimedRotatingFileHandler):
    """Specialized handler that stores ZenML step logs in artifact stores.

    Formatted records are accumulated in an in-memory buffer. Every flush
    rewrites the file at ``logs_uri`` with the complete buffer contents.
    """

    def __init__(self, logs_uri: str):
        """Initializes the handler.

        Args:
            logs_uri: URI of the logs file.
        """
        # Function-scope import so the unique-name fix below does not rely
        # on a module-level import.
        from uuid import uuid4

        self.logs_uri = logs_uri
        self.max_messages = LOGS_HANDLER_MAX_MESSAGES
        self.buffer = io.StringIO()
        self.message_count = 0
        self.last_upload_time = time.time()
        self.local_temp_file: Optional[str] = None
        # Set while an upload or cleanup is running so that a nested call
        # skips that work instead of re-entering it.
        self.disabled = False

        # set local_logging_file to self.logs_uri if self.logs_uri is a
        # local path otherwise, set local_logging_file to a temporary file
        if is_remote(self.logs_uri):
            # We log to a temporary file first, because
            # TimedRotatingFileHandler does not support writing
            # to a remote file, but still needs a file to get going.
            # A timestamp with one-second resolution is not unique: two
            # handlers created within the same second in the same working
            # directory would share (and later delete) the same file, so a
            # random suffix is appended.
            local_logging_file = (
                f".zenml_tmp_logs_{int(time.time())}_{uuid4().hex}"
            )
            self.local_temp_file = local_logging_file
        else:
            local_logging_file = self.logs_uri

        super().__init__(
            local_logging_file,
            when="s",
            interval=LOGS_HANDLER_INTERVAL_SECONDS,
        )

    def emit(self, record: LogRecord) -> None:
        """Emits the log record.

        The formatted record is appended to the in-memory buffer. The buffer
        is flushed once ``max_messages`` records were emitted since the last
        flush, or once at least ``self.interval`` seconds have passed since
        the last upload.

        Any exception raised while formatting or flushing is passed to
        ``handleError`` instead of propagating into the code that issued the
        logging call.

        Args:
            record: Log record to emit.
        """
        try:
            msg = self.format(record)
            self.buffer.write(msg + "\n")
            self.message_count += 1

            current_time = time.time()
            time_elapsed = current_time - self.last_upload_time

            if (
                self.message_count >= self.max_messages
                or time_elapsed >= self.interval
            ):
                self.flush()
        except Exception:
            self.handleError(record)

    def flush(self) -> None:
        """Flushes the buffer to the artifact store.

        The file at ``logs_uri`` is opened in ``wb`` mode and the whole
        buffer is written to it. The message counter and the last-upload
        timestamp are reset on every call, including calls that skipped the
        upload because another one was in progress.
        """
        if not self.disabled:
            try:
                self.disabled = True
                with fileio.open(self.logs_uri, mode="wb") as log_file:
                    log_file.write(self.buffer.getvalue().encode("utf-8"))
            except (OSError, IOError) as e:
                # This exception can be raised if there are issues with the
                # underlying system calls, such as reaching the maximum number
                # of open files, permission issues, file corruption, or other
                # I/O errors.
                logger.error(f"Error while trying to write logs: {e}")
            finally:
                self.disabled = False

        self.message_count = 0
        self.last_upload_time = time.time()

    def doRollover(self) -> None:
        """Flushes the buffer and performs a rollover."""
        self.flush()
        super().doRollover()

    def close(self) -> None:
        """Tidy up any resources used by the handler.

        Flushes the buffer, closes the underlying file handler and removes
        the local temporary file if one was created for a remote
        ``logs_uri``.
        """
        self.flush()
        super().close()

        if not self.disabled:
            try:
                self.disabled = True
                if self.local_temp_file and fileio.exists(
                    self.local_temp_file
                ):
                    fileio.remove(self.local_temp_file)
            except (OSError, IOError) as e:
                # This exception can be raised if there are issues with the
                # underlying system calls, such as reaching the maximum number
                # of open files, permission issues, file corruption, or other
                # I/O errors.
                logger.error(f"Error while close the logger: {e}")
            finally:
                self.disabled = False
__init__(self, logs_uri) special

Initializes the handler.

Parameters:

Name Type Description Default
logs_uri str

URI of the logs file.

required
Source code in zenml/logging/step_logging.py
def __init__(self, logs_uri: str):
    """Initializes the handler.

    Args:
        logs_uri: URI of the logs file.
    """
    # Function-scope import so the unique-name fix below does not rely on a
    # module-level import.
    from uuid import uuid4

    self.logs_uri = logs_uri
    self.max_messages = LOGS_HANDLER_MAX_MESSAGES
    self.buffer = io.StringIO()
    self.message_count = 0
    self.last_upload_time = time.time()
    self.local_temp_file: Optional[str] = None
    self.disabled = False

    # set local_logging_file to self.logs_uri if self.logs_uri is a
    # local path otherwise, set local_logging_file to a temporary file
    if is_remote(self.logs_uri):
        # We log to a temporary file first, because
        # TimedRotatingFileHandler does not support writing
        # to a remote file, but still needs a file to get going.
        # A timestamp with one-second resolution is not unique: two
        # handlers created within the same second in the same working
        # directory would share (and later delete) the same file, so a
        # random suffix is appended.
        local_logging_file = (
            f".zenml_tmp_logs_{int(time.time())}_{uuid4().hex}"
        )
        self.local_temp_file = local_logging_file
    else:
        local_logging_file = self.logs_uri

    super().__init__(
        local_logging_file,
        when="s",
        interval=LOGS_HANDLER_INTERVAL_SECONDS,
    )
close(self)

Tidy up any resources used by the handler.

Source code in zenml/logging/step_logging.py
def close(self) -> None:
    """Flushes pending logs, closes the handler and removes the temp file.

    The temporary file is only removed when one was recorded in
    ``local_temp_file`` and it still exists.
    """
    self.flush()
    super().close()

    if self.disabled:
        # Another flush/cleanup is in progress; leave the file alone.
        return

    self.disabled = True
    try:
        temp_path = self.local_temp_file
        if temp_path and fileio.exists(temp_path):
            fileio.remove(temp_path)
    except OSError as err:
        # Raised for problems in the underlying system calls: too many
        # open files, missing permissions, file corruption or any other
        # I/O error. IOError is an alias of OSError.
        logger.error(f"Error while close the logger: {err}")
    finally:
        self.disabled = False
doRollover(self)

Flushes the buffer and performs a rollover.

Source code in zenml/logging/step_logging.py
def doRollover(self) -> None:
    """Uploads the buffered logs, then lets the base class rotate the file."""
    # The flush happens first, so the buffer is written out before the
    # base class performs its rollover.
    self.flush()
    super().doRollover()
emit(self, record)

Emits the log record.

Parameters:

Name Type Description Default
record LogRecord

Log record to emit.

required
Source code in zenml/logging/step_logging.py
def emit(self, record: LogRecord) -> None:
    """Emits the log record.

    The formatted record is appended to the in-memory buffer. The buffer is
    flushed once ``max_messages`` records were emitted since the last flush,
    or once at least ``self.interval`` seconds have passed since the last
    upload.

    Any exception raised while formatting or flushing is passed to
    ``handleError`` instead of propagating into the code that issued the
    logging call.

    Args:
        record: Log record to emit.
    """
    try:
        msg = self.format(record)
        self.buffer.write(msg + "\n")
        self.message_count += 1

        current_time = time.time()
        time_elapsed = current_time - self.last_upload_time

        if (
            self.message_count >= self.max_messages
            or time_elapsed >= self.interval
        ):
            self.flush()
    except Exception:
        self.handleError(record)
flush(self)

Flushes the buffer to the artifact store.

Source code in zenml/logging/step_logging.py
def flush(self) -> None:
    """Writes the whole buffer to ``logs_uri`` and resets the counters.

    The upload is skipped while ``disabled`` is set. The message counter
    and the last-upload timestamp are reset in every case.
    """
    upload_allowed = not self.disabled

    if upload_allowed:
        # Mark the upload as running so that a nested flush() skips it.
        self.disabled = True
        try:
            with fileio.open(self.logs_uri, mode="wb") as target:
                contents = self.buffer.getvalue()
                target.write(contents.encode("utf-8"))
        except OSError as err:
            # Raised for problems in the underlying system calls: too many
            # open files, missing permissions, file corruption or any other
            # I/O error. IOError is an alias of OSError.
            logger.error(f"Error while trying to write logs: {err}")
        finally:
            self.disabled = False

    self.message_count = 0
    self.last_upload_time = time.time()

StepStdErr (StringIO)

A replacement for the sys.stderr to turn outputs into logging entries.

When used in combination with the StepLoggingHandler, this class allows us to capture any stderr outputs as logs and store them in the artifact store.

Right now, this is only used during the execution of a ZenML step.

Source code in zenml/logging/step_logging.py
class StepStdErr(StringIO):
    """Stand-in for ``sys.stderr`` that forwards writes to a logger.

    Everything written to this object is emitted as an info record on the
    logger named ``STEP_STDERR_LOGGER_NAME``; nothing is stored in the
    underlying ``StringIO`` buffer. Paired with a handler that persists
    those records, this lets stderr output end up in the artifact store.

    Right now, this is only used during the execution of a ZenML step.
    """

    # Shared by all instances; resolved once when the class is created.
    stderr_logger = logging.getLogger(STEP_STDERR_LOGGER_NAME)

    def write(self, message: str) -> int:
        """Forwards the message to the stderr logger as an info record.

        A message consisting of exactly one newline is not logged.

        Args:
            message: the incoming message string

        Returns:
            the length of the message string
        """
        is_bare_newline = message == "\n"
        if not is_bare_newline:
            self.stderr_logger.info(message)
        return len(message)
write(self, message)

Write the incoming string as an info log entry.

Parameters:

Name Type Description Default
message str

the incoming message string

required

Returns:

Type Description
int

the length of the message string

Source code in zenml/logging/step_logging.py
def write(self, message: str) -> int:
    """Forwards the message to the stderr logger as an info record.

    A message consisting of exactly one newline is not logged.

    Args:
        message: the incoming message string

    Returns:
        the length of the message string
    """
    is_bare_newline = message == "\n"
    if not is_bare_newline:
        self.stderr_logger.info(message)
    return len(message)

StepStdOut (StringIO)

A replacement for the sys.stdout to turn outputs into logging entries.

When used in combination with the StepLoggingHandler, this class allows us to capture any print statements or third-party outputs as logs and store them in the artifact store.

Right now, this is only used during the execution of a ZenML step.

Source code in zenml/logging/step_logging.py
class StepStdOut(StringIO):
    """Stand-in for ``sys.stdout`` that forwards writes to a logger.

    Everything written to this object, such as print statements or third
    party output, is emitted as an info record on the logger named
    ``STEP_STDOUT_LOGGER_NAME``; nothing is stored in the underlying
    ``StringIO`` buffer. Paired with a handler that persists those records,
    this lets stdout output end up in the artifact store.

    Right now, this is only used during the execution of a ZenML step.
    """

    # Shared by all instances; resolved once when the class is created.
    stdout_logger = logging.getLogger(STEP_STDOUT_LOGGER_NAME)

    def write(self, message: str) -> int:
        """Forwards the message to the stdout logger as an info record.

        A message consisting of exactly one newline is not logged.

        Args:
            message: the incoming message string

        Returns:
            the length of the message string
        """
        is_bare_newline = message == "\n"
        if not is_bare_newline:
            self.stdout_logger.info(message)
        return len(message)
write(self, message)

Write the incoming string as an info log entry.

Parameters:

Name Type Description Default
message str

the incoming message string

required

Returns:

Type Description
int

the length of the message string

Source code in zenml/logging/step_logging.py
def write(self, message: str) -> int:
    """Forwards the message to the stdout logger as an info record.

    A message consisting of exactly one newline is not logged.

    Args:
        message: the incoming message string

    Returns:
        the length of the message string
    """
    is_bare_newline = message == "\n"
    if not is_bare_newline:
        self.stdout_logger.info(message)
    return len(message)

get_step_logging_handler(logs_uri)

Sets up a logging handler for the artifact store.

Parameters:

Name Type Description Default
logs_uri str

The URI of the output artifact.

required

Returns:

Type Description
StepLoggingHandler

The logging handler.

Source code in zenml/logging/step_logging.py
def get_step_logging_handler(logs_uri: str) -> StepLoggingHandler:
    """Builds a step logging handler with the step log format attached.

    Args:
        logs_uri: The URI of the output artifact.

    Returns:
        The logging handler.
    """
    step_handler = StepLoggingHandler(logs_uri)
    formatter = StepLoggingFormatter(
        "%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",  # ISO 8601 format
    )
    step_handler.setFormatter(formatter)
    return step_handler

prepare_logs_uri(artifact_store, step_name, log_key=None)

Generates and prepares a URI for the log file for a step.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store on which the artifact will be stored.

required
step_name str

Name of the step.

required
log_key Optional[str]

The unique identification key of the log file.

None

Returns:

Type Description
str

The URI of the logs file.

Source code in zenml/logging/step_logging.py
def prepare_logs_uri(
    artifact_store: "BaseArtifactStore",
    step_name: str,
    log_key: Optional[str] = None,
) -> str:
    """Builds the URI of a step's log file and makes it ready for writing.

    The file lives at ``<artifact_store.path>/<step_name>/logs/<key>.log``.
    The ``logs`` directory is created when missing, and a file already
    present at the target URI is removed.

    Args:
        artifact_store: The artifact store on which the artifact will be stored.
        step_name: Name of the step.
        log_key: The unique identification key of the log file. A random
            UUID is used when omitted.

    Returns:
        The URI of the logs file.
    """
    key = str(uuid4()) if log_key is None else log_key

    logs_dir = os.path.join(artifact_store.path, step_name, "logs")
    if not fileio.exists(logs_dir):
        fileio.makedirs(logs_dir)

    logs_uri = os.path.join(logs_dir, f"{key}.log")
    if not fileio.exists(logs_uri):
        return logs_uri

    # A log file from an earlier run is in the way: drop it.
    logger.warning(
        f"Logs file {logs_uri} already exists! Removing old log file..."
    )
    fileio.remove(logs_uri)
    return logs_uri