Analytics
zenml.analytics
special
The 'analytics' module of ZenML.
alias(user_id, previous_id)
Alias user IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
previous_id |
UUID |
Previous ID for the alias. |
required |
Returns:
Type | Description |
---|---|
bool |
True if event is sent successfully, False if not. |
Source code in zenml/analytics/__init__.py
def alias(user_id: UUID, previous_id: UUID) -> bool:
    """Alias user IDs.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.alias(user_id=user_id, previous_id=previous_id)

    # Only reached when the context manager suppressed an exception raised
    # inside the `with` body: report the failure instead of returning None.
    return False
group(group_id, group_metadata=None)
Attach metadata to a segment group.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id |
UUID |
ID of the group. |
required |
group_metadata |
Optional[Dict[str, Any]] |
Metadata to attach to the group. |
None |
Returns:
Type | Description |
---|---|
bool |
True if event is sent successfully, False if not. |
Source code in zenml/analytics/__init__.py
def group(
    group_id: UUID,
    group_metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """Attach metadata to a segment group.

    Args:
        group_id: ID of the group.
        group_metadata: Metadata to attach to the group.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.group(group_id=group_id, traits=group_metadata)

    # Only reached when the context manager suppressed an exception raised
    # inside the `with` body: report the failure instead of returning None.
    return False
identify(metadata=None)
Attach metadata to user directly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata |
Optional[Dict[str, Any]] |
Dict of metadata to attach to the user. |
None |
Returns:
Type | Description |
---|---|
bool |
True if event is sent successfully, False if not. |
Source code in zenml/analytics/__init__.py
def identify(metadata: Optional[Dict[str, Any]] = None) -> bool:
    """Attach metadata to user directly.

    Args:
        metadata: Dict of metadata to attach to the user. When it is None
            nothing is sent.

    Returns:
        True if event is sent successfully, False if not.
    """
    # Nothing to attach: bail out before importing the analytics context.
    if metadata is None:
        return False

    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.identify(traits=metadata)

    # Only reached when the context manager suppressed an exception raised
    # inside the `with` body: report the failure instead of returning None.
    return False
