Analytics
zenml.analytics
special
The 'analytics' module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
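Below is a minimal usage sketch of the module-level helpers documented on this page. The `AnalyticsEvent` import path and the `OPT_IN_ANALYTICS` member are taken from the source code further down; the trait and property values are illustrative only.

```python
from uuid import uuid4

import zenml.analytics as analytics
from zenml.utils.analytics_utils import AnalyticsEvent

user_id = uuid4()

# Identify a user; returns a (success flag, original message) tuple.
success, message = analytics.identify(user_id=user_id, traits={"tier": "demo"})

# Track an event for the same user.
success, message = analytics.track(
    user_id=user_id,
    event=AnalyticsEvent.OPT_IN_ANALYTICS,
    properties={"source": "docs-example"},
)
```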
group(user_id, group_id, traits)
Send a group call with the default client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
group_id | UUID | The group ID. | required |
traits | Optional[Dict[Any, Any]] | Traits to assign to the group. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/__init__.py
def group(
user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Send a group call with the default client.
Args:
user_id: The user ID.
group_id: The group ID.
traits: Traits to assign to the group.
Returns:
Tuple (success flag, the original message).
"""
set_default_client()
assert default_client is not None
return default_client.group(
user_id=user_id, group_id=group_id, traits=traits
)
identify(user_id, traits)
Send an identify call with the default client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
traits | Optional[Dict[Any, Any]] | The traits for the identification process. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/__init__.py
def identify(
user_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Send an identify call with the default client.
Args:
user_id: The user ID.
traits: The traits for the identification process.
Returns:
Tuple (success flag, the original message).
"""
set_default_client()
assert default_client is not None
return default_client.identify(
user_id=user_id,
traits=traits,
)
set_default_client()
Sets up a default client with the default configuration.
Source code in zenml/analytics/__init__.py
def set_default_client() -> None:
"""Sets up a default client with the default configuration."""
global default_client
if default_client is None:
default_client = Client(
debug=debug,
max_queue_size=max_queue_size,
send=send,
on_error=on_error,
max_retries=max_retries,
sync_mode=sync_mode,
timeout=timeout,
)
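A small sketch of the lazy-singleton behavior: the module keeps a `default_client` global that is created on first use and reused afterwards (assuming the module is imported as shown).

```python
import zenml.analytics as analytics

# The first call creates the module-level client from the module defaults;
# subsequent calls are no-ops and keep the same instance.
analytics.set_default_client()
first = analytics.default_client

analytics.set_default_client()
assert analytics.default_client is first
```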
track(user_id, event, properties)
Send a track call with the default client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
event | AnalyticsEvent | The type of the event. | required |
properties | Optional[Dict[Any, Any]] | Dict of additional properties for the event. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/__init__.py
def track(
user_id: UUID,
event: "AnalyticsEvent",
properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
"""Send a track call with the default client.
Args:
user_id: The user ID.
event: The type of the event.
properties: Dict of additional properties for the event.
Returns:
Tuple (success flag, the original message).
"""
set_default_client()
assert default_client is not None
return default_client.track(
user_id=user_id, event=event, properties=properties
)
client
The analytics module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
AnalyticsEncoder (JSONEncoder)
Helper encoder class for JSON serialization.
Source code in zenml/analytics/client.py
class AnalyticsEncoder(json.JSONEncoder):
"""Helper encoder class for JSON serialization."""
def default(self, obj: Any) -> Any:
"""The default method to handle UUID and 'AnalyticsEvent' objects.
Args:
obj: The object to encode.
Returns:
The encoded object.
"""
from zenml.utils.analytics_utils import AnalyticsEvent
# If the object is UUID, we simply return the value of UUID
if isinstance(obj, UUID):
return str(obj)
# If the object is an AnalyticsEvent, return its value
elif isinstance(obj, AnalyticsEvent):
return str(obj.value)
return json.JSONEncoder.default(self, obj)
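For illustration, a sketch of serializing a message payload with the encoder; the payload keys mirror the ones built by `Client.identify` below.

```python
import json
from uuid import uuid4

from zenml.analytics.client import AnalyticsEncoder

payload = {"user_id": uuid4(), "traits": {}, "type": "identify"}

# Plain json.dumps would raise a TypeError on the UUID; the encoder
# converts UUIDs (and AnalyticsEvent members) to strings.
print(json.dumps(payload, cls=AnalyticsEncoder))
```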
default(self, obj)
The default method to handle UUID and 'AnalyticsEvent' objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | Any | The object to encode. | required |
Returns:
Type | Description |
---|---|
Any | The encoded object. |
Source code in zenml/analytics/client.py
def default(self, obj: Any) -> Any:
"""The default method to handle UUID and 'AnalyticsEvent' objects.
Args:
obj: The object to encode.
Returns:
The encoded object.
"""
from zenml.utils.analytics_utils import AnalyticsEvent
# If the object is UUID, we simply return the value of UUID
if isinstance(obj, UUID):
return str(obj)
# If the object is an AnalyticsEvent, return its value
elif isinstance(obj, AnalyticsEvent):
return str(obj.value)
return json.JSONEncoder.default(self, obj)
Client
The client class for ZenML analytics.
Source code in zenml/analytics/client.py
class Client(object):
"""The client class for ZenML analytics."""
class DefaultConfig(object):
"""The configuration class for the client.
Attributes:
on_error: Function to call if an error occurs.
debug: Flag to set to switch to the debug mode.
send: Flag to determine whether to send the message.
sync_mode: Flag, if set to True, uses the main thread to send
the messages, and if set to False, creates other threads
for the analytics.
max_queue_size: The maximum number of entries a single queue
can hold.
timeout: Timeout in seconds.
max_retries: The number of max tries before failing.
thread: The number of additional threads to create for the
analytics if the 'sync_mode' is set to False.
upload_interval: The upload_interval in seconds if the
'sync_mode' is set to False.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
"""
on_error: Optional[Callable[..., Any]] = None
debug: bool = False
send: bool = True
sync_mode: bool = True
max_queue_size: int = 10000
timeout: int = 15
max_retries: int = 1
thread: int = 1
upload_interval: float = 0.5
upload_size: int = 100
def __init__(
self,
debug: bool = DefaultConfig.debug,
max_queue_size: int = DefaultConfig.max_queue_size,
send: bool = DefaultConfig.send,
on_error: Optional[Callable[..., Any]] = DefaultConfig.on_error,
max_retries: int = DefaultConfig.max_retries,
sync_mode: bool = DefaultConfig.sync_mode,
timeout: int = DefaultConfig.timeout,
thread: int = DefaultConfig.thread,
upload_size: int = DefaultConfig.upload_size,
upload_interval: float = DefaultConfig.upload_interval,
) -> None:
"""Initialization of the client.
Args:
debug: Flag to set to switch to the debug mode.
max_queue_size: The maximum number of entries a single queue
can hold.
send: Flag to determine whether to send the message.
on_error: Function to call if an error occurs.
max_retries: The number of max tries before failing.
sync_mode: Flag, if set to True, uses the main thread to send
the messages, and if set to False, creates other threads
for the analytics.
timeout: Timeout in seconds.
thread: The number of additional threads to create for the
analytics if the 'sync_mode' is set to False.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
upload_interval: The upload_interval in seconds if the
'sync_mode' is set to False.
"""
self.queue = queue.Queue(max_queue_size) # type: ignore[var-annotated]
self.on_error = on_error
self.debug = debug
self.send = send
self.sync_mode = sync_mode
self.timeout = timeout
if debug:
logger.setLevel(logging.DEBUG)
if sync_mode:
self.consumers = None
else:
# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the
# interpreter is destroyed before the daemon thread finishes
# execution. However, it is *not* the same as flushing the queue!
# To guarantee all messages have been delivered, you'll still need
# to call flush().
if send:
atexit.register(self.join)
            self.consumers = []
            for _ in range(thread):
consumer = Consumer(
self.queue,
base_source_context=source_context.get(),
on_error=on_error,
upload_size=upload_size,
upload_interval=upload_interval,
retries=max_retries,
timeout=timeout,
)
self.consumers.append(consumer)
# if we've disabled sending, just don't start the consumer
if send:
consumer.start()
def identify(
self, user_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Method to identify a user with given traits.
Args:
user_id: The user ID.
traits: The traits for the identification process.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"traits": traits or {},
"type": "identify",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
def track(
self,
user_id: UUID,
event: "AnalyticsEvent",
properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
"""Method to track events.
Args:
user_id: The user ID.
event: The type of the event.
properties: Dict of additional properties for the event.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"event": event,
"properties": properties or {},
"type": "track",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
def group(
self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Method to group users.
Args:
user_id: The user ID.
group_id: The group ID.
traits: Traits to assign to the group.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"group_id": group_id,
"traits": traits or {},
"type": "group",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
def _enqueue(self, msg: str) -> Tuple[bool, str]:
"""Method to queue messages to be sent.
Args:
msg: The message to queue.
Returns:
Tuple (success flag, the original message).
"""
# if send is False, return msg as if it was successfully queued
if not self.send:
return True, msg
if self.sync_mode:
post(timeout=self.timeout, batch=[msg])
return True, msg
try:
self.queue.put(msg, block=False)
return True, msg
except queue.Full:
logger.debug("ZenML analytics-python queue is full")
return False, msg
def flush(self) -> None:
"""Method to force a flush from the internal queue to the server."""
q = self.queue
size = q.qsize()
q.join()
# Note that this message may not be precise, because of threading.
logger.debug("successfully flushed about %s items.", size)
def join(self) -> None:
"""Method to end the consumer thread once the queue is empty."""
for consumer in self.consumers:
consumer.pause()
try:
consumer.join()
except RuntimeError:
# consumer thread has not started
pass
def shutdown(self) -> None:
"""Method to flush all messages and cleanly shutdown the client."""
self.flush()
self.join()
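As a hedged sketch, the client can be exercised without contacting the analytics server by passing `send=False`, in which case `_enqueue` returns the serialized message as if it had been delivered:

```python
from uuid import uuid4

from zenml.analytics.client import Client
from zenml.utils.analytics_utils import AnalyticsEvent

# With send=False the message is serialized but never posted, which makes
# this safe to run; the default sync_mode=True starts no consumer threads.
client = Client(send=False)

success, raw_message = client.track(
    user_id=uuid4(),
    event=AnalyticsEvent.OPT_IN_ANALYTICS,
    properties={"source": "docs-example"},
)
print(success, raw_message)
```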
DefaultConfig
The configuration class for the client.
Attributes:
Name | Type | Description |
---|---|---|
on_error | Optional[Callable[..., Any]] | Function to call if an error occurs. |
debug | bool | Flag to set to switch to the debug mode. |
send | bool | Flag to determine whether to send the message. |
sync_mode | bool | Flag, if set to True, uses the main thread to send the messages, and if set to False, creates other threads for the analytics. |
max_queue_size | int | The maximum number of entries a single queue can hold. |
timeout | int | Timeout in seconds. |
max_retries | int | The number of max tries before failing. |
thread | int | The number of additional threads to create for the analytics if the 'sync_mode' is set to False. |
upload_interval | float | The upload_interval in seconds if the 'sync_mode' is set to False. |
upload_size | int | The maximum size for messages a consumer can send if the 'sync_mode' is set to False. |
Source code in zenml/analytics/client.py
class DefaultConfig(object):
"""The configuration class for the client.
Attributes:
on_error: Function to call if an error occurs.
debug: Flag to set to switch to the debug mode.
send: Flag to determine whether to send the message.
sync_mode: Flag, if set to True, uses the main thread to send
the messages, and if set to False, creates other threads
for the analytics.
max_queue_size: The maximum number of entries a single queue
can hold.
timeout: Timeout in seconds.
max_retries: The number of max tries before failing.
thread: The number of additional threads to create for the
analytics if the 'sync_mode' is set to False.
upload_interval: The upload_interval in seconds if the
'sync_mode' is set to False.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
"""
on_error: Optional[Callable[..., Any]] = None
debug: bool = False
send: bool = True
sync_mode: bool = True
max_queue_size: int = 10000
timeout: int = 15
max_retries: int = 1
thread: int = 1
upload_interval: float = 0.5
upload_size: int = 100
__init__(self, debug=False, max_queue_size=10000, send=True, on_error=None, max_retries=1, sync_mode=True, timeout=15, thread=1, upload_size=100, upload_interval=0.5)
special
Initialization of the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
debug | bool | Flag to set to switch to the debug mode. | False |
max_queue_size | int | The maximum number of entries a single queue can hold. | 10000 |
send | bool | Flag to determine whether to send the message. | True |
on_error | Optional[Callable[..., Any]] | Function to call if an error occurs. | None |
max_retries | int | The number of max tries before failing. | 1 |
sync_mode | bool | Flag, if set to True, uses the main thread to send the messages, and if set to False, creates other threads for the analytics. | True |
timeout | int | Timeout in seconds. | 15 |
thread | int | The number of additional threads to create for the analytics if the 'sync_mode' is set to False. | 1 |
upload_size | int | The maximum size for messages a consumer can send if the 'sync_mode' is set to False. | 100 |
upload_interval | float | The upload_interval in seconds if the 'sync_mode' is set to False. | 0.5 |
Source code in zenml/analytics/client.py
def __init__(
self,
debug: bool = DefaultConfig.debug,
max_queue_size: int = DefaultConfig.max_queue_size,
send: bool = DefaultConfig.send,
on_error: Optional[Callable[..., Any]] = DefaultConfig.on_error,
max_retries: int = DefaultConfig.max_retries,
sync_mode: bool = DefaultConfig.sync_mode,
timeout: int = DefaultConfig.timeout,
thread: int = DefaultConfig.thread,
upload_size: int = DefaultConfig.upload_size,
upload_interval: float = DefaultConfig.upload_interval,
) -> None:
"""Initialization of the client.
Args:
debug: Flag to set to switch to the debug mode.
max_queue_size: The maximum number of entries a single queue
can hold.
send: Flag to determine whether to send the message.
on_error: Function to call if an error occurs.
max_retries: The number of max tries before failing.
sync_mode: Flag, if set to True, uses the main thread to send
the messages, and if set to False, creates other threads
for the analytics.
timeout: Timeout in seconds.
thread: The number of additional threads to create for the
analytics if the 'sync_mode' is set to False.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
upload_interval: The upload_interval in seconds if the
'sync_mode' is set to False.
"""
self.queue = queue.Queue(max_queue_size) # type: ignore[var-annotated]
self.on_error = on_error
self.debug = debug
self.send = send
self.sync_mode = sync_mode
self.timeout = timeout
if debug:
logger.setLevel(logging.DEBUG)
if sync_mode:
self.consumers = None
else:
# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the
# interpreter is destroyed before the daemon thread finishes
# execution. However, it is *not* the same as flushing the queue!
# To guarantee all messages have been delivered, you'll still need
# to call flush().
if send:
atexit.register(self.join)
        self.consumers = []
        for _ in range(thread):
consumer = Consumer(
self.queue,
base_source_context=source_context.get(),
on_error=on_error,
upload_size=upload_size,
upload_interval=upload_interval,
retries=max_retries,
timeout=timeout,
)
self.consumers.append(consumer)
# if we've disabled sending, just don't start the consumer
if send:
consumer.start()
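For background delivery, a sketch with `sync_mode=False` (parameter values illustrative): `thread` daemon `Consumer` threads batch the queued messages, and `shutdown()` flushes the queue and joins the consumers before exit.

```python
from zenml.analytics.client import Client

# Asynchronous configuration: messages accumulate in the internal queue
# and a daemon consumer uploads them every `upload_interval` seconds or
# whenever `upload_size` messages are ready.
client = Client(sync_mode=False, upload_interval=1.0, upload_size=50)

# ... identify()/track()/group() calls enqueue messages here ...

# Flush pending messages and stop the consumer threads cleanly.
client.shutdown()
```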
flush(self)
Method to force a flush from the internal queue to the server.
Source code in zenml/analytics/client.py
def flush(self) -> None:
"""Method to force a flush from the internal queue to the server."""
q = self.queue
size = q.qsize()
q.join()
# Note that this message may not be precise, because of threading.
logger.debug("successfully flushed about %s items.", size)
group(self, user_id, group_id, traits)
Method to group users.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
group_id | UUID | The group ID. | required |
traits | Optional[Dict[Any, Any]] | Traits to assign to the group. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def group(
self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Method to group users.
Args:
user_id: The user ID.
group_id: The group ID.
traits: Traits to assign to the group.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"group_id": group_id,
"traits": traits or {},
"type": "group",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
identify(self, user_id, traits)
Method to identify a user with given traits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
traits | Optional[Dict[Any, Any]] | The traits for the identification process. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def identify(
self, user_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
"""Method to identify a user with given traits.
Args:
user_id: The user ID.
traits: The traits for the identification process.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"traits": traits or {},
"type": "identify",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
join(self)
Method to end the consumer thread once the queue is empty.
Source code in zenml/analytics/client.py
def join(self) -> None:
"""Method to end the consumer thread once the queue is empty."""
for consumer in self.consumers:
consumer.pause()
try:
consumer.join()
except RuntimeError:
# consumer thread has not started
pass
shutdown(self)
Method to flush all messages and cleanly shutdown the client.
Source code in zenml/analytics/client.py
def shutdown(self) -> None:
"""Method to flush all messages and cleanly shutdown the client."""
self.flush()
self.join()
track(self, user_id, event, properties)
Method to track events.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id | UUID | The user ID. | required |
event | AnalyticsEvent | The type of the event. | required |
properties | Optional[Dict[Any, Any]] | Dict of additional properties for the event. | required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] | Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def track(
self,
user_id: UUID,
event: "AnalyticsEvent",
properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
"""Method to track events.
Args:
user_id: The user ID.
event: The type of the event.
properties: Dict of additional properties for the event.
Returns:
Tuple (success flag, the original message).
"""
msg = {
"user_id": user_id,
"event": event,
"properties": properties or {},
"type": "track",
"debug": IS_DEBUG_ENV,
}
return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))
consumer
The analytics module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
Consumer (Thread)
Consumes the messages from the client's queue.
Source code in zenml/analytics/consumer.py
class Consumer(Thread):
"""Consumes the messages from the client's queue."""
def __init__(
self,
queue: Queue, # type: ignore[type-arg]
base_source_context: SourceContextTypes,
upload_size: int = 100,
on_error: Optional[Callable[..., Any]] = None,
upload_interval: float = 0.5,
retries: int = 10,
timeout: int = 15,
) -> None:
"""Initialize and create a consumer thread.
Args:
queue: The list of messages in the queue.
base_source_context: the context type which will be set for the
thread as this consumer runs.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
on_error: Function to call if an error occurs.
upload_interval: The upload_interval in seconds
retries: The number of max tries before failing.
timeout: Timeout in seconds.
"""
Thread.__init__(self)
# Initialization of the logging, that silences the backoff logger
init_logging()
# Store the base context to set for the thread
self.base_source_context = base_source_context
# Set the source context from the base
# Make consumer a daemon thread so that it doesn't block program exit
self.daemon = True
self.upload_size = upload_size
self.upload_interval = upload_interval
self.on_error = on_error
self.queue = queue
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
self.retries = retries
self.timeout = timeout
def run(self) -> None:
"""Runs the consumer."""
# Set the base context for each thread
from zenml.analytics import source_context
source_context.set(self.base_source_context)
# Run the thread
logger.debug("Consumer is running...")
while self.running:
self.upload()
logger.debug("Consumer exited.")
def pause(self) -> None:
"""Pause the consumer."""
self.running = False
def upload(self) -> bool:
"""Upload the next batch of items, return whether successful.
Returns:
If the upload succeeded.
"""
success = False
batch = self.next()
if len(batch) == 0:
return False
try:
self.request(batch)
success = True
except Exception as e:
logger.debug("error uploading: %s", e)
success = False
if self.on_error:
self.on_error(e, batch)
finally:
# mark items as acknowledged from queue
for _ in batch:
self.queue.task_done()
return success
def next(self) -> List[str]:
"""Return the next batch of items to upload.
Returns:
The next batch of items to upload.
"""
queue = self.queue
items: List[str] = []
start_time = monotonic.monotonic()
total_size = 0
while len(items) < self.upload_size:
elapsed = monotonic.monotonic() - start_time
if elapsed >= self.upload_interval:
break
try:
item = queue.get(
block=True, timeout=self.upload_interval - elapsed
)
item_size = len(item.encode())
if item_size > MAX_MSG_SIZE:
logger.debug(
"Item exceeds 32kb limit, dropping. (%s)", str(item)
)
continue
items.append(item)
total_size += item_size
if total_size >= BATCH_SIZE_LIMIT:
logger.debug("hit batch size limit (size: %d)", total_size)
break
except Empty:
break
return items
def request(self, batch: List[str]) -> None:
"""Attempt to upload the batch and retry before raising an error.
Args:
batch: The batch to upload.
"""
def fatal_exception(exc: Any) -> bool:
if isinstance(exc, AnalyticsAPIError):
# retry on server errors and client errors
# with 429 status code (rate limited),
# don't retry on other client errors
return (400 <= exc.status < 500) and exc.status != 429
else:
# retry on all other errors (e.g. network)
return False
@backoff.on_exception( # type: ignore[misc]
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception,
)
def send_request() -> None:
"""Function to send a batch of messages."""
post(timeout=self.timeout, batch=batch)
send_request()
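Consumers are normally created and owned by `Client`; the sketch below wires one up by hand purely to illustrate the lifecycle (`start`, `pause`, `join`) and the source-context hand-off.

```python
from queue import Queue

from zenml.analytics import source_context
from zenml.analytics.consumer import Consumer

message_queue: "Queue[str]" = Queue(maxsize=100)

# The caller's source context is captured here and re-set inside run(),
# so uploads from the background thread carry the same context header.
consumer = Consumer(message_queue, base_source_context=source_context.get())
consumer.start()

# ... put JSON-encoded messages on `message_queue` ...

consumer.pause()  # lets the run() loop exit after the current upload
consumer.join()
```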
__init__(self, queue, base_source_context, upload_size=100, on_error=None, upload_interval=0.5, retries=10, timeout=15)
special
Initialize and create a consumer thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue | Queue | The list of messages in the queue. | required |
base_source_context | SourceContextTypes | The context type which will be set for the thread as this consumer runs. | required |
upload_size | int | The maximum size for messages a consumer can send if the 'sync_mode' is set to False. | 100 |
on_error | Optional[Callable[..., Any]] | Function to call if an error occurs. | None |
upload_interval | float | The upload_interval in seconds. | 0.5 |
retries | int | The number of max tries before failing. | 10 |
timeout | int | Timeout in seconds. | 15 |
Source code in zenml/analytics/consumer.py
def __init__(
self,
queue: Queue, # type: ignore[type-arg]
base_source_context: SourceContextTypes,
upload_size: int = 100,
on_error: Optional[Callable[..., Any]] = None,
upload_interval: float = 0.5,
retries: int = 10,
timeout: int = 15,
) -> None:
"""Initialize and create a consumer thread.
Args:
queue: The list of messages in the queue.
base_source_context: the context type which will be set for the
thread as this consumer runs.
upload_size: The maximum size for messages a consumer can send
if the 'sync_mode' is set to False.
on_error: Function to call if an error occurs.
upload_interval: The upload_interval in seconds
retries: The number of max tries before failing.
timeout: Timeout in seconds.
"""
Thread.__init__(self)
# Initialization of the logging, that silences the backoff logger
init_logging()
# Store the base context to set for the thread
self.base_source_context = base_source_context
# Set the source context from the base
# Make consumer a daemon thread so that it doesn't block program exit
self.daemon = True
self.upload_size = upload_size
self.upload_interval = upload_interval
self.on_error = on_error
self.queue = queue
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
self.retries = retries
self.timeout = timeout
next(self)
Return the next batch of items to upload.
Returns:
Type | Description |
---|---|
List[str] | The next batch of items to upload. |
Source code in zenml/analytics/consumer.py
def next(self) -> List[str]:
"""Return the next batch of items to upload.
Returns:
The next batch of items to upload.
"""
queue = self.queue
items: List[str] = []
start_time = monotonic.monotonic()
total_size = 0
while len(items) < self.upload_size:
elapsed = monotonic.monotonic() - start_time
if elapsed >= self.upload_interval:
break
try:
item = queue.get(
block=True, timeout=self.upload_interval - elapsed
)
item_size = len(item.encode())
if item_size > MAX_MSG_SIZE:
logger.debug(
"Item exceeds 32kb limit, dropping. (%s)", str(item)
)
continue
items.append(item)
total_size += item_size
if total_size >= BATCH_SIZE_LIMIT:
logger.debug("hit batch size limit (size: %d)", total_size)
break
except Empty:
break
return items
pause(self)
Pause the consumer.
Source code in zenml/analytics/consumer.py
def pause(self) -> None:
"""Pause the consumer."""
self.running = False
request(self, batch)
Attempt to upload the batch and retry before raising an error.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch | List[str] | The batch to upload. | required |
Source code in zenml/analytics/consumer.py
def request(self, batch: List[str]) -> None:
"""Attempt to upload the batch and retry before raising an error.
Args:
batch: The batch to upload.
"""
def fatal_exception(exc: Any) -> bool:
if isinstance(exc, AnalyticsAPIError):
# retry on server errors and client errors
# with 429 status code (rate limited),
# don't retry on other client errors
return (400 <= exc.status < 500) and exc.status != 429
else:
# retry on all other errors (e.g. network)
return False
@backoff.on_exception( # type: ignore[misc]
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception,
)
def send_request() -> None:
"""Function to send a batch of messages."""
post(timeout=self.timeout, batch=batch)
send_request()
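A standalone sketch of the same give-up policy, using the `backoff` package as the source does; `flaky_call` is a hypothetical function for illustration only.

```python
import backoff

from zenml.analytics.request import AnalyticsAPIError

def fatal(exc: Exception) -> bool:
    # Give up immediately on 4xx client errors except 429 (rate limited);
    # retry everything else, e.g. 5xx responses and network failures.
    return (
        isinstance(exc, AnalyticsAPIError)
        and 400 <= exc.status < 500
        and exc.status != 429
    )

@backoff.on_exception(backoff.expo, Exception, max_tries=3, giveup=fatal)
def flaky_call() -> None:
    """Hypothetical request that always fails with a retryable error."""
    raise AnalyticsAPIError(500, "server hiccup")
```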
run(self)
Runs the consumer.
Source code in zenml/analytics/consumer.py
def run(self) -> None:
"""Runs the consumer."""
# Set the base context for each thread
from zenml.analytics import source_context
source_context.set(self.base_source_context)
# Run the thread
logger.debug("Consumer is running...")
while self.running:
self.upload()
logger.debug("Consumer exited.")
upload(self)
Upload the next batch of items, return whether successful.
Returns:
Type | Description |
---|---|
bool | If the upload succeeded. |
Source code in zenml/analytics/consumer.py
def upload(self) -> bool:
"""Upload the next batch of items, return whether successful.
Returns:
If the upload succeeded.
"""
success = False
batch = self.next()
if len(batch) == 0:
return False
try:
self.request(batch)
success = True
except Exception as e:
logger.debug("error uploading: %s", e)
success = False
if self.on_error:
self.on_error(e, batch)
finally:
# mark items as acknowledged from queue
for _ in batch:
self.queue.task_done()
return success
context
The analytics module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
AnalyticsContext
Client class for ZenML Analytics v2.
Source code in zenml/analytics/context.py
class AnalyticsContext:
"""Client class for ZenML Analytics v2."""
def __init__(self) -> None:
"""Initialization.
Use this as a context manager to ensure that analytics are initialized
properly, only tracked when configured to do so and that any errors
are handled gracefully.
"""
self.analytics_opt_in: bool = False
self.user_id: Optional[UUID] = None
self.client_id: Optional[UUID] = None
self.server_id: Optional[UUID] = None
self.database_type: Optional["ServerDatabaseType"] = None
self.deployment_type: Optional["ServerDeploymentType"] = None
@property
def in_server(self) -> bool:
"""Flag to check whether the code is running in a ZenML server.
Returns:
True if running in a server, False otherwise.
"""
return handle_bool_env_var(ENV_ZENML_SERVER)
def __enter__(self) -> "AnalyticsContext":
"""Enter analytics context manager.
Returns:
The analytics context.
"""
# Fetch the analytics opt-in setting
from zenml.config.global_config import GlobalConfiguration
gc = GlobalConfiguration()
self.analytics_opt_in = gc.analytics_opt_in
if not self.analytics_opt_in:
return self
try:
# Fetch the `user_id`
if self.in_server:
from zenml.zen_server.auth import get_auth_context
# If the code is running on the server, use the auth context.
auth_context = get_auth_context()
if auth_context is not None:
self.user_id = auth_context.user.id
else:
# If the code is running on the client, use the default user.
default_user = gc.zen_store.get_user()
self.user_id = default_user.id
# Fetch the `client_id`
if self.in_server:
# If the code is running on the server, there is no client id.
self.client_id = None
else:
# If the code is running on the client, attach the client id.
self.client_id = gc.user_id
# Fetch the store information including the `server_id`
store_info = gc.zen_store.get_store_info()
self.server_id = store_info.id
self.deployment_type = store_info.deployment_type
self.database_type = store_info.database_type
except Exception as e:
self.analytics_opt_in = False
logger.debug(f"Analytics initialization failed: {e}")
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
"""Exit context manager.
Args:
exc_type: Exception type.
exc_val: Exception value.
exc_tb: Exception traceback.
Returns:
True.
"""
if exc_val is not None:
logger.debug(f"Sending telemetry 2.0 data failed: {exc_val}")
return True
def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
"""Identify the user through segment.
Args:
traits: Traits of the user.
Returns:
True if tracking information was sent, False otherwise.
"""
success = False
if self.analytics_opt_in and self.user_id is not None:
success, _ = analytics.identify(
user_id=self.user_id,
traits=traits,
)
return success
def group(
self,
group_id: UUID,
traits: Optional[Dict[str, Any]] = None,
) -> bool:
"""Group the user.
Args:
group_id: Group ID.
traits: Traits of the group.
Returns:
True if tracking information was sent, False otherwise.
"""
success = False
if self.analytics_opt_in and self.user_id is not None:
if traits is None:
traits = {}
traits.update({"group_id": group_id})
success, _ = analytics.group(
user_id=self.user_id,
group_id=group_id,
traits=traits,
)
return success
def track(
self,
event: "AnalyticsEvent",
properties: Optional[Dict[str, Any]] = None,
) -> bool:
"""Track an event.
Args:
event: Event to track.
properties: Event properties.
Returns:
True if tracking information was sent, False otherwise.
"""
from zenml.utils.analytics_utils import AnalyticsEvent
if properties is None:
properties = {}
if (
not self.analytics_opt_in
and event.value
not in {
AnalyticsEvent.OPT_OUT_ANALYTICS,
AnalyticsEvent.OPT_IN_ANALYTICS,
}
or self.user_id is None
):
return False
# add basics
properties.update(Environment.get_system_info())
properties.update(
{
"environment": get_environment(),
"python_version": Environment.python_version(),
"version": __version__,
"client_id": str(self.client_id),
"user_id": str(self.user_id),
"server_id": str(self.server_id),
"deployment_type": str(self.deployment_type),
"database_type": str(self.database_type),
}
)
for k, v in properties.items():
if isinstance(v, UUID):
properties[k] = str(v)
success, _ = analytics.track(
user_id=self.user_id,
event=event,
properties=properties,
)
logger.debug(
f"Sending analytics: User: {self.user_id}, Event: {event}, "
f"Metadata: {properties}"
)
return success
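A minimal usage sketch: the context manager resolves user and server metadata on entry, and its `__exit__` swallows and logs any exception so that telemetry failures never interrupt user code. The event and properties are illustrative.

```python
from zenml.analytics.context import AnalyticsContext
from zenml.utils.analytics_utils import AnalyticsEvent

with AnalyticsContext() as context:
    # Only sent if analytics opt-in is enabled and a user ID was resolved.
    sent = context.track(
        event=AnalyticsEvent.OPT_IN_ANALYTICS,
        properties={"source": "docs-example"},
    )
```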
in_server: bool
property
readonly
Flag to check whether the code is running in a ZenML server.
Returns:
Type | Description |
---|---|
bool | True if running in a server, False otherwise. |
__enter__(self)
special
Enter analytics context manager.
Returns:
Type | Description |
---|---|
AnalyticsContext | The analytics context. |
Source code in zenml/analytics/context.py
def __enter__(self) -> "AnalyticsContext":
"""Enter analytics context manager.
Returns:
The analytics context.
"""
# Fetch the analytics opt-in setting
from zenml.config.global_config import GlobalConfiguration
gc = GlobalConfiguration()
self.analytics_opt_in = gc.analytics_opt_in
if not self.analytics_opt_in:
return self
try:
# Fetch the `user_id`
if self.in_server:
from zenml.zen_server.auth import get_auth_context
# If the code is running on the server, use the auth context.
auth_context = get_auth_context()
if auth_context is not None:
self.user_id = auth_context.user.id
else:
# If the code is running on the client, use the default user.
default_user = gc.zen_store.get_user()
self.user_id = default_user.id
# Fetch the `client_id`
if self.in_server:
# If the code is running on the server, there is no client id.
self.client_id = None
else:
# If the code is running on the client, attach the client id.
self.client_id = gc.user_id
# Fetch the store information including the `server_id`
store_info = gc.zen_store.get_store_info()
self.server_id = store_info.id
self.deployment_type = store_info.deployment_type
self.database_type = store_info.database_type
except Exception as e:
self.analytics_opt_in = False
logger.debug(f"Analytics initialization failed: {e}")
return self
__exit__(self, exc_type, exc_val, exc_tb)
special
Exit context manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exc_type | Optional[Type[BaseException]] | Exception type. | required |
exc_val | Optional[BaseException] | Exception value. | required |
exc_tb | Optional[TracebackType] | Exception traceback. | required |
Returns:
Type | Description |
---|---|
bool | True. |
Source code in zenml/analytics/context.py
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
"""Exit context manager.
Args:
exc_type: Exception type.
exc_val: Exception value.
exc_tb: Exception traceback.
Returns:
True.
"""
if exc_val is not None:
logger.debug(f"Sending telemetry 2.0 data failed: {exc_val}")
return True
__init__(self)
special
Initialization.
Use this as a context manager to ensure that analytics are initialized properly, only tracked when configured to do so and that any errors are handled gracefully.
Source code in zenml/analytics/context.py
def __init__(self) -> None:
"""Initialization.
Use this as a context manager to ensure that analytics are initialized
properly, only tracked when configured to do so and that any errors
are handled gracefully.
"""
self.analytics_opt_in: bool = False
self.user_id: Optional[UUID] = None
self.client_id: Optional[UUID] = None
self.server_id: Optional[UUID] = None
self.database_type: Optional["ServerDatabaseType"] = None
self.deployment_type: Optional["ServerDeploymentType"] = None
group(self, group_id, traits=None)
Group the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id | UUID | Group ID. | required |
traits | Optional[Dict[str, Any]] | Traits of the group. | None |
Returns:
Type | Description |
---|---|
bool | True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def group(
self,
group_id: UUID,
traits: Optional[Dict[str, Any]] = None,
) -> bool:
"""Group the user.
Args:
group_id: Group ID.
traits: Traits of the group.
Returns:
True if tracking information was sent, False otherwise.
"""
success = False
if self.analytics_opt_in and self.user_id is not None:
if traits is None:
traits = {}
traits.update({"group_id": group_id})
success, _ = analytics.group(
user_id=self.user_id,
group_id=group_id,
traits=traits,
)
return success
identify(self, traits=None)
Identify the user through Segment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
traits | Optional[Dict[str, Any]] | Traits of the user. | None |
Returns:
Type | Description |
---|---|
bool | True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
"""Identify the user through segment.
Args:
traits: Traits of the user.
Returns:
True if tracking information was sent, False otherwise.
"""
success = False
if self.analytics_opt_in and self.user_id is not None:
success, _ = analytics.identify(
user_id=self.user_id,
traits=traits,
)
return success
track(self, event, properties=None)
Track an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event | AnalyticsEvent | Event to track. | required |
properties | Optional[Dict[str, Any]] | Event properties. | None |
Returns:
Type | Description |
---|---|
bool | True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def track(
self,
event: "AnalyticsEvent",
properties: Optional[Dict[str, Any]] = None,
) -> bool:
"""Track an event.
Args:
event: Event to track.
properties: Event properties.
Returns:
True if tracking information was sent, False otherwise.
"""
from zenml.utils.analytics_utils import AnalyticsEvent
if properties is None:
properties = {}
if (
not self.analytics_opt_in
and event.value
not in {
AnalyticsEvent.OPT_OUT_ANALYTICS,
AnalyticsEvent.OPT_IN_ANALYTICS,
}
or self.user_id is None
):
return False
# add basics
properties.update(Environment.get_system_info())
properties.update(
{
"environment": get_environment(),
"python_version": Environment.python_version(),
"version": __version__,
"client_id": str(self.client_id),
"user_id": str(self.user_id),
"server_id": str(self.server_id),
"deployment_type": str(self.deployment_type),
"database_type": str(self.database_type),
}
)
for k, v in properties.items():
if isinstance(v, UUID):
properties[k] = str(v)
success, _ = analytics.track(
user_id=self.user_id,
event=event,
properties=properties,
)
logger.debug(
f"Sending analytics: User: {self.user_id}, Event: {event}, "
f"Metadata: {properties}"
)
return success
request
The 'analytics' module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
AnalyticsAPIError (Exception)
Custom exception class for API-related errors.
Source code in zenml/analytics/request.py
class AnalyticsAPIError(Exception):
"""Custom exception class for API-related errors."""
def __init__(self, status: int, message: str) -> None:
"""Initialization.
Args:
status: The status code of the response.
message: The text of the response.
"""
self.message = message
self.status = status
def __str__(self) -> str:
"""Method to represent the instance as a string.
Returns:
A representation of the message and the status code.
"""
msg = "[ZenML Analytics] {1}: {0}"
return msg.format(self.message, self.status)
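For illustration, the string form interpolates the status code before the message:

```python
from zenml.analytics.request import AnalyticsAPIError

error = AnalyticsAPIError(status=429, message="rate limited")
print(str(error))  # -> [ZenML Analytics] 429: rate limited
```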
__init__(self, status, message)
special
Initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status | int | The status code of the response. | required |
message | str | The text of the response. | required |
Source code in zenml/analytics/request.py
def __init__(self, status: int, message: str) -> None:
"""Initialization.
Args:
status: The status code of the response.
message: The text of the response.
"""
self.message = message
self.status = status
__str__(self)
special
Method to represent the instance as a string.
Returns:
Type | Description |
---|---|
str | A representation of the message and the status code. |
Source code in zenml/analytics/request.py
def __str__(self) -> str:
"""Method to represent the instance as a string.
Returns:
A representation of the message and the status code.
"""
msg = "[ZenML Analytics] {1}: {0}"
return msg.format(self.message, self.status)
post(batch, timeout=15)
Post a batch of messages to the ZenML analytics server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch | List[str] | The messages to send. | required |
timeout | int | Timeout in seconds. | 15 |
Returns:
Type | Description |
---|---|
Response | The response. |
Exceptions:
Type | Description |
---|---|
AnalyticsAPIError | If the post request has failed. |
Source code in zenml/analytics/request.py
def post(batch: List[str], timeout: int = 15) -> requests.Response:
"""Post a batch of messages to the ZenML analytics server.
Args:
batch: The messages to send.
timeout: Timeout in seconds.
Returns:
The response.
Raises:
AnalyticsAPIError: If the post request has failed.
"""
from zenml.analytics import source_context
headers = {
"accept": "application/json",
"content-type": "application/json",
source_context.name: source_context.get().value,
}
response = requests.post(
url=ANALYTICS_SERVER_URL + "/batch",
headers=headers,
data=f"[{','.join(batch)}]",
timeout=timeout,
)
if response.status_code == 200:
logger.debug("data uploaded successfully")
return response
raise AnalyticsAPIError(response.status_code, response.text)
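To make the wire format concrete, here is a hedged sketch of building one batch entry the way `Client._enqueue` does; note that calling `post` performs a real HTTP request to `ANALYTICS_SERVER_URL`.

```python
import json
from uuid import uuid4

from zenml.analytics.client import AnalyticsEncoder
from zenml.analytics.request import post

# Each batch entry is an already-serialized JSON object; post() joins the
# entries into a single JSON array for the "/batch" endpoint.
entry = json.dumps(
    {"user_id": uuid4(), "traits": {}, "type": "identify"},
    cls=AnalyticsEncoder,
)

response = post(batch=[entry], timeout=5)  # sends a real request
```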