Log Stores

zenml.log_stores

Implements the log stores for ZenML.

Attributes

__all__ = ['ArtifactLogStore', 'BaseLogStore', 'BaseLogStoreConfig', 'BaseLogStoreFlavor', 'DatadogLogStore', 'DatadogLogStoreConfig', 'DatadogLogStoreFlavor', 'OtelLogStore', 'OtelLogStoreConfig', 'OtelLogStoreFlavor'] module-attribute

Classes

ArtifactLogStore(artifact_store: BaseArtifactStore, *args: Any, **kwargs: Any)

Bases: OtelLogStore

Log store that saves logs to the artifact store.

This implementation extends OtelLogStore and uses the ArtifactLogExporter to write logs to the artifact store. Inherits all OTEL infrastructure including shared BatchLogRecordProcessor and routing.

Initialize the artifact log store.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store to use for logging.

required
*args Any

Positional arguments for the base class.

()
**kwargs Any

Keyword arguments for the base class.

{}
Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def __init__(
    self, artifact_store: "BaseArtifactStore", *args: Any, **kwargs: Any
) -> None:
    """Initialize the artifact log store.

    Args:
        artifact_store: The artifact store to use for logging.
        *args: Positional arguments for the base class.
        **kwargs: Keyword arguments for the base class.
    """
    super().__init__(*args, **kwargs)
    self._artifact_store = artifact_store
Attributes
config: ArtifactLogStoreConfig property

Returns the configuration of the artifact log store.

Returns:

Type Description
ArtifactLogStoreConfig

The configuration.

origin_class: Type[ArtifactLogStoreOrigin] property

Class of the origin.

Returns:

Type Description
Type[ArtifactLogStoreOrigin]

The class of the origin.

Functions
cleanup() -> None

Cleanup the artifact log store.

This method is called to ensure that the artifact log store is cleaned up.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def cleanup(self) -> None:
    """Cleanup the artifact log store.

    This method is called to ensure that the artifact log store is cleaned up.
    """
    self._artifact_store.cleanup()
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetch logs from the artifact store.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing uri and artifact_store_id.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries from the artifact store.

Raises:

Type Description
ValueError

If logs_model.uri is not provided.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def fetch(
    self,
    logs_model: "LogsResponse",
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List["LogEntry"]:
    """Fetch logs from the artifact store.

    Args:
        logs_model: The logs model containing uri and artifact_store_id.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries from the artifact store.

    Raises:
        ValueError: If logs_model.uri is not provided.
    """
    if not logs_model.uri:
        raise ValueError(
            "logs_model.uri is required for ArtifactLogStore.fetch()"
        )

    if not logs_model.artifact_store_id:
        raise ValueError(
            "logs_model.artifact_store_id is required "
            "for ArtifactLogStore.fetch()"
        )

    if logs_model.artifact_store_id != self._artifact_store.id:
        raise ValueError(
            "logs_model.artifact_store_id does not match the artifact store "
            "id of the log store."
        )

    if start_time or end_time:
        logger.warning(
            "start_time and end_time are not supported for "
            "ArtifactLogStore.fetch(). Both parameters will be ignored."
        )

    log_entries = fetch_log_records(
        artifact_store=self._artifact_store,
        logs_uri=logs_model.uri,
        limit=limit,
    )

    return log_entries
from_artifact_store(artifact_store: BaseArtifactStore) -> ArtifactLogStore classmethod

Creates an artifact log store from an artifact store.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store to create the log store from.

required

Returns:

Type Description
ArtifactLogStore

The created artifact log store.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
@classmethod
def from_artifact_store(
    cls, artifact_store: "BaseArtifactStore"
) -> "ArtifactLogStore":
    """Creates an artifact log store from an artifact store.

    Args:
        artifact_store: The artifact store to create the log store from.

    Returns:
        The created artifact log store.
    """
    return cls(
        artifact_store=artifact_store,
        id=artifact_store.id,
        name="default",
        config=ArtifactLogStoreConfig(endpoint=artifact_store.path),
        flavor="artifact",
        type=StackComponentType.LOG_STORE,
        user=artifact_store.user,
        created=artifact_store.created,
        updated=artifact_store.updated,
    )
get_exporter() -> LogExporter

Get the artifact log exporter for this log store.

Returns:

Type Description
LogExporter

The ArtifactLogExporter instance.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def get_exporter(self) -> "LogExporter":
    """Get the artifact log exporter for this log store.

    Returns:
        The ArtifactLogExporter instance.
    """
    from zenml.log_stores.artifact.artifact_log_exporter import (
        ArtifactLogExporter,
    )

    return ArtifactLogExporter(artifact_store=self._artifact_store)
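
For orientation, a minimal usage sketch. It assumes an active ZenML client and a `logs_model` (a LogsResponse with `uri` and `artifact_store_id` set) obtained elsewhere, e.g. from a step run; nothing here is prescriptive:

from zenml.client import Client
from zenml.log_stores import ArtifactLogStore

# Build a log store from the active stack's artifact store.
artifact_store = Client().active_stack.artifact_store
log_store = ArtifactLogStore.from_artifact_store(artifact_store)

# Fetch up to 100 parsed log entries for the given logs model.
entries = log_store.fetch(logs_model=logs_model, limit=100)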

BaseLogStore(*args: Any, **kwargs: Any)

Bases: StackComponent, ABC

Base class for all ZenML log stores.

A log store is responsible for collecting, storing, and retrieving logs during pipeline and step execution. Different implementations may store logs in different backends (artifact store, OpenTelemetry, Datadog, etc.).

Initialize the log store.

Parameters:

Name Type Description Default
*args Any

Positional arguments for the base class.

()
**kwargs Any

Keyword arguments for the base class.

{}
Source code in src/zenml/log_stores/base_log_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the log store.

    Args:
        *args: Positional arguments for the base class.
        **kwargs: Keyword arguments for the base class.
    """
    super().__init__(*args, **kwargs)
    self._origins: Dict[str, BaseLogStoreOrigin] = {}
    self._lock = threading.RLock()
Attributes
config: BaseLogStoreConfig property

Returns the configuration of the log store.

Returns:

Type Description
BaseLogStoreConfig

The configuration.

origin_class: Type[BaseLogStoreOrigin] property

Class of the origin.

Returns:

Type Description
Type[BaseLogStoreOrigin]

The class of the origin used with this log store.

Functions
deregister_origin(origin: BaseLogStoreOrigin, blocking: bool = True) -> None

Deregister an origin previously registered with the log store.

If no other origins are left, the log store will be flushed. The blocking parameter determines whether to block until the flush is complete.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin to deregister.

required
blocking bool

Whether to block until the deregistration is complete and all logs are flushed if this is the last origin registered.

True
Source code in src/zenml/log_stores/base_log_store.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def deregister_origin(
    self,
    origin: BaseLogStoreOrigin,
    blocking: bool = True,
) -> None:
    """Deregister an origin previously registered with the log store.

    If no other origins are left, the log store will be flushed. The
    `blocking` parameter determines whether to block until the flush is
    complete.

    Args:
        origin: The origin to deregister.
        blocking: Whether to block until the deregistration is complete
            and all logs are flushed if this is the last origin registered.
    """
    with self._lock:
        if origin.name not in self._origins:
            return
        self._release_origin(origin)
        del self._origins[origin.name]
        if len(self._origins) == 0:
            self.flush(blocking=blocking)
emit(origin: BaseLogStoreOrigin, record: logging.LogRecord, metadata: Optional[Dict[str, Any]] = None) -> None abstractmethod

Process a log record from the logging system.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin used to send the log record.

required
record LogRecord

The Python logging.LogRecord to process.

required
metadata Optional[Dict[str, Any]]

Additional metadata to attach to the log entry.

None
Source code in src/zenml/log_stores/base_log_store.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@abstractmethod
def emit(
    self,
    origin: BaseLogStoreOrigin,
    record: logging.LogRecord,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Process a log record from the logging system.

    Args:
        origin: The origin used to send the log record.
        record: The Python logging.LogRecord to process.
        metadata: Additional metadata to attach to the log entry.
    """
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry] abstractmethod

Fetch logs from the log store.

This method is called from the server to retrieve logs for display on the dashboard or via API. The implementation should not require any integration-specific SDKs that aren't available on the server.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing metadata about the logs.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries matching the query.

Source code in src/zenml/log_stores/base_log_store.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@abstractmethod
def fetch(
    self,
    logs_model: LogsResponse,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List[LogEntry]:
    """Fetch logs from the log store.

    This method is called from the server to retrieve logs for display
    on the dashboard or via API. The implementation should not require
    any integration-specific SDKs that aren't available on the server.

    Args:
        logs_model: The logs model containing metadata about the logs.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries matching the query.
    """
flush(blocking: bool = True) -> None abstractmethod

Flush the log store.

This method is called to ensure that all logs are flushed to the backend.

Parameters:

Name Type Description Default
blocking bool

Whether to block until the flush is complete.

True
Source code in src/zenml/log_stores/base_log_store.py
183
184
185
186
187
188
189
190
191
@abstractmethod
def flush(self, blocking: bool = True) -> None:
    """Flush the log store.

    This method is called to ensure that all logs are flushed to the backend.

    Args:
        blocking: Whether to block until the flush is complete.
    """
register_origin(name: str, log_model: LogsResponse, metadata: Dict[str, Any]) -> BaseLogStoreOrigin

Register an origin for the log store.

Parameters:

Name Type Description Default
name str

The name of the origin.

required
log_model LogsResponse

The log model associated with the origin.

required
metadata Dict[str, Any]

Additional metadata to attach to the log entry.

required

Returns:

Type Description
BaseLogStoreOrigin

The origin.

Source code in src/zenml/log_stores/base_log_store.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def register_origin(
    self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
) -> BaseLogStoreOrigin:
    """Register an origin for the log store.

    Args:
        name: The name of the origin.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to the log entry.

    Returns:
        The origin.
    """
    with self._lock:
        origin = self.origin_class(name, self, log_model, metadata)
        self._origins[name] = origin
        return origin
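
To make the contract concrete, here is a minimal sketch of a custom subclass. It is illustrative only: the in-memory backend is hypothetical and the type annotations are simplified relative to the abstract signatures above.

import logging
from typing import Any, Dict, List, Optional

from zenml.log_stores import BaseLogStore

class InMemoryLogStore(BaseLogStore):
    """Illustrative log store that buffers log entries in memory."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._buffer: List[Dict[str, Any]] = []

    def emit(self, origin, record: logging.LogRecord, metadata=None) -> None:
        # Combine the origin's identity with any per-record metadata.
        entry = {"origin": origin.name, "message": record.getMessage()}
        entry.update(metadata or {})
        self._buffer.append(entry)

    def flush(self, blocking: bool = True) -> None:
        # Nothing is buffered outside this process, so flushing is a no-op.
        pass

    def fetch(self, logs_model, start_time=None, end_time=None, limit=100):
        # A real backend would query its storage using the logs model.
        return []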

BaseLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Base configuration for all log stores.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)

BaseLogStoreFlavor

Bases: Flavor

Base class for all ZenML log store flavors.

Attributes
config_class: Type[BaseLogStoreConfig] property

Config class for the base log store flavor.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

implementation_class: Type[BaseLogStore] abstractmethod property

Implementation class for the base log store flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

type: StackComponentType property

Type of the flavor.

Returns:

Type Description
StackComponentType

The type of the flavor.
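
Continuing the sketch above, a hypothetical flavor that wires a custom log store into the flavor mechanism (names and module illustrative):

from typing import Type

class InMemoryLogStoreFlavor(BaseLogStoreFlavor):
    """Illustrative flavor for the InMemoryLogStore sketched earlier."""

    @property
    def name(self) -> str:
        return "in_memory"  # hypothetical flavor name

    @property
    def implementation_class(self) -> Type[BaseLogStore]:
        return InMemoryLogStore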

DatadogLogStore(*args: Any, **kwargs: Any)

Bases: OtelLogStore

Log store that exports logs to Datadog.

This implementation extends OtelLogStore and configures it to send logs to Datadog's HTTP intake API.

Source code in src/zenml/log_stores/otel/otel_log_store.py (the inherited OtelLogStore.__init__; see OtelLogStore below).
Attributes
config: DatadogLogStoreConfig property

Returns the configuration of the Datadog log store.

Returns:

Type Description
DatadogLogStoreConfig

The configuration.

Functions
cleanup() -> None

Cleanup the Datadog log store.

This method is called when the log store is no longer needed.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def cleanup(self) -> None:
    """Cleanup the Datadog log store.

    This method is called when the log store is no longer needed.
    """
    if self._datadog_exporter:
        self._datadog_exporter.shutdown()
        self._datadog_exporter = None
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetch logs from Datadog's API.

This method queries Datadog's Logs API to retrieve logs for the specified pipeline run and step. It automatically paginates through results to fetch up to the requested limit.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing run and step metadata.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries from Datadog.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def fetch(
    self,
    logs_model: "LogsResponse",
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List["LogEntry"]:
    """Fetch logs from Datadog's API.

    This method queries Datadog's Logs API to retrieve logs for the
    specified pipeline run and step. It automatically paginates through
    results to fetch up to the requested limit.

    Args:
        logs_model: The logs model containing run and step metadata.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries from Datadog.
    """
    query_parts = [
        f"service:{self.config.service_name}",
        f"@zenml.log.id:{logs_model.id}",
    ]

    query = " ".join(query_parts)

    api_endpoint = (
        f"https://api.{self.config.site}/api/v2/logs/events/search"
    )
    headers = {
        "DD-API-KEY": self.config.api_key.get_secret_value(),
        "DD-APPLICATION-KEY": self.config.application_key.get_secret_value(),
        "Content-Type": "application/json",
    }

    log_entries: List[LogEntry] = []
    cursor: Optional[str] = None
    remaining = limit

    try:
        while remaining > 0:
            # Datadog API limit is 1000 per request
            page_limit = min(remaining, 1000)

            body: Dict[str, Any] = {
                "filter": {
                    "query": query,
                    "from": (
                        start_time.isoformat()
                        if start_time
                        else logs_model.created.isoformat()
                    ),
                    "to": (
                        end_time.isoformat()
                        if end_time
                        else datetime.now().astimezone().isoformat()
                    ),
                },
                "page": {
                    "limit": page_limit,
                },
                "sort": "timestamp",
            }

            if cursor:
                body["page"]["cursor"] = cursor

            response = requests.post(
                api_endpoint,
                headers=headers,
                json=body,
                timeout=30,
            )

            if response.status_code != 200:
                logger.error(
                    f"Failed to fetch logs from Datadog: "
                    f"{response.status_code} - {response.text[:200]}"
                )
                break

            data = response.json()
            logs = data.get("data", [])

            if not logs:
                break

            for log in logs:
                entry = self._parse_log_entry(log)
                if entry:
                    log_entries.append(entry)

            remaining -= len(logs)

            # Get cursor for next page
            cursor = data.get("meta", {}).get("page", {}).get("after")
            if not cursor:
                break

        logger.debug(f"Fetched {len(log_entries)} logs from Datadog")
        return log_entries

    except Exception as e:
        logger.exception(f"Error fetching logs from Datadog: {e}")
        return log_entries  # Return what we have so far
get_exporter() -> DatadogLogExporter

Get the Datadog log exporter.

Returns:

Type Description
DatadogLogExporter

DatadogExporter with the proper configuration.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def get_exporter(self) -> DatadogLogExporter:
    """Get the Datadog log exporter.

    Returns:
        DatadogExporter with the proper configuration.
    """
    if not self._datadog_exporter:
        headers = {
            "dd-api-key": self.config.api_key.get_secret_value(),
            "dd-application-key": self.config.application_key.get_secret_value(),
        }
        if self.config.headers:
            headers.update(self.config.headers)

        self._datadog_exporter = DatadogLogExporter(
            endpoint=self.config.endpoint,
            headers=headers,
            certificate_file=self.config.certificate_file,
            client_key_file=self.config.client_key_file,
            client_certificate_file=self.config.client_certificate_file,
            compression=self.config.compression,
        )
    return self._datadog_exporter

DatadogLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: OtelLogStoreConfig

Configuration for Datadog log store.

Attributes:

Name Type Description
api_key PlainSerializedSecretStr

Datadog API key for log ingestion.

application_key PlainSerializedSecretStr

Datadog application key for log extraction.

site str

Datadog site (e.g., "datadoghq.com", "datadoghq.eu").

max_export_batch_size int

Maximum batch size for exports (Datadog limit: 1000).

Source code in src/zenml/stack/stack_component.py (the same inherited __init__ shown under BaseLogStoreConfig above).
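
A minimal configuration sketch (values illustrative; secret references use the `{{secret_name.key}}` form described above):

config = DatadogLogStoreConfig(
    api_key="{{datadog.api_key}}",
    application_key="{{datadog.application_key}}",
    site="datadoghq.eu",        # the endpoint is derived from the site
    max_export_batch_size=500,  # must not exceed Datadog's limit of 1000
)
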
Functions
set_default_endpoint(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Set the endpoint based on site if not provided.

Parameters:

Name Type Description Default
data Dict[str, Any]

The input data dictionary.

required

Returns:

Type Description
Dict[str, Any]

The data dictionary with the endpoint set if not provided.

Source code in src/zenml/log_stores/datadog/datadog_flavor.py
@model_validator(mode="before")
@classmethod
def set_default_endpoint(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """Set the endpoint based on site if not provided.

    Args:
        data: The input data dictionary.

    Returns:
        The data dictionary with the endpoint set if not provided.
    """
    if isinstance(data, dict) and not data.get("endpoint"):
        site = data.get("site", "datadoghq.com")
        data["endpoint"] = f"https://http-intake.logs.{site}/api/v2/logs"
    return data
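
For example (illustrative; pydantic invokes this validator automatically during model construction):

data = DatadogLogStoreConfig.set_default_endpoint({"site": "datadoghq.eu"})
# data["endpoint"] == "https://http-intake.logs.datadoghq.eu/api/v2/logs"
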
validate_max_export_batch_size(v: int) -> int classmethod

Validate that max_export_batch_size doesn't exceed Datadog's limit.

Parameters:

Name Type Description Default
v int

The value to validate.

required

Returns:

Type Description
int

The validated value.

Raises:

Type Description
ValueError

If the value exceeds Datadog's limit.

Source code in src/zenml/log_stores/datadog/datadog_flavor.py
@field_validator("max_export_batch_size")
@classmethod
def validate_max_export_batch_size(cls, v: int) -> int:
    """Validate that max_export_batch_size doesn't exceed Datadog's limit.

    Args:
        v: The value to validate.

    Returns:
        The validated value.

    Raises:
        ValueError: If the value exceeds Datadog's limit.
    """
    if v > DATADOG_MAX_BATCH_SIZE:
        raise ValueError(
            f"max_export_batch_size cannot exceed {DATADOG_MAX_BATCH_SIZE} "
            f"(Datadog API limit). Got: {v}"
        )
    return v

DatadogLogStoreFlavor

Bases: Flavor

Datadog log store flavor.

Attributes
config_class: Type[BaseLogStoreConfig] property

Returns the DatadogLogStoreConfig config class.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

docs_url: str property

URL to the flavor documentation.

Returns:

Type Description
str

The URL to the flavor documentation.

implementation_class: Type[BaseLogStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

logo_url: str property

URL to the flavor logo.

Returns:

Type Description
str

The URL to the flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: str property

URL to the SDK docs for this flavor.

Returns:

Type Description
str

The URL to the SDK docs for this flavor.

type: StackComponentType property

Stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

OtelLogStore(*args: Any, **kwargs: Any)

Bases: BaseLogStore

Log store that exports logs using OpenTelemetry.

Subclasses should implement get_exporter() to provide the specific log exporter for their backend.

Initialize the OpenTelemetry log store.

Parameters:

Name Type Description Default
*args Any

Positional arguments for the base class.

()
**kwargs Any

Keyword arguments for the base class.

{}
Source code in src/zenml/log_stores/otel/otel_log_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the OpenTelemetry log store.

    Args:
        *args: Positional arguments for the base class.
        **kwargs: Keyword arguments for the base class.
    """
    super().__init__(*args, **kwargs)

    self._resource: Optional["Resource"] = None
    self._exporter: Optional["LogExporter"] = None
    self._provider: Optional["LoggerProvider"] = None
    self._processor: Optional["OtelBatchLogRecordProcessor"] = None
    self._handler: Optional["LoggingHandler"] = None
Attributes
config: OtelLogStoreConfig property

Returns the configuration of the OTel log store.

Returns:

Type Description
OtelLogStoreConfig

The configuration.

origin_class: Type[OtelLogStoreOrigin] property

Class of the origin.

Returns:

Type Description
Type[OtelLogStoreOrigin]

The class of the origin.

provider: LoggerProvider property

Returns the OpenTelemetry logger provider.

Returns:

Type Description
LoggerProvider

The logger provider.

Raises:

Type Description
RuntimeError

If the OpenTelemetry log store is not initialized.

Functions
deactivate() -> None

Deactivate log collection and shut down the processor.

Flushes any pending logs and shuts down the processor's background thread.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def deactivate(self) -> None:
    """Deactivate log collection and shut down the processor.

    Flushes any pending logs and shuts down the processor's background thread.
    """
    with self._lock:
        if self._processor:
            try:
                # Force flush any pending logs
                self._processor.flush(blocking=True)
                logger.debug("Flushed pending logs")
            except Exception as e:
                logger.warning(f"Error flushing logs: {e}")

            try:
                self._processor.shutdown()  # type: ignore[no-untyped-call]
                logger.debug(
                    "Shut down log processor and background thread"
                )
            except Exception as e:
                logger.warning(f"Error shutting down processor: {e}")
            else:
                self._processor = None
                self._handler = None
                self._provider = None
                self._resource = None
                self._exporter = None

    logger.debug("OtelLogStore deactivated")
emit(origin: BaseLogStoreOrigin, record: logging.LogRecord, metadata: Optional[Dict[str, Any]] = None) -> None

Process a log record by sending to OpenTelemetry.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin used to send the log record.

required
record LogRecord

The log record to process.

required
metadata Optional[Dict[str, Any]]

Additional metadata to attach to the log entry.

None

Raises:

Type Description
RuntimeError

If the OpenTelemetry provider is not initialized.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def emit(
    self,
    origin: BaseLogStoreOrigin,
    record: logging.LogRecord,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Process a log record by sending to OpenTelemetry.

    Args:
        origin: The origin used to send the log record.
        record: The log record to process.
        metadata: Additional metadata to attach to the log entry.

    Raises:
        RuntimeError: If the OpenTelemetry provider is not initialized.
    """
    assert isinstance(origin, OtelLogStoreOrigin)
    with self._lock:
        if not self._provider:
            self._activate()

        if self._handler is None:
            raise RuntimeError("OpenTelemetry provider is not initialized")

        emit_kwargs = self._handler._translate(record)
        emit_kwargs["attributes"].update(origin.metadata)
        if metadata:
            emit_kwargs["attributes"].update(metadata)

        origin.logger.emit(**emit_kwargs)
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetch logs from the OpenTelemetry backend.

This method should be overridden by subclasses to implement backend-specific log retrieval. The base implementation raises a NotImplementedError.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing run and step metadata.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Raises:

Type Description
NotImplementedError

Log fetching is not supported by the OTEL log store.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def fetch(
    self,
    logs_model: "LogsResponse",
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List["LogEntry"]:
    """Fetch logs from the OpenTelemetry backend.

    This method should be overridden by subclasses to implement
    backend-specific log retrieval. The base implementation raises
    a NotImplementedError.

    Args:
        logs_model: The logs model containing run and step metadata.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Raises:
        NotImplementedError: Log fetching is not supported by the OTEL log
            store.
    """
    raise NotImplementedError(
        "Log fetching is not supported by the OTEL log store."
    )
flush(blocking: bool = True) -> None

Flush the log store.

Parameters:

Name Type Description Default
blocking bool

Whether to block until the flush is complete.

True

This method is called to ensure that all logs are flushed to the backend.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def flush(self, blocking: bool = True) -> None:
    """Flush the log store.

    Args:
        blocking: Whether to block until the flush is complete.

    This method is called to ensure that all logs are flushed to the backend.
    """
    with self._lock:
        if self._processor:
            self._processor.flush(blocking=blocking)
get_exporter() -> LogExporter

Get the OTLP log exporter.

Returns:

Type Description
LogExporter

An OTLPLogExporter configured from the log store's endpoint, headers, TLS files, and compression settings.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def get_exporter(self) -> "LogExporter":
    """Get the Datadog log exporter.

    Returns:
        An OTLPLogExporter configured from the log store config.
    """
    if not self._exporter:
        self._exporter = OTLPLogExporter(
            endpoint=self.config.endpoint,
            headers=self.config.headers,
            certificate_file=self.config.certificate_file,
            client_key_file=self.config.client_key_file,
            client_certificate_file=self.config.client_certificate_file,
            compression=self.config.compression,
        )

    return self._exporter
register_origin(name: str, log_model: LogsResponse, metadata: Dict[str, Any]) -> BaseLogStoreOrigin

Register an origin for the log store.

Parameters:

Name Type Description Default
name str

The name of the origin.

required
log_model LogsResponse

The log model associated with the origin.

required
metadata Dict[str, Any]

Additional metadata to attach to the log entry.

required

Returns:

Type Description
BaseLogStoreOrigin

The origin.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def register_origin(
    self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
) -> BaseLogStoreOrigin:
    """Register an origin for the log store.

    Args:
        name: The name of the origin.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to the log entry.

    Returns:
        The origin.
    """
    with self._lock:
        if not self._provider:
            self._activate()

    return super().register_origin(name, log_model, metadata)
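
Putting the origin lifecycle together, a hedged end-to-end sketch; `log_store` and `logs_model` are assumed to already exist:

import logging

# Registering an origin lazily activates the OTel provider.
origin = log_store.register_origin(
    name="step-logs",
    log_model=logs_model,
    metadata={"zenml.step": "train"},  # attached to every emitted record
)

record = logging.LogRecord(
    name="zenml", level=logging.INFO, pathname=__file__, lineno=0,
    msg="training started", args=None, exc_info=None,
)
log_store.emit(origin, record)

# Deregistering the last origin flushes any pending logs.
log_store.deregister_origin(origin, blocking=True)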

OtelLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseLogStoreConfig

Configuration for OpenTelemetry log store.

Attributes:

Name Type Description
service_name str

Name of the service (defaults to "zenml").

service_version str

Version of the service (defaults to the ZenML version).

max_queue_size int

Maximum queue size for batch processor.

schedule_delay_millis int

Delay between batch exports in milliseconds.

max_export_batch_size int

Maximum batch size for exports.

endpoint str

The endpoint to export logs to.

headers Optional[Dict[str, str]]

The headers to use for the export.

certificate_file Optional[str]

The certificate file to use for the export.

client_key_file Optional[str]

The client key file to use for the export.

client_certificate_file Optional[str]

The client certificate file to use for the export.

compression Compression

The compression to use for the export.

Source code in src/zenml/stack/stack_component.py (the same inherited __init__ shown under BaseLogStoreConfig above).
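
A minimal configuration sketch (endpoint and header values illustrative):

config = OtelLogStoreConfig(
    endpoint="https://otel-collector.example.com:4318/v1/logs",
    headers={"authorization": "Bearer {{otel.token}}"},
    schedule_delay_millis=1000,  # export batches roughly every second
)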

OtelLogStoreFlavor

Bases: Flavor

OpenTelemetry log store flavor.

Attributes
config_class: Type[BaseLogStoreConfig] property

Returns the OtelLogStoreConfig config class.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

docs_url: str property

URL to the flavor documentation.

Returns:

Type Description
str

The URL to the flavor documentation.

implementation_class: Type[BaseLogStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

logo_url: str property

URL to the flavor logo.

Returns:

Type Description
str

The URL to the flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: str property

URL to the SDK docs for this flavor.

Returns:

Type Description
str

The URL to the SDK docs for this flavor.

type: StackComponentType property

Stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Modules

artifact

Artifact log store implementation.

Modules
artifact_log_exporter

OpenTelemetry exporter that writes logs to ZenML artifact store.

Classes
ArtifactLogExporter(artifact_store: BaseArtifactStore)

Bases: LogExporter

OpenTelemetry exporter that writes logs to ZenML artifact store.

Initialize the exporter.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store to write logs to.

required
Source code in src/zenml/log_stores/artifact/artifact_log_exporter.py
def __init__(self, artifact_store: "BaseArtifactStore") -> None:
    """Initialize the exporter.

    Args:
        artifact_store: The artifact store to write logs to.
    """
    self.artifact_store = artifact_store
Functions
export(batch: Sequence[LogData]) -> LogExportResult

Export a batch of logs to the artifact store.

Parameters:

Name Type Description Default
batch Sequence[LogData]

Sequence of LogData to export (can be from multiple contexts).

required

Returns:

Type Description
LogExportResult

LogExportResult indicating success or failure.

Source code in src/zenml/log_stores/artifact/artifact_log_exporter.py
def export(self, batch: Sequence["LogData"]) -> LogExportResult:
    """Export a batch of logs to the artifact store.

    Args:
        batch: Sequence of LogData to export (can be from multiple contexts).

    Returns:
        LogExportResult indicating success or failure.
    """
    if not batch:
        return LogExportResult.SUCCESS

    try:
        logs_by_uri: Dict[str, List[str]] = defaultdict(list)
        finalized_log_streams: List[str] = []

        for log_data in batch:
            attrs = log_data.log_record.attributes
            if not attrs:
                continue

            log_uri = attrs.get("zenml.log.uri")
            if not log_uri or not isinstance(log_uri, str):
                continue

            if log_data.log_record.body is END_OF_STREAM_MESSAGE:
                finalized_log_streams.append(log_uri)
                continue

            entries = self._otel_record_to_log_entries(log_data)
            for entry in entries:
                json_line = entry.model_dump_json(exclude_none=True)
                logs_by_uri[log_uri].append(json_line)

        for log_uri, log_lines in logs_by_uri.items():
            if log_lines:
                self._write(log_lines, log_uri)

        for log_uri in finalized_log_streams:
            self._finalize(log_uri)

        return LogExportResult.SUCCESS

    except Exception:
        logger.exception("Failed to export logs to artifact store")
        return LogExportResult.FAILURE
shutdown() -> None

Shutdown the exporter.

Source code in src/zenml/log_stores/artifact/artifact_log_exporter.py
def shutdown(self) -> None:
    """Shutdown the exporter."""
    self.artifact_store.cleanup()
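
A hedged sketch of driving the exporter by hand; normally the shared BatchLogRecordProcessor calls it. `artifact_store` and `log_data_batch` (a Sequence[LogData] whose records carry a `zenml.log.uri` attribute) are assumed to exist:

exporter = ArtifactLogExporter(artifact_store=artifact_store)
result = exporter.export(log_data_batch)  # LogExportResult.SUCCESS on success
exporter.shutdown()  # also cleans up the artifact store
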
artifact_log_store

Artifact log store implementation.

Classes
ArtifactLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: OtelLogStoreConfig

Configuration for the artifact log store.

Source code in src/zenml/stack/stack_component.py (the same inherited __init__ shown under BaseLogStoreConfig above).
ArtifactLogStoreOrigin(name: str, log_store: BaseLogStore, log_model: LogsResponse, metadata: Dict[str, Any])

Bases: OtelLogStoreOrigin

Artifact log store origin.

Initialize a log store origin.

Parameters:

Name Type Description Default
name str

The name of the origin.

required
log_store BaseLogStore

The log store to emit logs to.

required
log_model LogsResponse

The log model associated with the origin.

required
metadata Dict[str, Any]

Additional metadata to attach to all log entries that will be emitted by this origin.

required
Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def __init__(
    self,
    name: str,
    log_store: "BaseLogStore",
    log_model: LogsResponse,
    metadata: Dict[str, Any],
) -> None:
    """Initialize a log store origin.

    Args:
        name: The name of the origin.
        log_store: The log store to emit logs to.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to all log entries that will
            be emitted by this origin.
    """
    super().__init__(name, log_store, log_model, metadata)

    if log_model.uri:
        self.metadata["zenml.log.uri"] = log_model.uri
Functions
fetch_log_records(artifact_store: BaseArtifactStore, logs_uri: str, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetches log entries.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store.

required
logs_uri str

The URI of the artifact (file or directory).

required
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def fetch_log_records(
    artifact_store: "BaseArtifactStore",
    logs_uri: str,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List[LogEntry]:
    """Fetches log entries.

    Args:
        artifact_store: The artifact store.
        logs_uri: The URI of the artifact (file or directory).
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries.
    """
    log_entries = []

    for line in _stream_logs_line_by_line(artifact_store, logs_uri):
        if log_entry := parse_log_entry(line):
            log_entries.append(log_entry)

        if len(log_entries) >= limit:
            break

    return log_entries
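
A usage sketch (illustrative; the URI is a hypothetical location written by this log store):

entries = fetch_log_records(
    artifact_store, logs_uri="s3://bucket/zenml/logs/run.log", limit=100
)
for entry in entries:
    print(entry.timestamp, entry.message)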
parse_log_entry(log_line: str) -> Optional[LogEntry]

Parse a single log entry into a LogEntry object.

Handles two formats:

  1. JSON format: {"timestamp": "...", "level": "...", "message": "...", "location": "..."}. Uses Pydantic's model_validate_json for automatic parsing and validation.
  2. Plain text: any other text (defaults to INFO level). A legacy "[YYYY-MM-DD HH:MM:SS UTC]" prefix, if present, is extracted into the timestamp field.

Parameters:

Name Type Description Default
log_line str

A single log line to parse

required

Returns:

Type Description
Optional[LogEntry]

LogEntry object. For JSON logs, all fields are validated and parsed automatically. For plain text logs, only the message is populated with an INFO-level default. Returns None only for empty lines.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def parse_log_entry(log_line: str) -> Optional[LogEntry]:
    """Parse a single log entry into a LogEntry object.

    Handles two formats:
    1. JSON format: {"timestamp": "...", "level": "...", "message": "...", "location": "..."}
       Uses Pydantic's model_validate_json for automatic parsing and validation.
    2. Plain text: Any other text (defaults to INFO level)

    Args:
        log_line: A single log line to parse

    Returns:
        LogEntry object. For JSON logs, all fields are validated and parsed automatically.
        For plain text logs, only message is populated with INFO level default.
        Returns None only for empty lines.
    """
    line = log_line.strip()
    if not line:
        return None

    if line.startswith("{") and line.endswith("}"):
        try:
            return LogEntry.model_validate_json(line)
        except Exception:
            pass

    old_format = re.search(
        r"^\[(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+UTC\]", line
    )

    timestamp = None
    if old_format:
        timestamp = old_format.group(1) + "Z"
        line = line.replace(old_format.group(0), "").strip()

    return LogEntry(
        message=line,
        name=None,
        level=LoggingLevels.INFO,
        timestamp=timestamp,
    )
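
Both branches can be sketched as follows (illustrative inputs; the JSON field values are assumptions about the LogEntry model):

# JSON line: parsed and validated via LogEntry.model_validate_json.
parse_log_entry('{"timestamp": "2024-01-01T00:00:00Z", "level": "INFO", "message": "hello"}')

# Legacy line with a "[... UTC]" prefix: the timestamp is extracted and the
# remainder becomes the message at the default INFO level.
parse_log_entry("[2024-01-01 12:00:00 UTC] starting step")
# -> LogEntry(message="starting step", level=LoggingLevels.INFO,
#             timestamp="2024-01-01 12:00:00Z", name=None)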
prepare_logs_uri(artifact_store: BaseArtifactStore, log_id: UUID) -> str

Generates and prepares a URI for the log file or folder for a step.

Parameters:

Name Type Description Default
artifact_store BaseArtifactStore

The artifact store on which the artifact will be stored.

required
log_id UUID

The ID of the logs entity

required

Returns:

Type Description
str

The URI of the log storage (file or folder).

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def prepare_logs_uri(
    artifact_store: "BaseArtifactStore",
    log_id: UUID,
) -> str:
    """Generates and prepares a URI for the log file or folder for a step.

    Args:
        artifact_store: The artifact store on which the artifact will be stored.
        log_id: The ID of the logs entity

    Returns:
        The URI of the log storage (file or folder).
    """
    logs_base_uri = os.path.join(artifact_store.path, "logs")

    if artifact_store.config.IS_IMMUTABLE_FILESYSTEM:
        logs_uri = os.path.join(logs_base_uri, str(log_id))
    else:
        logs_uri = os.path.join(logs_base_uri, f"{log_id}{LOGS_EXTENSION}")

    return sanitize_remote_path(logs_uri)
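
Concretely (an illustrative sketch, assuming LOGS_EXTENSION resolves to ".log"):

from uuid import uuid4

# Immutable filesystems (e.g. most object stores) get a log directory:
#   s3://bucket/zenml/logs/<log_id>
# Mutable filesystems get a single log file:
#   /local/path/zenml/logs/<log_id>.log
uri = prepare_logs_uri(artifact_store, log_id=uuid4())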
remove_ansi_escape_codes(text: str) -> str

Auxiliary function to remove ANSI escape codes from a given string.

Parameters:

Name Type Description Default
text str

the input string

required

Returns:

Type Description
str

the version of the input string where the escape codes are removed.

Source code in src/zenml/log_stores/artifact/artifact_log_store.py
def remove_ansi_escape_codes(text: str) -> str:
    """Auxiliary function to remove ANSI escape codes from a given string.

    Args:
        text: the input string

    Returns:
        the version of the input string where the escape codes are removed.
    """
    return ansi_escape.sub("", text)
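
For instance, terminal color codes are dropped (illustrative):

remove_ansi_escape_codes("\x1b[31merror\x1b[0m: step failed")
# -> "error: step failed"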

base_log_store

Base class for log stores.

Classes
BaseLogStore(*args: Any, **kwargs: Any)

Bases: StackComponent, ABC

Base class for all ZenML log stores.

A log store is responsible for collecting, storing, and retrieving logs during pipeline and step execution. Different implementations may store logs in different backends (artifact store, OpenTelemetry, Datadog, etc.).

Initialize the log store.

Parameters:

Name Type Description Default
*args Any

Positional arguments for the base class.

()
**kwargs Any

Keyword arguments for the base class.

{}
Source code in src/zenml/log_stores/base_log_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the log store.

    Args:
        *args: Positional arguments for the base class.
        **kwargs: Keyword arguments for the base class.
    """
    super().__init__(*args, **kwargs)
    self._origins: Dict[str, BaseLogStoreOrigin] = {}
    self._lock = threading.RLock()
Attributes
config: BaseLogStoreConfig property

Returns the configuration of the log store.

Returns:

Type Description
BaseLogStoreConfig

The configuration.

origin_class: Type[BaseLogStoreOrigin] property

Class of the origin.

Returns:

Type Description
Type[BaseLogStoreOrigin]

The class of the origin used with this log store.

Functions
deregister_origin(origin: BaseLogStoreOrigin, blocking: bool = True) -> None

Deregister an origin previously registered with the log store.

If no other origins are left, the log store will be flushed. The blocking parameter determines whether to block until the flush is complete.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin to deregister.

required
blocking bool

Whether to block until the deregistration is complete and all logs are flushed if this is the last origin registered.

True
Source code in src/zenml/log_stores/base_log_store.py
def deregister_origin(
    self,
    origin: BaseLogStoreOrigin,
    blocking: bool = True,
) -> None:
    """Deregister an origin previously registered with the log store.

    If no other origins are left, the log store will be flushed. The
    `blocking` parameter determines whether to block until the flush is
    complete.

    Args:
        origin: The origin to deregister.
        blocking: Whether to block until the deregistration is complete
            and all logs are flushed if this is the last origin registered.
    """
    with self._lock:
        if origin.name not in self._origins:
            return
        self._release_origin(origin)
        del self._origins[origin.name]
        if len(self._origins) == 0:
            self.flush(blocking=blocking)
emit(origin: BaseLogStoreOrigin, record: logging.LogRecord, metadata: Optional[Dict[str, Any]] = None) -> None abstractmethod

Process a log record from the logging system.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin used to send the log record.

required
record LogRecord

The Python logging.LogRecord to process.

required
metadata Optional[Dict[str, Any]]

Additional metadata to attach to the log entry.

None
Source code in src/zenml/log_stores/base_log_store.py
@abstractmethod
def emit(
    self,
    origin: BaseLogStoreOrigin,
    record: logging.LogRecord,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Process a log record from the logging system.

    Args:
        origin: The origin used to send the log record.
        record: The Python logging.LogRecord to process.
        metadata: Additional metadata to attach to the log entry.
    """
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry] abstractmethod

Fetch logs from the log store.

This method is called from the server to retrieve logs for display on the dashboard or via API. The implementation should not require any integration-specific SDKs that aren't available on the server.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing metadata about the logs.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries matching the query.

Source code in src/zenml/log_stores/base_log_store.py
@abstractmethod
def fetch(
    self,
    logs_model: LogsResponse,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List[LogEntry]:
    """Fetch logs from the log store.

    This method is called from the server to retrieve logs for display
    on the dashboard or via API. The implementation should not require
    any integration-specific SDKs that aren't available on the server.

    Args:
        logs_model: The logs model containing metadata about the logs.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries matching the query.
    """
flush(blocking: bool = True) -> None abstractmethod

Flush the log store.

This method is called to ensure that all logs are flushed to the backend.

Parameters:

Name Type Description Default
blocking bool

Whether to block until the flush is complete.

True
Source code in src/zenml/log_stores/base_log_store.py
@abstractmethod
def flush(self, blocking: bool = True) -> None:
    """Flush the log store.

    This method is called to ensure that all logs are flushed to the backend.

    Args:
        blocking: Whether to block until the flush is complete.
    """
register_origin(name: str, log_model: LogsResponse, metadata: Dict[str, Any]) -> BaseLogStoreOrigin

Register an origin for the log store.

Parameters:

Name Type Description Default
name str

The name of the origin.

required
log_model LogsResponse

The log model associated with the origin.

required
metadata Dict[str, Any]

Additional metadata to attach to the log entry.

required

Returns:

Type Description
BaseLogStoreOrigin

The origin.

Source code in src/zenml/log_stores/base_log_store.py
def register_origin(
    self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
) -> BaseLogStoreOrigin:
    """Register an origin for the log store.

    Args:
        name: The name of the origin.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to the log entry.

    Returns:
        The origin.
    """
    with self._lock:
        origin = self.origin_class(name, self, log_model, metadata)
        self._origins[name] = origin
        return origin
BaseLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Base configuration for all log stores.

Source code in src/zenml/stack/stack_component.py (inherited __init__; identical to the implementation shown under ArtifactLogStoreConfig above).
BaseLogStoreFlavor

Bases: Flavor

Base class for all ZenML log store flavors.

Attributes
config_class: Type[BaseLogStoreConfig] property

Config class for the base log store flavor.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

implementation_class: Type[BaseLogStore] abstractmethod property

Implementation class for the base log store flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

type: StackComponentType property

Type of the flavor.

Returns:

Type Description
StackComponentType

The type of the flavor.

BaseLogStoreOrigin(name: str, log_store: BaseLogStore, log_model: LogsResponse, metadata: Dict[str, Any])

Base class for all ZenML log store origins.

The origin is the entry point for all log records to be sent to the log store for processing. The process of sending a log record is as follows:

  1. instantiate the log store or use the active log store
  2. register an origin by calling log_store.register_origin() and passing the log model and optional metadata to be attached to each log record
  3. emit the log record by calling log_store.emit() and passing the origin and log record
  4. deregister the origin when all logs have been emitted by calling log_store.deregister_origin(origin) (a usage sketch follows this list)
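
A minimal sketch of this lifecycle (illustrative; assumes an already-configured log_store and an existing LogsResponse named logs_model):

import logging

record = logging.LogRecord(
    name="my_step", level=logging.INFO, pathname=__file__, lineno=1,
    msg="step started", args=(), exc_info=None,
)
origin = log_store.register_origin(
    name="my_step",
    log_model=logs_model,
    metadata={"example.key": "value"},  # hypothetical metadata key
)
log_store.emit(origin, record)
log_store.deregister_origin(origin, blocking=True)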

Initialize a log store origin.

Parameters:

Name Type Description Default
name str

The name of the origin.

required
log_store BaseLogStore

The log store to emit logs to.

required
log_model LogsResponse

The log model associated with the origin.

required
metadata Dict[str, Any]

Additional metadata to attach to all log entries that will be emitted by this origin.

required
Source code in src/zenml/log_stores/base_log_store.py
def __init__(
    self,
    name: str,
    log_store: "BaseLogStore",
    log_model: LogsResponse,
    metadata: Dict[str, Any],
) -> None:
    """Initialize a log store origin.

    Args:
        name: The name of the origin.
        log_store: The log store to emit logs to.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to all log entries that will
            be emitted by this origin.
    """
    self.name = name
    self.log_store = log_store
    self.log_model = log_model
    self.metadata = metadata
Functions

datadog

Datadog log store implementation.

Modules
datadog_flavor

Datadog log store flavor.

Classes
DatadogLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: OtelLogStoreConfig

Configuration for Datadog log store.

Attributes:

Name Type Description
api_key PlainSerializedSecretStr

Datadog API key for log ingestion.

application_key PlainSerializedSecretStr

Datadog application key for log extraction.

site str

Datadog site (e.g., "datadoghq.com", "datadoghq.eu").

max_export_batch_size int

Maximum batch size for exports (Datadog limit: 1000).
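
A configuration sketch (illustrative values; in practice, pass the keys as secret references rather than plain text):

config = DatadogLogStoreConfig(
    api_key="<datadog-api-key>",          # used for log ingestion
    application_key="<datadog-app-key>",  # used for fetching logs back
    site="datadoghq.eu",
    # endpoint is derived from `site` when omitted, see set_default_endpoint below.
)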

Source code in src/zenml/stack/stack_component.py (inherited __init__; identical to the implementation shown under ArtifactLogStoreConfig above).
Functions
set_default_endpoint(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Set the endpoint based on site if not provided.

Parameters:

Name Type Description Default
data Dict[str, Any]

The input data dictionary.

required

Returns:

Type Description
Dict[str, Any]

The data dictionary with the endpoint set if not provided.

Source code in src/zenml/log_stores/datadog/datadog_flavor.py
@model_validator(mode="before")
@classmethod
def set_default_endpoint(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """Set the endpoint based on site if not provided.

    Args:
        data: The input data dictionary.

    Returns:
        The data dictionary with the endpoint set if not provided.
    """
    if isinstance(data, dict) and not data.get("endpoint"):
        site = data.get("site", "datadoghq.com")
        data["endpoint"] = f"https://http-intake.logs.{site}/api/v2/logs"
    return data
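
For example, the validator's default derivation works as follows (illustrative):

config = DatadogLogStoreConfig(
    api_key="...", application_key="...", site="datadoghq.eu"
)
# config.endpoint -> "https://http-intake.logs.datadoghq.eu/api/v2/logs"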
validate_max_export_batch_size(v: int) -> int classmethod

Validate that max_export_batch_size doesn't exceed Datadog's limit.

Parameters:

Name Type Description Default
v int

The value to validate.

required

Returns:

Type Description
int

The validated value.

Raises:

Type Description
ValueError

If the value exceeds Datadog's limit.

Source code in src/zenml/log_stores/datadog/datadog_flavor.py
@field_validator("max_export_batch_size")
@classmethod
def validate_max_export_batch_size(cls, v: int) -> int:
    """Validate that max_export_batch_size doesn't exceed Datadog's limit.

    Args:
        v: The value to validate.

    Returns:
        The validated value.

    Raises:
        ValueError: If the value exceeds Datadog's limit.
    """
    if v > DATADOG_MAX_BATCH_SIZE:
        raise ValueError(
            f"max_export_batch_size cannot exceed {DATADOG_MAX_BATCH_SIZE} "
            f"(Datadog API limit). Got: {v}"
        )
    return v
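
Given the documented 1000-entry API limit, an oversized batch size fails validation (illustrative; the ValueError is surfaced through pydantic's validation error):

DatadogLogStoreConfig(
    api_key="...", application_key="...", max_export_batch_size=2000
)
# fails: max_export_batch_size cannot exceed 1000 (Datadog API limit). Got: 2000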
DatadogLogStoreFlavor

Bases: Flavor

Datadog log store flavor.

Attributes
config_class: Type[BaseLogStoreConfig] property

Returns DatadogLogStoreConfig config class.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

docs_url: str property

URL to the flavor documentation.

Returns:

Type Description
str

The URL to the flavor documentation.

implementation_class: Type[BaseLogStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

logo_url: str property

URL to the flavor logo.

Returns:

Type Description
str

The URL to the flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: str property

URL to the SDK docs for this flavor.

Returns:

Type Description
str

The URL to the SDK docs for this flavor.

type: StackComponentType property

Stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

datadog_log_exporter

Log exporter that writes logs to Datadog.

Classes
DatadogLogExporter(endpoint: str, certificate_file: Optional[str] = None, client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, timeout: float = DEFAULT_TIMEOUT, compression: Compression = Compression.NoCompression)

Bases: OTLPLogExporter

Datadog log exporter.

This exporter writes OpenTelemetry logs to Datadog's HTTP intake API with slightly modified log records to adapt to Datadog's format.

Source code in src/zenml/log_stores/otel/otel_log_exporter.py (the __init__ is inherited unchanged from OTLPLogExporter; see the otel_log_exporter module below for the full listing).
Functions
datadog_log_store

Datadog log store implementation.

Classes
DatadogLogStore(*args: Any, **kwargs: Any)

Bases: OtelLogStore

Log store that exports logs to Datadog.

This implementation extends OtelLogStore and configures it to send logs to Datadog's HTTP intake API.

Source code in src/zenml/log_stores/otel/otel_log_store.py (the __init__ is inherited unchanged from OtelLogStore; see the otel_log_store module below for the full listing).
Attributes
config: DatadogLogStoreConfig property

Returns the configuration of the Datadog log store.

Returns:

Type Description
DatadogLogStoreConfig

The configuration.

Functions
cleanup() -> None

Cleanup the Datadog log store.

This method is called when the log store is no longer needed.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def cleanup(self) -> None:
    """Cleanup the Datadog log store.

    This method is called when the log store is no longer needed.
    """
    if self._datadog_exporter:
        self._datadog_exporter.shutdown()
        self._datadog_exporter = None
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetch logs from Datadog's API.

This method queries Datadog's Logs API to retrieve logs for the specified pipeline run and step. It automatically paginates through results to fetch up to the requested limit.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing run and step metadata.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Returns:

Type Description
List[LogEntry]

List of log entries from Datadog.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def fetch(
    self,
    logs_model: "LogsResponse",
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List["LogEntry"]:
    """Fetch logs from Datadog's API.

    This method queries Datadog's Logs API to retrieve logs for the
    specified pipeline run and step. It automatically paginates through
    results to fetch up to the requested limit.

    Args:
        logs_model: The logs model containing run and step metadata.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Returns:
        List of log entries from Datadog.
    """
    query_parts = [
        f"service:{self.config.service_name}",
        f"@zenml.log.id:{logs_model.id}",
    ]

    query = " ".join(query_parts)

    api_endpoint = (
        f"https://api.{self.config.site}/api/v2/logs/events/search"
    )
    headers = {
        "DD-API-KEY": self.config.api_key.get_secret_value(),
        "DD-APPLICATION-KEY": self.config.application_key.get_secret_value(),
        "Content-Type": "application/json",
    }

    log_entries: List[LogEntry] = []
    cursor: Optional[str] = None
    remaining = limit

    try:
        while remaining > 0:
            # Datadog API limit is 1000 per request
            page_limit = min(remaining, 1000)

            body: Dict[str, Any] = {
                "filter": {
                    "query": query,
                    "from": (
                        start_time.isoformat()
                        if start_time
                        else logs_model.created.isoformat()
                    ),
                    "to": (
                        end_time.isoformat()
                        if end_time
                        else datetime.now().astimezone().isoformat()
                    ),
                },
                "page": {
                    "limit": page_limit,
                },
                "sort": "timestamp",
            }

            if cursor:
                body["page"]["cursor"] = cursor

            response = requests.post(
                api_endpoint,
                headers=headers,
                json=body,
                timeout=30,
            )

            if response.status_code != 200:
                logger.error(
                    f"Failed to fetch logs from Datadog: "
                    f"{response.status_code} - {response.text[:200]}"
                )
                break

            data = response.json()
            logs = data.get("data", [])

            if not logs:
                break

            for log in logs:
                entry = self._parse_log_entry(log)
                if entry:
                    log_entries.append(entry)

            remaining -= len(logs)

            # Get cursor for next page
            cursor = data.get("meta", {}).get("page", {}).get("after")
            if not cursor:
                break

        logger.debug(f"Fetched {len(log_entries)} logs from Datadog")
        return log_entries

    except Exception as e:
        logger.exception(f"Error fetching logs from Datadog: {e}")
        return log_entries  # Return what we have so far
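
A retrieval sketch (illustrative; logs_model is the LogsResponse recorded for a step, and datadog_log_store is an instance of this class):

from datetime import datetime, timezone

entries = datadog_log_store.fetch(
    logs_model,
    start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
    limit=500,
)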
get_exporter() -> DatadogLogExporter

Get the Datadog log exporter.

Returns:

Type Description
DatadogLogExporter

DatadogExporter with the proper configuration.

Source code in src/zenml/log_stores/datadog/datadog_log_store.py
def get_exporter(self) -> DatadogLogExporter:
    """Get the Datadog log exporter.

    Returns:
        DatadogExporter with the proper configuration.
    """
    if not self._datadog_exporter:
        headers = {
            "dd-api-key": self.config.api_key.get_secret_value(),
            "dd-application-key": self.config.application_key.get_secret_value(),
        }
        if self.config.headers:
            headers.update(self.config.headers)

        self._datadog_exporter = DatadogLogExporter(
            endpoint=self.config.endpoint,
            headers=headers,
            certificate_file=self.config.certificate_file,
            client_key_file=self.config.client_key_file,
            client_certificate_file=self.config.client_certificate_file,
            compression=self.config.compression,
        )
    return self._datadog_exporter
Functions

otel

OpenTelemetry log store implementation.

Modules
otel_flavor

OpenTelemetry log store flavor.

Classes
Compression

Bases: StrEnum

Compression types.

OtelLogStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseLogStoreConfig

Configuration for OpenTelemetry log store.

Attributes:

Name Type Description
service_name str

Name of the service (defaults to "zenml").

service_version str

Version of the service (defaults to the ZenML version).

max_queue_size int

Maximum queue size for batch processor.

schedule_delay_millis int

Delay between batch exports in milliseconds.

max_export_batch_size int

Maximum batch size for exports.

endpoint str

The endpoint to export logs to.

headers Optional[Dict[str, str]]

The headers to use for the export.

certificate_file Optional[str]

The certificate file to use for the export.

client_key_file Optional[str]

The client key file to use for the export.

client_certificate_file Optional[str]

The client certificate file to use for the export.

compression Compression

The compression to use for the export.
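
A configuration sketch for a generic OTLP/JSON backend (illustrative; the collector URL and auth header are assumptions):

config = OtelLogStoreConfig(
    endpoint="https://otel-collector.example.com/v1/logs",
    headers={"Authorization": "Bearer <token>"},
    max_export_batch_size=512,
    schedule_delay_millis=5000,
)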

Source code in src/zenml/stack/stack_component.py (inherited __init__; identical to the implementation shown under ArtifactLogStoreConfig above).
OtelLogStoreFlavor

Bases: Flavor

OpenTelemetry log store flavor.

Attributes
config_class: Type[BaseLogStoreConfig] property

Returns the OtelLogStoreConfig config class.

Returns:

Type Description
Type[BaseLogStoreConfig]

The config class.

docs_url: str property

URL to the flavor documentation.

Returns:

Type Description
str

The URL to the flavor documentation.

implementation_class: Type[BaseLogStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[BaseLogStore]

The implementation class.

logo_url: str property

URL to the flavor logo.

Returns:

Type Description
str

The URL to the flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: str property

URL to the SDK docs for this flavor.

Returns:

Type Description
str

The URL to the SDK docs for this flavor.

type: StackComponentType property

Stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

otel_log_exporter

OpenTelemetry exporter that writes logs to any OpenTelemetry backend.

Classes
OTLPLogExporter(endpoint: str, certificate_file: Optional[str] = None, client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, timeout: float = DEFAULT_TIMEOUT, compression: Compression = Compression.NoCompression)

Bases: LogExporter

OpenTelemetry exporter using JSON protocol.

This exporter is a placeholder until the actual implementation of the OpenTelemetry exporter is available in the opentelemetry-exporter-otlp-proto-json package.

Initialize the exporter.

Parameters:

Name Type Description Default
endpoint str

The endpoint to export logs to.

required
certificate_file Optional[str]

The certificate file to use for the export.

None
client_key_file Optional[str]

The client key file to use for the export.

None
client_certificate_file Optional[str]

The client certificate file to use for the export.

None
headers Optional[Dict[str, str]]

The headers to use for the export.

None
timeout float

The timeout to use for the export.

DEFAULT_TIMEOUT
compression Compression

The compression to use for the export.

NoCompression
Source code in src/zenml/log_stores/otel/otel_log_exporter.py
def __init__(
    self,
    endpoint: str,
    certificate_file: Optional[str] = None,
    client_key_file: Optional[str] = None,
    client_certificate_file: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = DEFAULT_TIMEOUT,
    compression: Compression = Compression.NoCompression,
):
    """Initialize the exporter.

    Args:
        endpoint: The endpoint to export logs to.
        certificate_file: The certificate file to use for the export.
        client_key_file: The client key file to use for the export.
        client_certificate_file: The client certificate file to use for the export.
        headers: The headers to use for the export.
        timeout: The timeout to use for the export.
        compression: The compression to use for the export.
    """
    self._shutdown_is_occurring = threading.Event()
    self._endpoint = endpoint
    self._certificate_file = certificate_file
    self._client_key_file = client_key_file
    self._client_certificate_file = client_certificate_file
    self._client_cert = (
        (self._client_certificate_file, self._client_key_file)
        if self._client_certificate_file and self._client_key_file
        else self._client_certificate_file
    )
    self._timeout = timeout
    self._compression = compression
    self._session = requests.Session()
    self._session.headers.update(
        {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": f"zenml/{zenml_version}",
        }
    )
    if headers:
        self._session.headers.update(headers)

    # Retries are triggered on specific HTTP status codes:
    #
    #     408: Request Timeout.
    #     429: Too Many Requests.
    #     500: Internal Server Error.
    #     502: Bad Gateway.
    #     503: Service Unavailable.
    #     504: Gateway Timeout.
    #
    # This also handles connection level errors, if a connection attempt
    # fails due to transient issues like:
    #
    #     DNS resolution errors.
    #     Connection timeouts.
    #     Network disruptions.
    #
    # Additional errors retried:
    #
    #     Read Timeouts: If the server does not send a response within
    #     the timeout period.
    #     Connection Refused: If the server refuses the connection.
    #
    retries = Retry(
        connect=5,
        read=5,
        redirect=3,
        status=5,
        allowed_methods=[
            "POST",
        ],
        status_forcelist=[
            408,  # Request Timeout
            429,  # Too Many Requests
            500,  # Internal Server Error
            502,  # Bad Gateway
            503,  # Service Unavailable
            504,  # Gateway Timeout
        ],
        other=3,
        backoff_factor=0.5,
        respect_retry_after_header=True,
        raise_on_status=False,
    )
    http_adapter = HTTPAdapter(
        max_retries=retries,
        pool_maxsize=1,
    )
    self._session.mount("https://", http_adapter)
    self._session.mount("http://", http_adapter)

    self._shutdown = False
Functions
export(batch: Sequence[LogData]) -> LogExportResult

Export a batch of logs to the OpenTelemetry backend.

Parameters:

Name Type Description Default
batch Sequence[LogData]

The batch of logs to export.

required

Returns:

Type Description
LogExportResult

LogExportResult indicating success or failure.

Source code in src/zenml/log_stores/otel/otel_log_exporter.py
def export(self, batch: Sequence["LogData"]) -> LogExportResult:
    """Export a batch of logs to the OpenTelemetry backend.

    Args:
        batch: The batch of logs to export.

    Returns:
        LogExportResult indicating success or failure.
    """
    if self._shutdown:
        logger.warning("Exporter already shutdown, ignoring batch")
        return LogExportResult.FAILURE
    encoded_logs = self._encode_logs(batch)

    serialized_data = json.dumps(
        encoded_logs,
        default=pydantic_encoder,
    ).encode("utf-8")

    try:
        resp = self._export(serialized_data, self._timeout)
        if resp.ok:
            return LogExportResult.SUCCESS
        return LogExportResult.FAILURE
    except Exception as e:
        logger.error(f"Error exporting logs: {e}")
        return LogExportResult.FAILURE
shutdown() -> None

Shutdown the exporter.

Source code in src/zenml/log_stores/otel/otel_log_exporter.py
def shutdown(self) -> None:
    """Shutdown the exporter."""
    if self._shutdown:
        logger.warning("Exporter already shutdown, ignoring call")
        return
    self._shutdown = True
    self._shutdown_is_occurring.set()
    self._session.close()
Functions
otel_log_store

OpenTelemetry log store implementation.

Classes
OtelBatchLogRecordProcessor

Bases: BatchLogRecordProcessor

OpenTelemetry batch log record processor.

This is a subclass of the BatchLogRecordProcessor that allows for a non-blocking flush.

Functions
flush(blocking: bool = True) -> bool

Force flush the batch log record processor.

Parameters:

Name Type Description Default
blocking bool

Whether to block until the flush is complete.

True

Returns:

Type Description
bool

True if the flush is successful, False otherwise.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def flush(self, blocking: bool = True) -> bool:
    """Force flush the batch log record processor.

    Args:
        blocking: Whether to block until the flush is complete.

    Returns:
        True if the flush is successful, False otherwise.
    """
    if not blocking:
        # For a non-blocking flush, we simply need to wake up the worker
        # thread and it will handle the flush in the background.
        self._batch_processor._worker_awaken.set()
        return True
    else:
        return self.force_flush()
OtelLogStore(*args: Any, **kwargs: Any)

Bases: BaseLogStore

Log store that exports logs using OpenTelemetry.

Subclasses should implement get_exporter() to provide the specific log exporter for their backend.

Initialize the OpenTelemetry log store.

Parameters:

Name Type Description Default
*args Any

Positional arguments for the base class.

()
**kwargs Any

Keyword arguments for the base class.

{}
Source code in src/zenml/log_stores/otel/otel_log_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the OpenTelemetry log store.

    Args:
        *args: Positional arguments for the base class.
        **kwargs: Keyword arguments for the base class.
    """
    super().__init__(*args, **kwargs)

    self._resource: Optional["Resource"] = None
    self._exporter: Optional["LogExporter"] = None
    self._provider: Optional["LoggerProvider"] = None
    self._processor: Optional["OtelBatchLogRecordProcessor"] = None
    self._handler: Optional["LoggingHandler"] = None
Attributes
config: OtelLogStoreConfig property

Returns the configuration of the OTel log store.

Returns:

Type Description
OtelLogStoreConfig

The configuration.

origin_class: Type[OtelLogStoreOrigin] property

Class of the origin.

Returns:

Type Description
Type[OtelLogStoreOrigin]

The class of the origin.

provider: LoggerProvider property

Returns the OpenTelemetry logger provider.

Returns:

Type Description
LoggerProvider

The logger provider.

Raises:

Type Description
RuntimeError

If the OpenTelemetry log store is not initialized.

Functions
deactivate() -> None

Deactivate log collection and shut down the processor.

Flushes any pending logs and shuts down the processor's background thread.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def deactivate(self) -> None:
    """Deactivate log collection and shut down the processor.

    Flushes any pending logs and shuts down the processor's background thread.
    """
    with self._lock:
        if self._processor:
            try:
                # Force flush any pending logs
                self._processor.flush(blocking=True)
                logger.debug("Flushed pending logs")
            except Exception as e:
                logger.warning(f"Error flushing logs: {e}")

            try:
                self._processor.shutdown()  # type: ignore[no-untyped-call]
                logger.debug(
                    "Shut down log processor and background thread"
                )
            except Exception as e:
                logger.warning(f"Error shutting down processor: {e}")
            else:
                self._processor = None
                self._handler = None
                self._provider = None
                self._resource = None
                self._exporter = None

    logger.debug("OtelLogStore deactivated")
emit(origin: BaseLogStoreOrigin, record: logging.LogRecord, metadata: Optional[Dict[str, Any]] = None) -> None

Process a log record by sending to OpenTelemetry.

Parameters:

Name Type Description Default
origin BaseLogStoreOrigin

The origin used to send the log record.

required
record LogRecord

The log record to process.

required
metadata Optional[Dict[str, Any]]

Additional metadata to attach to the log entry.

None

Raises:

Type Description
RuntimeError

If the OpenTelemetry provider is not initialized.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def emit(
    self,
    origin: BaseLogStoreOrigin,
    record: logging.LogRecord,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Process a log record by sending to OpenTelemetry.

    Args:
        origin: The origin used to send the log record.
        record: The log record to process.
        metadata: Additional metadata to attach to the log entry.

    Raises:
        RuntimeError: If the OpenTelemetry provider is not initialized.
    """
    assert isinstance(origin, OtelLogStoreOrigin)
    with self._lock:
        if not self._provider:
            self._activate()

        if self._handler is None:
            raise RuntimeError("OpenTelemetry provider is not initialized")

        emit_kwargs = self._handler._translate(record)
        emit_kwargs["attributes"].update(origin.metadata)
        if metadata:
            emit_kwargs["attributes"].update(metadata)

        origin.logger.emit(**emit_kwargs)
fetch(logs_model: LogsResponse, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = MAX_ENTRIES_PER_REQUEST) -> List[LogEntry]

Fetch logs from the OpenTelemetry backend.

This method should be overridden by subclasses to implement backend-specific log retrieval. The base implementation raises a NotImplementedError.

Parameters:

Name Type Description Default
logs_model LogsResponse

The logs model containing run and step metadata.

required
start_time Optional[datetime]

Filter logs after this time.

None
end_time Optional[datetime]

Filter logs before this time.

None
limit int

Maximum number of log entries to return.

MAX_ENTRIES_PER_REQUEST

Raises:

Type Description
NotImplementedError

Log fetching is not supported by the OTEL log store.

Source code in src/zenml/log_stores/otel/otel_log_store.py
def fetch(
    self,
    logs_model: "LogsResponse",
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = MAX_ENTRIES_PER_REQUEST,
) -> List["LogEntry"]:
    """Fetch logs from the OpenTelemetry backend.

    This method should be overridden by subclasses to implement
    backend-specific log retrieval. The base implementation raises
    NotImplementedError.

    Args:
        logs_model: The logs model containing run and step metadata.
        start_time: Filter logs after this time.
        end_time: Filter logs before this time.
        limit: Maximum number of log entries to return.

    Raises:
        NotImplementedError: Log fetching is not supported by the OTEL log
            store.
    """
    raise NotImplementedError(
        "Log fetching is not supported by the OTEL log store."
    )
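
Since the base implementation raises NotImplementedError, backend-specific subclasses provide the actual retrieval. The following is a rough sketch only: MyBackendLogStore, self._client, its search method, and the LogEntry field names are hypothetical stand-ins for a vendor SDK, not real ZenML or vendor APIs.

class MyBackendLogStore(OtelLogStore):
    def fetch(
        self,
        logs_model: "LogsResponse",
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = MAX_ENTRIES_PER_REQUEST,
    ) -> List["LogEntry"]:
        # Hypothetical vendor query, filtered on the zenml.log.id
        # attribute that OtelLogStoreOrigin attaches to every record.
        results = self._client.search(
            query=f'zenml.log.id:"{logs_model.id}"',
            start=start_time,
            end=end_time,
            limit=limit,
        )
        # LogEntry construction is illustrative; check the actual model
        # for its real field names.
        return [
            LogEntry(message=r.message, timestamp=r.timestamp)
            for r in results[:limit]
        ]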
flush(blocking: bool = True) -> None

Flush the log store.

This method is called to ensure that all logs are flushed to the backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| blocking | bool | Whether to block until the flush is complete. | True |

Source code in src/zenml/log_stores/otel/otel_log_store.py
def flush(self, blocking: bool = True) -> None:
    """Flush the log store.

    This method is called to ensure that all logs are flushed to the
    backend.

    Args:
        blocking: Whether to block until the flush is complete.
    """
    with self._lock:
        if self._processor:
            self._processor.flush(blocking=blocking)
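
A short usage sketch; log_store is a hypothetical handle on an active OtelLogStore:

# Block until the shared BatchLogRecordProcessor has exported every
# queued record, e.g. when a step finishes:
log_store.flush(blocking=True)

# Trigger an export without waiting for it to complete:
log_store.flush(blocking=False)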
get_exporter() -> LogExporter

Get the OTLP log exporter.

Returns:

| Type | Description |
| --- | --- |
| LogExporter | An OTLPLogExporter configured from the store's endpoint, headers, TLS files, and compression settings. |

Source code in src/zenml/log_stores/otel/otel_log_store.py
def get_exporter(self) -> "LogExporter":
    """Get the Datadog log exporter.

    Returns:
        OTLPLogExporter configured with API key and site.
    """
    if not self._exporter:
        self._exporter = OTLPLogExporter(
            endpoint=self.config.endpoint,
            headers=self.config.headers,
            certificate_file=self.config.certificate_file,
            client_key_file=self.config.client_key_file,
            client_certificate_file=self.config.client_certificate_file,
            compression=self.config.compression,
        )

    return self._exporter
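
Subclasses can swap in any OpenTelemetry LogExporter by overriding this method. A sketch, assuming the opentelemetry-sdk package; ConsoleDebugLogStore is a hypothetical class used only for illustration:

from opentelemetry.sdk._logs.export import ConsoleLogExporter, LogExporter

class ConsoleDebugLogStore(OtelLogStore):
    def get_exporter(self) -> LogExporter:
        # Print OTEL log records to stdout instead of shipping them to
        # an OTLP endpoint; handy for local debugging.
        if not self._exporter:
            self._exporter = ConsoleLogExporter()
        return self._exporter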
register_origin(name: str, log_model: LogsResponse, metadata: Dict[str, Any]) -> BaseLogStoreOrigin

Register an origin for the log store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the origin. | required |
| log_model | LogsResponse | The log model associated with the origin. | required |
| metadata | Dict[str, Any] | Additional metadata to attach to log entries emitted by this origin. | required |

Returns:

| Type | Description |
| --- | --- |
| BaseLogStoreOrigin | The origin. |

Source code in src/zenml/log_stores/otel/otel_log_store.py
def register_origin(
    self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
) -> BaseLogStoreOrigin:
    """Register an origin for the log store.

    Args:
        name: The name of the origin.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to log entries emitted
            by this origin.

    Returns:
        The origin.
    """
    with self._lock:
        if not self._provider:
            self._activate()

    return super().register_origin(name, log_model, metadata)
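
A registration sketch; log_store and logs_model are hypothetical handles on an active OtelLogStore and an existing logs model:

origin = log_store.register_origin(
    name="trainer",
    log_model=logs_model,
    metadata={"pipeline.name": "training", "step.name": "trainer"},
)
# The first registration lazily activates the OpenTelemetry provider,
# so emit() can be used immediately afterwards.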
OtelLogStoreOrigin(name: str, log_store: BaseLogStore, log_model: LogsResponse, metadata: Dict[str, Any])

Bases: BaseLogStoreOrigin

OpenTelemetry log store origin.

Initialize a log store origin.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the origin. | required |
| log_store | BaseLogStore | The log store to emit logs to. | required |
| log_model | LogsResponse | The log model associated with the origin. | required |
| metadata | Dict[str, Any] | Additional metadata to attach to all log entries that will be emitted by this origin. | required |

Source code in src/zenml/log_stores/otel/otel_log_store.py
def __init__(
    self,
    name: str,
    log_store: "BaseLogStore",
    log_model: LogsResponse,
    metadata: Dict[str, Any],
) -> None:
    """Initialize a log store origin.

    Args:
        name: The name of the origin.
        log_store: The log store to emit logs to.
        log_model: The log model associated with the origin.
        metadata: Additional metadata to attach to all log entries that will
            be emitted by this origin.
    """
    metadata = {f"zenml.{key}": value for key, value in metadata.items()}

    metadata.update(
        {
            "zenml.log.id": str(log_model.id),
            "zenml.log.source": log_model.source,
            "zenml.log_store.id": str(log_store.id),
            "zenml.log_store.name": log_store.name,
        }
    )

    super().__init__(name, log_store, log_model, metadata)
    assert isinstance(log_store, OtelLogStore)

    self._logger = log_store.provider.get_logger(name)
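
To make the metadata handling above concrete: every user-supplied key is prefixed with zenml., and the log and log store identifiers are merged on top. With hypothetical inputs (all IDs below are invented), the origin's metadata ends up as:

# metadata passed in: {"pipeline.name": "training", "step.name": "trainer"}
{
    "zenml.pipeline.name": "training",
    "zenml.step.name": "trainer",
    "zenml.log.id": "1b9f0c3e-...",        # str(log_model.id)
    "zenml.log.source": "step",            # log_model.source
    "zenml.log_store.id": "7e42aa10-...",  # str(log_store.id)
    "zenml.log_store.name": "default",     # log_store.name
}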
Attributes
logger: Logger property

Returns the OpenTelemetry logger for this origin.

Returns:

| Type | Description |
| --- | --- |
| Logger | The logger. |

Functions