track(event, metadata=None)
Track segment event if user opted-in.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
AnalyticsEvent |
Name of event to track in segment. |
required |
metadata |
Optional[Dict[str, Any]] |
Dict of metadata to track. |
None |
Returns:
Type | Description |
---|---|
bool |
True if event is sent successfully, False if not. |
Source code in zenml/analytics/__init__.py
def track(
    event: "AnalyticsEvent",
    metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track segment event if user opted-in.

    The dict passed as `metadata` is not modified; the default
    `event_success` flag is added to a copy of it.

    Args:
        event: Name of event to track in segment.
        metadata: Dict of metadata to track.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    # Copy so that neither the default flag nor anything added further down
    # the call chain leaks back into the caller's dict.
    properties: Dict[str, Any] = dict(metadata) if metadata is not None else {}
    properties.setdefault("event_success", True)

    with AnalyticsContext() as analytics:
        return analytics.track(event=event, properties=properties)

    # Only reached when the context manager suppressed an exception raised
    # inside the `with` body: report the failure instead of returning None.
    return False
client
The analytics client of ZenML.
Client
The client class for ZenML analytics.
Source code in zenml/analytics/client.py
class Client(object):
    """The client class for ZenML analytics."""

    def __init__(self, send: bool = True, timeout: int = 15) -> None:
        """Set up the client.

        Args:
            send: Whether messages are actually posted.
            timeout: Timeout in seconds used when posting.
        """
        self.send, self.timeout = send, timeout

    def identify(
        self, user_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Build and queue an `identify` message.

        Args:
            user_id: The user ID.
            traits: The traits for the identification process; a missing or
                empty value is sent as an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "traits": traits or {},
            "type": "identify",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def alias(self, user_id: UUID, previous_id: UUID) -> Tuple[bool, str]:
        """Build and queue an `alias` message.

        Args:
            user_id: The user ID.
            previous_id: Previous ID for the alias.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "previous_id": previous_id,
            "type": "alias",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def track(
        self,
        user_id: UUID,
        event: "AnalyticsEvent",
        properties: Optional[Dict[Any, Any]],
    ) -> Tuple[bool, str]:
        """Build and queue a `track` message.

        Args:
            user_id: The user ID.
            event: The type of the event.
            properties: Dict of additional properties for the event; a
                missing or empty value is sent as an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "event": event,
            "properties": properties or {},
            "type": "track",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def group(
        self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Build and queue a `group` message.

        Args:
            user_id: The user ID.
            group_id: The group ID.
            traits: Traits to assign to the group; a missing or empty value
                is sent as an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "group_id": group_id,
            "traits": traits or {},
            "type": "group",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def _enqueue(self, msg: str) -> Tuple[bool, str]:
        """Post a serialized message unless sending is switched off.

        Args:
            msg: The message to queue.

        Returns:
            Tuple (success flag, the original message).
        """
        # With `send` disabled the message is reported as queued without
        # being posted.
        if self.send:
            post(timeout=self.timeout, batch=[msg])
        return True, msg
__init__(self, send=True, timeout=15)
special
Initialization of the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
send |
bool |
Flag to determine whether to send the message. |
True |
timeout |
int |
Timeout in seconds. |
15 |
Source code in zenml/analytics/client.py
def __init__(self, send: bool = True, timeout: int = 15) -> None:
    """Set up the client.

    Args:
        send: Whether messages are actually posted.
        timeout: Timeout in seconds used when posting.
    """
    self.send, self.timeout = send, timeout
alias(self, user_id, previous_id)
Method to alias user IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
previous_id |
UUID |
Previous ID for the alias. |
required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] |
Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def alias(self, user_id: UUID, previous_id: UUID) -> Tuple[bool, str]:
    """Build and queue an `alias` message.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = {
        "user_id": user_id,
        "previous_id": previous_id,
        "type": "alias",
        "debug": IS_DEBUG_ENV,
    }
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
group(self, user_id, group_id, traits)
Method to group users.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
group_id |
UUID |
The group ID. |
required |
traits |
Optional[Dict[Any, Any]] |
Traits to assign to the group. |
required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] |
Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def group(
    self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
    """Build and queue a `group` message.

    Args:
        user_id: The user ID.
        group_id: The group ID.
        traits: Traits to assign to the group; a missing or empty value is
            sent as an empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = {
        "user_id": user_id,
        "group_id": group_id,
        "traits": traits or {},
        "type": "group",
        "debug": IS_DEBUG_ENV,
    }
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
identify(self, user_id, traits)
Method to identify a user with given traits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
traits |
Optional[Dict[Any, Any]] |
The traits for the identification process. |
required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] |
Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def identify(
    self, user_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
    """Build and queue an `identify` message.

    Args:
        user_id: The user ID.
        traits: The traits for the identification process; a missing or
            empty value is sent as an empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = {
        "user_id": user_id,
        "traits": traits or {},
        "type": "identify",
        "debug": IS_DEBUG_ENV,
    }
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
track(self, user_id, event, properties)
Method to track events.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
event |
AnalyticsEvent |
The type of the event. |
required |
properties |
Optional[Dict[Any, Any]] |
Dict of additional properties for the event. |
required |
Returns:
Type | Description |
---|---|
Tuple[bool, str] |
Tuple (success flag, the original message). |
Source code in zenml/analytics/client.py
def track(
    self,
    user_id: UUID,
    event: "AnalyticsEvent",
    properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Build and queue a `track` message.

    Args:
        user_id: The user ID.
        event: The type of the event.
        properties: Dict of additional properties for the event; a missing
            or empty value is sent as an empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = {
        "user_id": user_id,
        "event": event,
        "properties": properties or {},
        "type": "track",
        "debug": IS_DEBUG_ENV,
    }
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
context
The analytics module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
AnalyticsContext
Client class for ZenML Analytics v2.
Source code in zenml/analytics/context.py
class AnalyticsContext:
    """Client class for ZenML Analytics v2."""

    def __init__(self) -> None:
        """Initialization.

        Use this as a context manager to ensure that analytics are initialized
        properly, only tracked when configured to do so and that any errors
        are handled gracefully.
        """
        # Tracking stays disabled until `__enter__` resolves the setting.
        self.analytics_opt_in: bool = False

        self.user_id: Optional[UUID] = None
        self.external_user_id: Optional[UUID] = None
        self.executed_by_service_account: Optional[bool] = None
        self.client_id: Optional[UUID] = None
        self.server_id: Optional[UUID] = None
        self.external_server_id: Optional[UUID] = None
        self.server_metadata: Optional[Dict[str, str]] = None

        self.database_type: Optional["ServerDatabaseType"] = None
        self.deployment_type: Optional["ServerDeploymentType"] = None

    def __enter__(self) -> "AnalyticsContext":
        """Enter analytics context manager.

        Resolves the opt-in setting and, when opted in, the user, client and
        server details. Any exception raised while doing so disables
        analytics for this context and is logged at debug level.

        Returns:
            The analytics context.
        """
        # Imported lazily inside the method.
        from zenml.config.global_config import GlobalConfiguration

        try:
            gc = GlobalConfiguration()
            store_info = gc.zen_store.get_store_info()

            # The server reads the setting from the store info, the client
            # from the global configuration.
            if self.in_server:
                self.analytics_opt_in = store_info.analytics_enabled
            else:
                self.analytics_opt_in = gc.analytics_opt_in

            if not self.analytics_opt_in:
                return self

            # Fetch the `user_id`
            if self.in_server:
                from zenml.zen_server.auth import get_auth_context
                from zenml.zen_server.utils import server_config

                # If the code is running on the server, use the auth context.
                auth_context = get_auth_context()
                if auth_context is not None:
                    self.user_id = auth_context.user.id
                    self.executed_by_service_account = (
                        auth_context.user.is_service_account
                    )
                    self.external_user_id = auth_context.user.external_user_id

                self.external_server_id = server_config().external_server_id
            else:
                # If the code is running on the client, use the default user.
                active_user = gc.zen_store.get_user()
                self.user_id = active_user.id
                self.executed_by_service_account = (
                    active_user.is_service_account
                )
                self.external_user_id = active_user.external_user_id

            # Fetch the `client_id`
            if self.in_server:
                # If the code is running on the server, there is no client id.
                self.client_id = None
            else:
                # If the code is running on the client, attach the client id.
                self.client_id = gc.user_id

            self.server_id = store_info.id
            self.deployment_type = store_info.deployment_type
            self.database_type = store_info.database_type
            self.server_metadata = store_info.metadata
        except Exception as e:
            self.analytics_opt_in = False
            logger.debug(f"Analytics initialization failed: {e}")

        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Exit context manager.

        An exception raised inside the `with` body is logged at debug level
        and suppressed.

        Args:
            exc_type: Exception type.
            exc_val: Exception value.
            exc_tb: Exception traceback.

        Returns:
            True.
        """
        if exc_val is not None:
            logger.debug(f"Sending telemetry data failed: {exc_val}")

        return True

    @property
    def in_server(self) -> bool:
        """Flag to check whether the code is running in a ZenML server.

        Returns:
            True if running in a server, False otherwise.
        """
        return handle_bool_env_var(ENV_ZENML_SERVER)

    def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
        """Identify the user through segment.

        Args:
            traits: Traits of the user.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        if not self.analytics_opt_in or self.user_id is None:
            return False

        success, _ = default_client.identify(
            user_id=self.user_id,
            traits=traits,
        )
        return success

    def alias(self, user_id: UUID, previous_id: UUID) -> bool:
        """Alias user IDs.

        Args:
            user_id: The user ID.
            previous_id: Previous ID for the alias.

        Returns:
            True if alias information was sent, False otherwise.
        """
        if not self.analytics_opt_in:
            return False

        success, _ = default_client.alias(
            user_id=user_id,
            previous_id=previous_id,
        )
        return success

    def group(
        self,
        group_id: UUID,
        traits: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Group the user.

        Args:
            group_id: Group ID.
            traits: Traits of the group.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        if not self.analytics_opt_in or self.user_id is None:
            return False

        success, _ = default_client.group(
            user_id=self.user_id,
            group_id=group_id,
            traits=traits or {},
        )
        return success

    def track(
        self,
        event: "AnalyticsEvent",
        properties: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Track an event.

        The dict passed as `properties` is not modified; the extra fields
        are added to a copy of it.

        Args:
            event: Event to track.
            properties: Event properties.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        from zenml.analytics.enums import AnalyticsEvent

        opt_events = {
            AnalyticsEvent.OPT_OUT_ANALYTICS,
            AnalyticsEvent.OPT_IN_ANALYTICS,
        }
        # Without opt-in only the opt-in/opt-out events pass this check;
        # without a user ID nothing is sent at all.
        if (
            not self.analytics_opt_in and event.value not in opt_events
        ) or self.user_id is None:
            return False

        # Work on a copy so the caller's dict is left untouched.
        payload: Dict[str, Any] = dict(properties) if properties else {}

        # add basics
        payload.update(Environment.get_system_info())
        payload.update(
            {
                "environment": get_environment(),
                "python_version": Environment.python_version(),
                "version": __version__,
                "client_id": str(self.client_id),
                "user_id": str(self.user_id),
                "server_id": str(self.server_id),
                "deployment_type": str(self.deployment_type),
                "database_type": str(self.database_type),
                "executed_by_service_account": self.executed_by_service_account,
            }
        )

        try:
            # Name of the local timezone
            tz = (
                datetime.datetime.now(datetime.timezone.utc)
                .astimezone()
                .tzname()
            )
            if tz is not None:
                payload["timezone"] = tz

            # Language code such as "en_DE"
            language_code, _ = locale.getlocale()
            if language_code is not None:
                payload["locale"] = language_code
        except Exception as e:
            # Timezone and locale are best-effort extras: never fail the
            # event because of them, but leave a trace for debugging.
            logger.debug(f"Could not determine timezone or locale: {e}")

        if self.external_user_id:
            payload["external_user_id"] = self.external_user_id

        if self.external_server_id:
            payload["external_server_id"] = self.external_server_id

        if self.server_metadata:
            payload.update(self.server_metadata)

        # UUID values are sent as strings.
        payload = {
            key: str(value) if isinstance(value, UUID) else value
            for key, value in payload.items()
        }

        success, _ = default_client.track(
            user_id=self.user_id,
            event=event,
            properties=payload,
        )

        logger.debug(
            f"Sending analytics: User: {self.user_id}, Event: {event}, "
            f"Metadata: {payload}"
        )

        return success
in_server: bool
property
readonly
Flag to check whether the code is running in a ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if running in a server, False otherwise. |
__enter__(self)
special
Enter analytics context manager.
Returns:
Type | Description |
---|---|
AnalyticsContext |
The analytics context. |
Source code in zenml/analytics/context.py
def __enter__(self) -> "AnalyticsContext":
    """Enter analytics context manager.

    Resolves the opt-in setting and, when opted in, the user, client and
    server details. Any exception raised while doing so disables analytics
    for this context and is logged at debug level.

    Returns:
        The analytics context.
    """
    # Imported lazily inside the method.
    from zenml.config.global_config import GlobalConfiguration

    try:
        gc = GlobalConfiguration()
        store_info = gc.zen_store.get_store_info()

        # The server reads the setting from the store info, the client from
        # the global configuration.
        if self.in_server:
            self.analytics_opt_in = store_info.analytics_enabled
        else:
            self.analytics_opt_in = gc.analytics_opt_in

        # Not opted in: leave every other attribute at its initial value.
        if not self.analytics_opt_in:
            return self

        # --- user ---
        if self.in_server:
            from zenml.zen_server.auth import get_auth_context
            from zenml.zen_server.utils import server_config

            # On the server the user comes from the auth context.
            auth_context = get_auth_context()
            if auth_context is not None:
                self.user_id = auth_context.user.id
                self.executed_by_service_account = (
                    auth_context.user.is_service_account
                )
                self.external_user_id = auth_context.user.external_user_id

            self.external_server_id = server_config().external_server_id
        else:
            # On the client the store's user is used.
            active_user = gc.zen_store.get_user()
            self.user_id = active_user.id
            self.executed_by_service_account = (
                active_user.is_service_account
            )
            self.external_user_id = active_user.external_user_id

        # --- client ---
        if self.in_server:
            # There is no client id on the server.
            self.client_id = None
        else:
            self.client_id = gc.user_id

        # --- server ---
        self.server_id = store_info.id
        self.deployment_type = store_info.deployment_type
        self.database_type = store_info.database_type
        self.server_metadata = store_info.metadata
    except Exception as e:
        self.analytics_opt_in = False
        logger.debug(f"Analytics initialization failed: {e}")

    return self
__exit__(self, exc_type, exc_val, exc_tb)
special
Exit context manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exc_type |
Optional[Type[BaseException]] |
Exception type. |
required |
exc_val |
Optional[BaseException] |
Exception value. |
required |
exc_tb |
Optional[traceback] |
Exception traceback. |
required |
Returns:
Type | Description |
---|---|
bool |
True. |
Source code in zenml/analytics/context.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> bool:
    """Exit context manager.

    An exception raised inside the `with` body is logged at debug level and
    suppressed.

    Args:
        exc_type: Exception type.
        exc_val: Exception value.
        exc_tb: Exception traceback.

    Returns:
        True.
    """
    if exc_val is None:
        return True

    logger.debug(f"Sending telemetry data failed: {exc_val}")
    return True
__init__(self)
special
Initialization.
Use this as a context manager to ensure that analytics are initialized properly, only tracked when configured to do so and that any errors are handled gracefully.
Source code in zenml/analytics/context.py
def __init__(self) -> None:
    """Initialization.

    Use this as a context manager to ensure that analytics are initialized
    properly, only tracked when configured to do so and that any errors
    are handled gracefully.
    """
    # Tracking stays disabled until the context is entered.
    self.analytics_opt_in: bool = False

    # User details
    self.user_id: Optional[UUID] = None
    self.external_user_id: Optional[UUID] = None
    self.executed_by_service_account: Optional[bool] = None

    # Client details
    self.client_id: Optional[UUID] = None

    # Server details
    self.server_id: Optional[UUID] = None
    self.external_server_id: Optional[UUID] = None
    self.server_metadata: Optional[Dict[str, str]] = None
    self.database_type: Optional["ServerDatabaseType"] = None
    self.deployment_type: Optional["ServerDeploymentType"] = None
alias(self, user_id, previous_id)
Alias user IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
UUID |
The user ID. |
required |
previous_id |
UUID |
Previous ID for the alias. |
required |
Returns:
Type | Description |
---|---|
bool |
True if alias information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def alias(self, user_id: UUID, previous_id: UUID) -> bool:
    """Alias user IDs.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        True if alias information was sent, False otherwise.
    """
    # Nothing is sent unless analytics are opted in.
    if not self.analytics_opt_in:
        return False

    sent, _ = default_client.alias(
        user_id=user_id,
        previous_id=previous_id,
    )
    return sent
group(self, group_id, traits=None)
Group the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group_id |
UUID |
Group ID. |
required |
traits |
Optional[Dict[str, Any]] |
Traits of the group. |
None |
Returns:
Type | Description |
---|---|
bool |
True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def group(
    self,
    group_id: UUID,
    traits: Optional[Dict[str, Any]] = None,
) -> bool:
    """Group the user.

    Args:
        group_id: Group ID.
        traits: Traits of the group.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Both an opt-in and a known user are required to send anything.
    if not self.analytics_opt_in or self.user_id is None:
        return False

    sent, _ = default_client.group(
        user_id=self.user_id,
        group_id=group_id,
        traits=traits or {},
    )
    return sent
identify(self, traits=None)
Identify the user through segment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
traits |
Optional[Dict[str, Any]] |
Traits of the user. |
None |
Returns:
Type | Description |
---|---|
bool |
True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
    """Identify the user through segment.

    Args:
        traits: Traits of the user.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Both an opt-in and a known user are required to send anything.
    if not self.analytics_opt_in or self.user_id is None:
        return False

    sent, _ = default_client.identify(
        user_id=self.user_id,
        traits=traits,
    )
    return sent
track(self, event, properties=None)
Track an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
AnalyticsEvent |
Event to track. |
required |
properties |
Optional[Dict[str, Any]] |
Event properties. |
None |
Returns:
Type | Description |
---|---|
bool |
True if tracking information was sent, False otherwise. |
Source code in zenml/analytics/context.py
def track(
    self,
    event: "AnalyticsEvent",
    properties: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track an event.

    The dict passed as `properties` is not modified; the extra fields are
    added to a copy of it.

    Args:
        event: Event to track.
        properties: Event properties.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    from zenml.analytics.enums import AnalyticsEvent

    opt_events = {
        AnalyticsEvent.OPT_OUT_ANALYTICS,
        AnalyticsEvent.OPT_IN_ANALYTICS,
    }
    # Without opt-in only the opt-in/opt-out events pass this check; without
    # a user ID nothing is sent at all.
    if (
        not self.analytics_opt_in and event.value not in opt_events
    ) or self.user_id is None:
        return False

    # Work on a copy so the caller's dict is left untouched.
    payload: Dict[str, Any] = dict(properties) if properties else {}

    # add basics
    payload.update(Environment.get_system_info())
    payload.update(
        {
            "environment": get_environment(),
            "python_version": Environment.python_version(),
            "version": __version__,
            "client_id": str(self.client_id),
            "user_id": str(self.user_id),
            "server_id": str(self.server_id),
            "deployment_type": str(self.deployment_type),
            "database_type": str(self.database_type),
            "executed_by_service_account": self.executed_by_service_account,
        }
    )

    try:
        # Name of the local timezone
        tz = (
            datetime.datetime.now(datetime.timezone.utc)
            .astimezone()
            .tzname()
        )
        if tz is not None:
            payload["timezone"] = tz

        # Language code such as "en_DE"
        language_code, _ = locale.getlocale()
        if language_code is not None:
            payload["locale"] = language_code
    except Exception as e:
        # Timezone and locale are best-effort extras: never fail the event
        # because of them, but leave a trace for debugging.
        logger.debug(f"Could not determine timezone or locale: {e}")

    if self.external_user_id:
        payload["external_user_id"] = self.external_user_id

    if self.external_server_id:
        payload["external_server_id"] = self.external_server_id

    if self.server_metadata:
        payload.update(self.server_metadata)

    # UUID values are sent as strings.
    payload = {
        key: str(value) if isinstance(value, UUID) else value
        for key, value in payload.items()
    }

    success, _ = default_client.track(
        user_id=self.user_id,
        event=event,
        properties=payload,
    )

    logger.debug(
        f"Sending analytics: User: {self.user_id}, Event: {event}, "
        f"Metadata: {payload}"
    )

    return success
enums
Collection of analytics events for ZenML.
AnalyticsEvent (str, Enum)
Enum of events to track in segment.
Source code in zenml/analytics/enums.py
class AnalyticsEvent(str, Enum):
    """Enum of events to track in segment.

    Each member's value is the event name string.
    """

    # --- Login ---
    DEVICE_VERIFIED = "Device verified"

    # --- Onboarding ---
    USER_ENRICHED = "User Enriched"

    # --- Pipelines ---
    RUN_PIPELINE = "Pipeline run"
    RUN_PIPELINE_ENDED = "Pipeline run ended"
    CREATE_PIPELINE = "Pipeline created"
    BUILD_PIPELINE = "Pipeline built"

    # --- Template ---
    GENERATE_TEMPLATE = "Template generated"

    # --- Components ---
    REGISTERED_STACK_COMPONENT = "Stack component registered"

    # --- Code repository ---
    REGISTERED_CODE_REPOSITORY = "Code repository registered"

    # --- Stack ---
    REGISTERED_STACK = "Stack registered"
    UPDATED_STACK = "Stack updated"

    # --- Trigger ---
    CREATED_TRIGGER = "Trigger created"
    UPDATED_TRIGGER = "Trigger updated"

    # --- Run templates ---
    CREATED_RUN_TEMPLATE = "Run template created"
    EXECUTED_RUN_TEMPLATE = "Run templated executed"

    # --- Model Control Plane ---
    MODEL_DEPLOYED = "Model deployed"
    CREATED_MODEL = "Model created"
    CREATED_MODEL_VERSION = "Model Version created"

    # --- Analytics opt in and out ---
    OPT_IN_ANALYTICS = "Analytics opt-in"
    OPT_OUT_ANALYTICS = "Analytics opt-out"
    OPT_IN_OUT_EMAIL = "Response for Email prompt"

    # --- Examples ---
    RUN_ZENML_GO = "ZenML go"

    # --- Workspaces ---
    CREATED_WORKSPACE = "Workspace created"

    # --- Flavor ---
    CREATED_FLAVOR = "Flavor created"

    # --- Secret ---
    CREATED_SECRET = "Secret created"

    # --- Service connector ---
    CREATED_SERVICE_CONNECTOR = "Service connector created"

    # --- Service account and API keys ---
    CREATED_SERVICE_ACCOUNT = "Service account created"

    # --- Full stack infrastructure deployment ---
    DEPLOY_FULL_STACK = "Full stack deployed"

    # --- Tag ---
    CREATED_TAG = "Tag created"

    # --- Server settings ---
    SERVER_SETTINGS_UPDATED = "Server Settings Updated"
__format__(self, format_spec)
special
Default object formatter.
Source code in zenml/analytics/enums.py
def __format__(self, format_spec):
    """Format `str(self)` according to `format_spec`."""
    rendered = str(self)
    return str.__format__(rendered, format_spec)
models
Helper models for ZenML analytics.
AnalyticsTrackedModelMixin (BaseModel)
Mixin for models that are tracked through analytics events.
Classes that have information tracked in analytics events can inherit from this mixin and implement the abstract methods. The `@track_method` decorator will detect function arguments and return values that inherit from this class and will include the `ANALYTICS_FIELDS` attributes as tracking metadata.
Source code in zenml/analytics/models.py
class AnalyticsTrackedModelMixin(BaseModel):
    """Mixin for models that are tracked through analytics events.

    Classes that have information tracked in analytics events can inherit
    from this mixin and implement the abstract methods. The `@track_method`
    decorator will detect function arguments and return values that inherit
    from this class and will include the `ANALYTICS_FIELDS` attributes as
    tracking metadata.
    """

    # Names of the attributes reported by `get_analytics_metadata`.
    ANALYTICS_FIELDS: ClassVar[List[str]] = []

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Get the analytics metadata for the model.

        Returns:
            Dict mapping each name in `ANALYTICS_FIELDS` to the value of
            that attribute, or None when the attribute is missing.
        """
        return {
            field_name: getattr(self, field_name, None)
            for field_name in self.ANALYTICS_FIELDS
        }
get_analytics_metadata(self)
Get the analytics metadata for the model.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Dict of analytics metadata. |
Source code in zenml/analytics/models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Get the analytics metadata for the model.

    Returns:
        Dict mapping each name in `ANALYTICS_FIELDS` to the value of that
        attribute, or None when the attribute is missing.
    """
    return {
        field_name: getattr(self, field_name, None)
        for field_name in self.ANALYTICS_FIELDS
    }
request
The 'analytics' module of ZenML.
This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.
post(batch, timeout=15)
Post a batch of messages to the ZenML analytics server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch |
List[str] |
The messages to send. |
required |
timeout |
int |
Timeout in seconds. |
15 |
Returns:
Type | Description |
---|---|
Response |
The response. |
Exceptions:
Type | Description |
---|---|
AnalyticsAPIError |
If the post request has failed. |
Source code in zenml/analytics/request.py
def post(batch: List[str], timeout: int = 15) -> requests.Response:
    """Post a batch of messages to the ZenML analytics server.

    Args:
        batch: The already serialized messages to send; they are joined
            into a single JSON array.
        timeout: Timeout in seconds.

    Returns:
        The response.

    Raises:
        AnalyticsAPIError: If the response status code is not 200.
    """
    from zenml.analytics import source_context

    request_headers = {
        "accept": "application/json",
        "content-type": "application/json",
        source_context.name: source_context.get().value,
    }
    endpoint = ANALYTICS_SERVER_URL + "/batch"
    body = f"[{','.join(batch)}]"

    response = requests.post(
        url=endpoint,
        headers=request_headers,
        data=body,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise AnalyticsAPIError(response.status_code, response.text)

    logger.debug("data uploaded successfully")
    return response
utils
Utility functions and classes for ZenML analytics.
AnalyticsAPIError (Exception)
Custom exception class for API-related errors.
Source code in zenml/analytics/utils.py
class AnalyticsAPIError(Exception):
    """Custom exception class for API-related errors."""

    def __init__(self, status: int, message: str) -> None:
        """Store the details of the failed response.

        Args:
            status: The status code of the response.
            message: The text of the response.
        """
        self.status = status
        self.message = message

    def __str__(self) -> str:
        """Render the error as a string.

        Returns:
            The status code followed by the message, with a ZenML Analytics
            prefix.
        """
        return f"[ZenML Analytics] {self.status}: {self.message}"
__init__(self, status, message)
special
Initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status |
int |
The status code of the response. |
required |
message |
str |
The text of the response. |
required |
Source code in zenml/analytics/utils.py
def __init__(self, status: int, message: str) -> None:
    """Store the details of the failed response.

    Args:
        status: The status code of the response.
        message: The text of the response.
    """
    self.status = status
    self.message = message
__str__(self)
special
Method to represent the instance as a string.
Returns:
Type | Description |
---|---|
str |
A representation of the message and the status code. |
Source code in zenml/analytics/utils.py
def __str__(self) -> str:
    """Render the error as a string.

    Returns:
        The status code followed by the message, with a ZenML Analytics
        prefix.
    """
    return f"[ZenML Analytics] {self.status}: {self.message}"
AnalyticsEncoder (JSONEncoder)
Helper encoder class for JSON serialization.
Source code in zenml/analytics/utils.py
class AnalyticsEncoder(json.JSONEncoder):
    """Helper encoder class for JSON serialization."""

    def default(self, obj: Any) -> Any:
        """Encode UUID and 'AnalyticsEvent' objects as strings.

        Args:
            obj: The object to encode.

        Returns:
            The string form of a UUID, the string value of an
            'AnalyticsEvent', or whatever the base encoder produces for any
            other object.
        """
        from zenml.analytics.enums import AnalyticsEvent

        # UUIDs are written as their string form.
        if isinstance(obj, UUID):
            return str(obj)

        # Events are written as their value.
        if isinstance(obj, AnalyticsEvent):
            return str(obj.value)

        # Everything else is left to the base encoder.
        return json.JSONEncoder.default(self, obj)
default(self, obj)
The default method to handle UUID and 'AnalyticsEvent' objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
The object to encode. |
required |
Returns:
Type | Description |
---|---|
Any |
The encoded object. |
Source code in zenml/analytics/utils.py
def default(self, obj: Any) -> Any:
    """Encode UUID and 'AnalyticsEvent' objects for JSON serialization.

    Args:
        obj: The object to encode.

    Returns:
        The encoded object.
    """
    from zenml.analytics.enums import AnalyticsEvent

    # A UUID is represented by its string form.
    if isinstance(obj, UUID):
        encoded = str(obj)
    # An AnalyticsEvent is represented by the string of its value.
    elif isinstance(obj, AnalyticsEvent):
        encoded = str(obj.value)
    # Any other type is handed to the base JSON encoder.
    else:
        encoded = json.JSONEncoder.default(self, obj)
    return encoded
analytics_disabler
Context manager which disables ZenML analytics.
Source code in zenml/analytics/utils.py
class analytics_disabler:
    """Context manager which disables ZenML analytics.

    The opt-in environment variable is set to ``str(False)`` on entry and
    put back to the state captured at construction time on exit.
    """

    def __init__(self) -> None:
        """Initialization of the context manager."""
        # `None` means the variable was not set when the manager was created.
        self.original_value: Optional[str] = os.environ.get(
            ENV_ZENML_ANALYTICS_OPT_IN
        )

    def __enter__(self) -> None:
        """Disable the analytics."""
        os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = str(False)

    def __exit__(
        self,
        exc_type: Optional[Any],
        exc_value: Optional[Any],
        traceback: Optional[Any],
    ) -> None:
        """Set it back to the original state.

        Args:
            exc_type: The class of the exception
            exc_value: The instance of the exception
            traceback: The traceback of the exception
        """
        if self.original_value is not None:
            os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = self.original_value
        else:
            # Use `pop` with a default rather than `del`: if code inside the
            # `with` body already removed the variable, `del` would raise a
            # KeyError here and mask any exception from the body.
            os.environ.pop(ENV_ZENML_ANALYTICS_OPT_IN, None)
__enter__(self)
special
Disable the analytics.
Source code in zenml/analytics/utils.py
def __enter__(self) -> None:
    """Switch analytics off by overriding the opt-in environment variable."""
    opted_out = str(False)
    os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = opted_out
__exit__(self, exc_type, exc_value, traceback)
special
Set it back to the original state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exc_type |
Optional[Any] |
The class of the exception |
required |
exc_value |
Optional[Any] |
The instance of the exception |
required |
traceback |
Optional[Any] |
The traceback of the exception |
required |
Source code in zenml/analytics/utils.py
def __exit__(
    self,
    exc_type: Optional[Any],
    exc_value: Optional[Any],
    traceback: Optional[Any],
) -> None:
    """Set it back to the original state.

    Args:
        exc_type: The class of the exception
        exc_value: The instance of the exception
        traceback: The traceback of the exception
    """
    if self.original_value is not None:
        os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = self.original_value
    else:
        # Use `pop` with a default rather than `del`: if code inside the
        # `with` body already removed the variable, `del` would raise a
        # KeyError here and mask any exception from the body.
        os.environ.pop(ENV_ZENML_ANALYTICS_OPT_IN, None)
__init__(self)
special
Initialization of the context manager.
Source code in zenml/analytics/utils.py
def __init__(self) -> None:
    """Capture the current value of the opt-in environment variable."""
    # `None` is stored when the variable is not set.
    self.original_value = os.getenv(ENV_ZENML_ANALYTICS_OPT_IN)
track_handler
Context handler to enable tracking the success status of an event.
Source code in zenml/analytics/utils.py
class track_handler:
    """Context handler that tracks an event together with its success status."""

    def __init__(
        self,
        event: AnalyticsEvent,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Set up the handler for a single event.

        Args:
            event: The type of the analytics event
            metadata: The metadata of the event.
        """
        self.event: AnalyticsEvent = event
        # A missing or empty mapping is replaced with a fresh dict.
        self.metadata: Dict[str, Any] = metadata if metadata else {}

    def __enter__(self) -> "track_handler":
        """Enter the tracking context.

        Returns:
            the handler instance.
        """
        return self

    def __exit__(
        self,
        type_: Optional[Any],
        value: Optional[Any],
        traceback: Optional[Any],
    ) -> Any:
        """Record the outcome of the block and track the event.

        The event counts as successful when no traceback was passed in. When
        an exception class is given, its name is added to the metadata.
        Afterwards the event is tracked with the collected metadata.

        Args:
            type_: The class of the exception
            value: The instance of the exception
            traceback: The traceback of the exception
        """
        failed = traceback is not None
        self.metadata.update({"event_success": not failed})

        if type_ is not None:
            self.metadata.update({"event_error_type": type_.__name__})

        track(self.event, self.metadata)
__enter__(self)
special
Enter function of the event handler.
Returns:
Type | Description |
---|---|
track_handler |
the handler instance. |
Source code in zenml/analytics/utils.py
def __enter__(self) -> "track_handler":
"""Enter function of the event handler.
Returns:
the handler instance.
"""
return self
__exit__(self, type_, value, traceback)
special
Exit function of the event handler.
Checks whether there was a traceback and updates the metadata accordingly. Following the check, it calls the function to track the event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type_ |
Optional[Any] |
The class of the exception |
required |
value |
Optional[Any] |
The instance of the exception |
required |
traceback |
Optional[Any] |
The traceback of the exception |
required |
Source code in zenml/analytics/utils.py
def __exit__(
    self,
    type_: Optional[Any],
    value: Optional[Any],
    traceback: Optional[Any],
) -> Any:
    """Record the outcome of the block and track the event.

    The event counts as successful when no traceback was passed in. When an
    exception class is given, its name is added to the metadata. Afterwards
    the event is tracked with the collected metadata.

    Args:
        type_: The class of the exception
        value: The instance of the exception
        traceback: The traceback of the exception
    """
    self.metadata["event_success"] = traceback is None

    if type_ is not None:
        self.metadata["event_error_type"] = type_.__name__

    track(self.event, self.metadata)
__init__(self, event, metadata=None)
special
Initialization of the context manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
AnalyticsEvent |
The type of the analytics event |
required |
metadata |
Optional[Dict[str, Any]] |
The metadata of the event. |
None |
Source code in zenml/analytics/utils.py
def __init__(
    self,
    event: AnalyticsEvent,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Set up the handler for a single event.

    Args:
        event: The type of the analytics event
        metadata: The metadata of the event.
    """
    self.event: AnalyticsEvent = event
    # A missing or empty mapping is replaced with a fresh dict.
    self.metadata: Dict[str, Any] = metadata if metadata else {}
email_opt_int(opted_in, email, source)
Track the event of the user's response to the email prompt and identify them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
opted_in |
bool |
Did the user decide to opt-in |
required |
email |
Optional[str] |
The email the user optionally provided |
required |
source |
str |
Location where the user replied ["zenml go", "zenml server"] |
required |
Source code in zenml/analytics/utils.py
def email_opt_int(opted_in: bool, email: Optional[str], source: str) -> None:
    """Record the user's answer to the email prompt.

    When the user opted in and supplied a non-empty email, the email is
    attached to the user via `identify` before the answer itself is tracked.

    Args:
        opted_in: Did the user decide to opt-in
        email: The email the user optionally provided
        source: Location where the user replied ["zenml go", "zenml server"]
    """
    email_provided = email is not None and email != ""
    if opted_in and email_provided:
        identify(metadata={"email": email, "source": source})

    # The answer is tracked regardless of whether an email was provided.
    answer = {"opted_in": opted_in, "source": source}
    track(AnalyticsEvent.OPT_IN_OUT_EMAIL, answer)
track_decorator(event)
Decorator to track event.
If the decorated function takes in an AnalyticsTrackedModelMixin object as
an argument or returns one, it will be called to track the event. The return
value takes precedence over the argument when determining which object is
called to track the event.
If the decorated function is a method of a class that inherits from
AnalyticsTrackerMixin, the parent object will be used to intermediate
tracking analytics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
AnalyticsEvent |
Event string to stamp with. |
required |
Returns:
Type | Description |
---|---|
Callable[[~F], ~F] |
A decorator that applies the analytics tracking to a function. |
Source code in zenml/analytics/utils.py
def track_decorator(event: AnalyticsEvent) -> Callable[[F], F]:
    """Build a decorator that tracks `event` around each call of a function.

    Event metadata is taken from the first `AnalyticsTrackedModelMixin`
    instance found among the call's positional and keyword arguments. If the
    function returns such an instance, the metadata of the return value
    replaces it.

    Args:
        event: Event string to stamp with.

    Returns:
        A decorator that applies the analytics tracking to a function.
    """

    def inner_decorator(func: F) -> F:
        """Wrap `func` so that every call runs inside a tracking context.

        Args:
            func: Function to decorate.

        Returns:
            Decorated function.
        """

        @wraps(func)
        def inner_func(*args: Any, **kwargs: Any) -> Any:
            """Call `func` inside a `track_handler` context.

            Args:
                *args: Arguments to be passed to the function.
                **kwargs: Keyword arguments to be passed to the function.

            Returns:
                Result of the function.
            """
            with track_handler(event=event) as handler:
                # Metadata collection is best-effort: a failure is logged at
                # debug level and does not prevent the wrapped call.
                try:
                    tracked_input = next(
                        (
                            candidate
                            for candidate in (*args, *kwargs.values())
                            if isinstance(
                                candidate, AnalyticsTrackedModelMixin
                            )
                        ),
                        None,
                    )
                    if tracked_input is not None:
                        handler.metadata = (
                            tracked_input.get_analytics_metadata()
                        )
                except Exception as e:
                    logger.debug(f"Analytics tracking failure for {func}: {e}")

                result = func(*args, **kwargs)

                # A tracked return value overrides metadata from the inputs.
                try:
                    if isinstance(result, AnalyticsTrackedModelMixin):
                        handler.metadata = result.get_analytics_metadata()
                except Exception as e:
                    logger.debug(f"Analytics tracking failure for {func}: {e}")

                return result

        return cast(F, inner_func)

    return inner_decorator