Skip to content

Analytics

zenml.analytics special

The 'analytics' module of ZenML.

alias(user_id, previous_id)

Alias user IDs.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
previous_id UUID

Previous ID for the alias.

required

Returns:

Type Description
bool

True if event is sent successfully, False if not.

Source code in zenml/analytics/__init__.py
def alias(user_id: UUID, previous_id: UUID) -> bool:
    """Alias user IDs.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.alias(user_id=user_id, previous_id=previous_id)

    # `AnalyticsContext.__exit__` returns True and therefore suppresses any
    # exception raised inside the `with` block. In that case execution
    # resumes here, so report the failure explicitly instead of implicitly
    # returning `None`.
    return False

group(group_id, group_metadata=None)

Attach metadata to a segment group.

Parameters:

Name Type Description Default
group_id UUID

ID of the group.

required
group_metadata Optional[Dict[str, Any]]

Metadata to attach to the group.

None

Returns:

Type Description
bool

True if event is sent successfully, False if not.

Source code in zenml/analytics/__init__.py
def group(
    group_id: UUID,
    group_metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """Attach metadata to a segment group.

    Args:
        group_id: ID of the group.
        group_metadata: Metadata to attach to the group.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.group(group_id=group_id, traits=group_metadata)

    # `AnalyticsContext.__exit__` returns True and therefore suppresses any
    # exception raised inside the `with` block. In that case execution
    # resumes here, so report the failure explicitly instead of implicitly
    # returning `None`.
    return False

identify(metadata=None)

Attach metadata to user directly.

Parameters:

Name Type Description Default
metadata Optional[Dict[str, Any]]

Dict of metadata to attach to the user.

None

Returns:

Type Description
bool

True if event is sent successfully, False if not.

Source code in zenml/analytics/__init__.py
def identify(
    metadata: Optional[Dict[str, Any]] = None
) -> bool:
    """Attach metadata to user directly.

    Args:
        metadata: Dict of metadata to attach to the user. If `None`, nothing
            is sent.

    Returns:
        True if event is sent successfully, False if not.
    """
    # Nothing to attach: bail out before importing the analytics context.
    if metadata is None:
        return False

    from zenml.analytics.context import AnalyticsContext

    with AnalyticsContext() as analytics:
        return analytics.identify(traits=metadata)

    # `AnalyticsContext.__exit__` returns True and therefore suppresses any
    # exception raised inside the `with` block. In that case execution
    # resumes here, so report the failure explicitly instead of implicitly
    # returning `None`.
    return False

track(event, metadata=None)

Track segment event if user opted-in.

Parameters:

Name Type Description Default
event AnalyticsEvent

Name of event to track in segment.

required
metadata Optional[Dict[str, Any]]

Dict of metadata to track.

None

Returns:

Type Description
bool

True if event is sent successfully, False if not.

Source code in zenml/analytics/__init__.py
def track(
    event: "AnalyticsEvent",
    metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track segment event if user opted-in.

    Args:
        event: Name of event to track in segment.
        metadata: Dict of metadata to track. The dict passed in by the
            caller is not modified.

    Returns:
        True if event is sent successfully, False if not.
    """
    from zenml.analytics.context import AnalyticsContext

    # Work on a shallow copy so that neither the default added below nor
    # anything added further down the call chain leaks into the caller's dict.
    properties: Dict[str, Any] = dict(metadata) if metadata is not None else {}
    properties.setdefault("event_success", True)

    with AnalyticsContext() as analytics:
        return analytics.track(event=event, properties=properties)

    # `AnalyticsContext.__exit__` returns True and therefore suppresses any
    # exception raised inside the `with` block. In that case execution
    # resumes here, so report the failure explicitly instead of implicitly
    # returning `None`.
    return False

client

The analytics client of ZenML.

Client

The client class for ZenML analytics.

Source code in zenml/analytics/client.py
class Client:
    """The client class for ZenML analytics.

    Every public method builds a message dict, serializes it to JSON with
    `AnalyticsEncoder` and passes the resulting string to `_enqueue`.
    """

    def __init__(self, send: bool = True, timeout: int = 15) -> None:
        """Set up the client.

        Args:
            send: Whether messages are actually posted. When False,
                `_enqueue` skips the request and still reports success.
            timeout: Timeout in seconds, passed on to the post request.
        """
        self.send = send
        self.timeout = timeout

    def identify(
        self, user_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Send an `identify` message for a user.

        Args:
            user_id: The user ID.
            traits: The traits for the identification process. A falsy
                value is sent as an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "traits": traits if traits else {},
            "type": "identify",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def alias(self, user_id: UUID, previous_id: UUID) -> Tuple[bool, str]:
        """Send an `alias` message linking two user IDs.

        Args:
            user_id: The user ID.
            previous_id: Previous ID for the alias.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "previous_id": previous_id,
            "type": "alias",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def track(
        self,
        user_id: UUID,
        event: "AnalyticsEvent",
        properties: Optional[Dict[Any, Any]],
    ) -> Tuple[bool, str]:
        """Send a `track` message for an event.

        Args:
            user_id: The user ID.
            event: The type of the event.
            properties: Dict of additional properties for the event. A falsy
                value is sent as an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "event": event,
            "properties": properties if properties else {},
            "type": "track",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def group(
        self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Send a `group` message for a user and a group.

        Args:
            user_id: The user ID.
            group_id: The group ID.
            traits: Traits to assign to the group. A falsy value is sent as
                an empty dict.

        Returns:
            Tuple (success flag, the serialized message).
        """
        payload = {
            "user_id": user_id,
            "group_id": group_id,
            "traits": traits if traits else {},
            "type": "group",
            "debug": IS_DEBUG_ENV,
        }
        serialized = json.dumps(payload, cls=AnalyticsEncoder)
        return self._enqueue(serialized)

    def _enqueue(self, msg: str) -> Tuple[bool, str]:
        """Post a single serialized message, unless sending is disabled.

        Args:
            msg: The serialized message.

        Returns:
            Tuple (success flag, the original message). The flag is always
            True when this method returns; with `send` disabled the message
            is reported as if it had been sent.
        """
        if self.send:
            # The message is posted on its own, as a batch of one.
            post(timeout=self.timeout, batch=[msg])

        return True, msg
__init__(self, send=True, timeout=15) special

Initialization of the client.

Parameters:

Name Type Description Default
send bool

Flag to determine whether to send the message.

True
timeout int

Timeout in seconds.

15
Source code in zenml/analytics/client.py
def __init__(self, send: bool = True, timeout: int = 15) -> None:
    """Set up the client.

    Args:
        send: Flag that decides whether messages are sent.
        timeout: Timeout in seconds.
    """
    # Both settings are stored verbatim on the instance.
    self.send, self.timeout = send, timeout
alias(self, user_id, previous_id)

Method to alias user IDs.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
previous_id UUID

Previous ID for the alias.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def alias(self, user_id: UUID, previous_id: UUID) -> Tuple[bool, str]:
    """Serialize an `alias` message and hand it to `_enqueue`.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = dict(
        user_id=user_id,
        previous_id=previous_id,
        type="alias",
        debug=IS_DEBUG_ENV,
    )
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
group(self, user_id, group_id, traits)

Method to group users.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
group_id UUID

The group ID.

required
traits Optional[Dict[Any, Any]]

Traits to assign to the group.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def group(
    self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
    """Serialize a `group` message and hand it to `_enqueue`.

    Args:
        user_id: The user ID.
        group_id: The group ID.
        traits: Traits to assign to the group. A falsy value is sent as an
            empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = dict(
        user_id=user_id,
        group_id=group_id,
        traits=traits if traits else {},
        type="group",
        debug=IS_DEBUG_ENV,
    )
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
identify(self, user_id, traits)

Method to identify a user with given traits.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
traits Optional[Dict[Any, Any]]

The traits for the identification process.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def identify(
    self, user_id: UUID, traits: Optional[Dict[Any, Any]]
) -> Tuple[bool, str]:
    """Serialize an `identify` message and hand it to `_enqueue`.

    Args:
        user_id: The user ID.
        traits: The traits for the identification process. A falsy value is
            sent as an empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = dict(
        user_id=user_id,
        traits=traits if traits else {},
        type="identify",
        debug=IS_DEBUG_ENV,
    )
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
track(self, user_id, event, properties)

Method to track events.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
event AnalyticsEvent

The type of the event.

required
properties Optional[Dict[Any, Any]]

Dict of additional properties for the event.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def track(
    self,
    user_id: UUID,
    event: "AnalyticsEvent",
    properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Serialize a `track` message and hand it to `_enqueue`.

    Args:
        user_id: The user ID.
        event: The type of the event.
        properties: Dict of additional properties for the event. A falsy
            value is sent as an empty dict.

    Returns:
        Tuple (success flag, the serialized message).
    """
    payload = dict(
        user_id=user_id,
        event=event,
        properties=properties if properties else {},
        type="track",
        debug=IS_DEBUG_ENV,
    )
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)

context

The analytics module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

AnalyticsContext

Client class for ZenML Analytics v2.

Source code in zenml/analytics/context.py
class AnalyticsContext:
    """Client class for ZenML Analytics v2.

    Meant to be used as a context manager: `__enter__` resolves the opt-in
    setting and the user/server identifiers, and `__exit__` suppresses (and
    debug-logs) any exception raised while sending analytics.
    """

    def __init__(self) -> None:
        """Initialization.

        Use this as a context manager to ensure that analytics are initialized
        properly, only tracked when configured to do so and that any errors
        are handled gracefully.
        """
        # Analytics stay disabled until `__enter__` has resolved the setting.
        self.analytics_opt_in: bool = False

        self.user_id: Optional[UUID] = None
        self.external_user_id: Optional[UUID] = None
        self.executed_by_service_account: Optional[bool] = None
        self.client_id: Optional[UUID] = None
        self.server_id: Optional[UUID] = None
        self.external_server_id: Optional[UUID] = None
        self.server_metadata: Optional[Dict[str, str]] = None

        self.database_type: Optional["ServerDatabaseType"] = None
        self.deployment_type: Optional["ServerDeploymentType"] = None

    def __enter__(self) -> "AnalyticsContext":
        """Enter analytics context manager.

        Any exception raised while gathering the information disables
        analytics for this context and is only logged at debug level.

        Returns:
            The analytics context.
        """
        # Fetch the analytics opt-in setting
        from zenml.config.global_config import GlobalConfiguration

        try:
            gc = GlobalConfiguration()
            store_info = gc.zen_store.get_store_info()

            if self.in_server:
                self.analytics_opt_in = store_info.analytics_enabled
            else:
                self.analytics_opt_in = gc.analytics_opt_in

            # Nothing else is collected when analytics are disabled.
            if not self.analytics_opt_in:
                return self

            # Fetch the `user_id`
            if self.in_server:
                from zenml.zen_server.auth import get_auth_context
                from zenml.zen_server.utils import server_config

                # If the code is running on the server, use the auth context.
                auth_context = get_auth_context()
                if auth_context is not None:
                    self.user_id = auth_context.user.id
                    self.executed_by_service_account = (
                        auth_context.user.is_service_account
                    )
                    self.external_user_id = auth_context.user.external_user_id

                self.external_server_id = server_config().external_server_id
            else:
                # If the code is running on the client, use the default user.
                active_user = gc.zen_store.get_user()
                self.user_id = active_user.id
                self.executed_by_service_account = (
                    active_user.is_service_account
                )
                self.external_user_id = active_user.external_user_id

            # Fetch the `client_id`
            if self.in_server:
                # If the code is running on the server, there is no client id.
                self.client_id = None
            else:
                # If the code is running on the client, attach the client id.
                self.client_id = gc.user_id

            self.server_id = store_info.id
            self.deployment_type = store_info.deployment_type
            self.database_type = store_info.database_type
            self.server_metadata = store_info.metadata
        except Exception as e:
            self.analytics_opt_in = False
            logger.debug(f"Analytics initialization failed: {e}")

        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Exit context manager.

        Args:
            exc_type: Exception type.
            exc_val: Exception value.
            exc_tb: Exception traceback.

        Returns:
            True, which suppresses any exception raised inside the context.
        """
        if exc_val is not None:
            logger.debug(f"Sending telemetry data failed: {exc_val}")

        return True

    @property
    def in_server(self) -> bool:
        """Flag to check whether the code is running in a ZenML server.

        Returns:
            True if running in a server, False otherwise.
        """
        return handle_bool_env_var(ENV_ZENML_SERVER)

    def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
        """Identify the user through segment.

        Args:
            traits: Traits of the user.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        success = False
        if self.analytics_opt_in and self.user_id is not None:
            success, _ = default_client.identify(
                user_id=self.user_id,
                traits=traits,
            )

        return success

    def alias(self, user_id: UUID, previous_id: UUID) -> bool:
        """Alias user IDs.

        Args:
            user_id: The user ID.
            previous_id: Previous ID for the alias.

        Returns:
            True if alias information was sent, False otherwise.
        """
        success = False
        if self.analytics_opt_in:
            success, _ = default_client.alias(
                user_id=user_id,
                previous_id=previous_id,
            )

        return success

    def group(
        self,
        group_id: UUID,
        traits: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Group the user.

        Args:
            group_id: Group ID.
            traits: Traits of the group.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        success = False
        if self.analytics_opt_in and self.user_id is not None:
            success, _ = default_client.group(
                user_id=self.user_id,
                group_id=group_id,
                traits=traits or {},
            )

        return success

    def track(
        self,
        event: "AnalyticsEvent",
        properties: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Track an event.

        Args:
            event: Event to track.
            properties: Event properties. The dict passed in by the caller
                is not modified.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        from zenml.analytics.enums import AnalyticsEvent

        # Work on a shallow copy so the caller's dict is left untouched by
        # the enrichment below.
        properties = dict(properties) if properties else {}

        # Skip the event when there is no user ID, or when analytics are
        # disabled and the event is not an opt-in/opt-out event.
        # NOTE(review): `__enter__` returns before resolving `user_id` when
        # analytics are disabled, so opt-in/opt-out events look like they are
        # dropped here as well in that case -- confirm whether intended.
        if (
            not self.analytics_opt_in
            and event.value
            not in {
                AnalyticsEvent.OPT_OUT_ANALYTICS,
                AnalyticsEvent.OPT_IN_ANALYTICS,
            }
        ) or self.user_id is None:
            return False

        # add basics
        properties.update(Environment.get_system_info())
        properties.update(
            {
                "environment": get_environment(),
                "python_version": Environment.python_version(),
                "version": __version__,
                "client_id": str(self.client_id),
                "user_id": str(self.user_id),
                "server_id": str(self.server_id),
                "deployment_type": str(self.deployment_type),
                "database_type": str(self.database_type),
                "executed_by_service_account": self.executed_by_service_account,
            }
        )

        try:
            # Name of the local timezone
            tz = (
                datetime.datetime.now(datetime.timezone.utc)
                .astimezone()
                .tzname()
            )
            if tz is not None:
                properties["timezone"] = tz

            # Language code such as "en_DE"
            language_code, _ = locale.getlocale()
            if language_code is not None:
                properties["locale"] = language_code
        except Exception as e:
            # Timezone and locale are optional extras: keep going without
            # them, but leave a trace instead of failing silently.
            logger.debug(f"Could not determine timezone or locale: {e}")

        if self.external_user_id:
            properties["external_user_id"] = self.external_user_id

        if self.external_server_id:
            properties["external_server_id"] = self.external_server_id

        if self.server_metadata:
            properties.update(self.server_metadata)

        # Stringify UUID values; a new dict is built rather than rewriting
        # the dict while iterating over it.
        properties = {
            key: str(value) if isinstance(value, UUID) else value
            for key, value in properties.items()
        }

        success, _ = default_client.track(
            user_id=self.user_id,
            event=event,
            properties=properties,
        )

        logger.debug(
            f"Sending analytics: User: {self.user_id}, Event: {event}, "
            f"Metadata: {properties}"
        )

        return success
in_server: bool property readonly

Flag to check whether the code is running in a ZenML server.

Returns:

Type Description
bool

True if running in a server, False otherwise.

__enter__(self) special

Enter analytics context manager.

Returns:

Type Description
AnalyticsContext

The analytics context.

Source code in zenml/analytics/context.py
def __enter__(self) -> "AnalyticsContext":
    """Resolve the opt-in setting and identifiers on entering the context.

    Any exception raised while gathering the information disables analytics
    for this context and is only logged at debug level.

    Returns:
        The analytics context.
    """
    from zenml.config.global_config import GlobalConfiguration

    try:
        global_config = GlobalConfiguration()
        info = global_config.zen_store.get_store_info()

        # A server reads the opt-in flag from the store info, a client from
        # its global configuration.
        self.analytics_opt_in = (
            info.analytics_enabled
            if self.in_server
            else global_config.analytics_opt_in
        )

        # Identifiers are only collected when analytics are enabled.
        if self.analytics_opt_in:
            if not self.in_server:
                # Client side: use the active user of the store.
                current_user = global_config.zen_store.get_user()
                self.user_id = current_user.id
                self.executed_by_service_account = (
                    current_user.is_service_account
                )
                self.external_user_id = current_user.external_user_id
            else:
                from zenml.zen_server.auth import get_auth_context
                from zenml.zen_server.utils import server_config

                # Server side: the user comes from the auth context, which
                # may be missing.
                auth = get_auth_context()
                if auth is not None:
                    self.user_id = auth.user.id
                    self.executed_by_service_account = (
                        auth.user.is_service_account
                    )
                    self.external_user_id = auth.user.external_user_id

                self.external_server_id = server_config().external_server_id

            # There is no client ID when running on the server.
            self.client_id = None if self.in_server else global_config.user_id

            self.server_id = info.id
            self.deployment_type = info.deployment_type
            self.database_type = info.database_type
            self.server_metadata = info.metadata
    except Exception as e:
        self.analytics_opt_in = False
        logger.debug(f"Analytics initialization failed: {e}")

    return self
__exit__(self, exc_type, exc_val, exc_tb) special

Exit context manager.

Parameters:

Name Type Description Default
exc_type Optional[Type[BaseException]]

Exception type.

required
exc_val Optional[BaseException]

Exception value.

required
exc_tb Optional[traceback]

Exception traceback.

required

Returns:

Type Description
bool

True.

Source code in zenml/analytics/context.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> bool:
    """Leave the context, logging any exception at debug level.

    Args:
        exc_type: Exception type.
        exc_val: Exception value.
        exc_tb: Exception traceback.

    Returns:
        Always True, which suppresses any exception raised in the context.
    """
    failed = exc_val is not None
    if failed:
        logger.debug(f"Sending telemetry data failed: {exc_val}")

    return True
__init__(self) special

Initialization.

Use this as a context manager to ensure that analytics are initialized properly, only tracked when configured to do so and that any errors are handled gracefully.

Source code in zenml/analytics/context.py
def __init__(self) -> None:
    """Create a context with analytics disabled and no identifiers set.

    Use this as a context manager to ensure that analytics are initialized
    properly, only tracked when configured to do so and that any errors
    are handled gracefully.
    """
    # Opt-in flag; off by default.
    self.analytics_opt_in: bool = False

    # User-related information.
    self.user_id: Optional[UUID] = None
    self.external_user_id: Optional[UUID] = None
    self.executed_by_service_account: Optional[bool] = None

    # Client-related information.
    self.client_id: Optional[UUID] = None

    # Server-related information.
    self.server_id: Optional[UUID] = None
    self.external_server_id: Optional[UUID] = None
    self.server_metadata: Optional[Dict[str, str]] = None
    self.database_type: Optional["ServerDatabaseType"] = None
    self.deployment_type: Optional["ServerDeploymentType"] = None
alias(self, user_id, previous_id)

Alias user IDs.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
previous_id UUID

Previous ID for the alias.

required

Returns:

Type Description
bool

True if alias information was sent, False otherwise.

Source code in zenml/analytics/context.py
def alias(self, user_id: UUID, previous_id: UUID) -> bool:
    """Alias user IDs, provided analytics are enabled.

    Args:
        user_id: The user ID.
        previous_id: Previous ID for the alias.

    Returns:
        True if alias information was sent, False otherwise.
    """
    # Nothing is sent without opt-in.
    if not self.analytics_opt_in:
        return False

    sent, _ = default_client.alias(
        user_id=user_id,
        previous_id=previous_id,
    )
    return sent
group(self, group_id, traits=None)

Group the user.

Parameters:

Name Type Description Default
group_id UUID

Group ID.

required
traits Optional[Dict[str, Any]]

Traits of the group.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def group(
    self,
    group_id: UUID,
    traits: Optional[Dict[str, Any]] = None,
) -> bool:
    """Group the user, provided analytics are enabled and a user is known.

    Args:
        group_id: Group ID.
        traits: Traits of the group. A falsy value is sent as an empty dict.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Both opt-in and a user ID are required to send anything.
    if not self.analytics_opt_in or self.user_id is None:
        return False

    sent, _ = default_client.group(
        user_id=self.user_id,
        group_id=group_id,
        traits=traits or {},
    )
    return sent
identify(self, traits=None)

Identify the user through segment.

Parameters:

Name Type Description Default
traits Optional[Dict[str, Any]]

Traits of the user.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
    """Identify the user, provided analytics are enabled and a user is known.

    Args:
        traits: Traits of the user.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Both opt-in and a user ID are required to send anything.
    if not self.analytics_opt_in or self.user_id is None:
        return False

    sent, _ = default_client.identify(
        user_id=self.user_id,
        traits=traits,
    )
    return sent
track(self, event, properties=None)

Track an event.

Parameters:

Name Type Description Default
event AnalyticsEvent

Event to track.

required
properties Optional[Dict[str, Any]]

Event properties.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def track(
    self,
    event: "AnalyticsEvent",
    properties: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track an event.

    Args:
        event: Event to track.
        properties: Event properties. The dict passed in by the caller is
            not modified.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    from zenml.analytics.enums import AnalyticsEvent

    # Work on a shallow copy so the caller's dict is left untouched by the
    # enrichment below.
    properties = dict(properties) if properties else {}

    # Skip the event when there is no user ID, or when analytics are
    # disabled and the event is not an opt-in/opt-out event.
    if (
        not self.analytics_opt_in
        and event.value
        not in {
            AnalyticsEvent.OPT_OUT_ANALYTICS,
            AnalyticsEvent.OPT_IN_ANALYTICS,
        }
    ) or self.user_id is None:
        return False

    # add basics
    properties.update(Environment.get_system_info())
    properties.update(
        {
            "environment": get_environment(),
            "python_version": Environment.python_version(),
            "version": __version__,
            "client_id": str(self.client_id),
            "user_id": str(self.user_id),
            "server_id": str(self.server_id),
            "deployment_type": str(self.deployment_type),
            "database_type": str(self.database_type),
            "executed_by_service_account": self.executed_by_service_account,
        }
    )

    try:
        # Name of the local timezone
        tz = (
            datetime.datetime.now(datetime.timezone.utc)
            .astimezone()
            .tzname()
        )
        if tz is not None:
            properties["timezone"] = tz

        # Language code such as "en_DE"
        language_code, _ = locale.getlocale()
        if language_code is not None:
            properties["locale"] = language_code
    except Exception as e:
        # Timezone and locale are optional extras: keep going without them,
        # but leave a trace instead of failing silently.
        logger.debug(f"Could not determine timezone or locale: {e}")

    if self.external_user_id:
        properties["external_user_id"] = self.external_user_id

    if self.external_server_id:
        properties["external_server_id"] = self.external_server_id

    if self.server_metadata:
        properties.update(self.server_metadata)

    # Stringify UUID values; a new dict is built rather than rewriting the
    # dict while iterating over it.
    properties = {
        key: str(value) if isinstance(value, UUID) else value
        for key, value in properties.items()
    }

    success, _ = default_client.track(
        user_id=self.user_id,
        event=event,
        properties=properties,
    )

    logger.debug(
        f"Sending analytics: User: {self.user_id}, Event: {event}, "
        f"Metadata: {properties}"
    )

    return success

enums

Collection of analytics events for ZenML.

AnalyticsEvent (str, Enum)

Enum of events to track in segment.

Source code in zenml/analytics/enums.py
class AnalyticsEvent(str, Enum):
    """Enum of events to track in segment.

    Each member's value is the event name as a string. Because the enum also
    derives from `str`, members compare equal to their string values.
    """

    # Login
    DEVICE_VERIFIED = "Device verified"

    # Onboarding
    USER_ENRICHED = "User Enriched"

    # Pipelines
    RUN_PIPELINE = "Pipeline run"
    RUN_PIPELINE_ENDED = "Pipeline run ended"
    CREATE_PIPELINE = "Pipeline created"
    BUILD_PIPELINE = "Pipeline built"

    # Template
    GENERATE_TEMPLATE = "Template generated"

    # Components
    REGISTERED_STACK_COMPONENT = "Stack component registered"

    # Code repository
    REGISTERED_CODE_REPOSITORY = "Code repository registered"

    # Stack
    REGISTERED_STACK = "Stack registered"
    UPDATED_STACK = "Stack updated"

    # Trigger
    CREATED_TRIGGER = "Trigger created"
    UPDATED_TRIGGER = "Trigger updated"

    # Run templates
    CREATED_RUN_TEMPLATE = "Run template created"
    # NOTE(review): "templated" looks like a typo for "template", but the
    # value is the event name used at runtime -- confirm with the analytics
    # consumers before changing it.
    EXECUTED_RUN_TEMPLATE = "Run templated executed"

    # Model Control Plane
    MODEL_DEPLOYED = "Model deployed"
    CREATED_MODEL = "Model created"
    CREATED_MODEL_VERSION = "Model Version created"

    # Analytics opt in and out
    OPT_IN_ANALYTICS = "Analytics opt-in"
    OPT_OUT_ANALYTICS = "Analytics opt-out"
    OPT_IN_OUT_EMAIL = "Response for Email prompt"

    # Examples
    RUN_ZENML_GO = "ZenML go"

    # Workspaces
    CREATED_WORKSPACE = "Workspace created"

    # Flavor
    CREATED_FLAVOR = "Flavor created"

    # Secret
    CREATED_SECRET = "Secret created"

    # Service connector
    CREATED_SERVICE_CONNECTOR = "Service connector created"

    # Service account and API keys
    CREATED_SERVICE_ACCOUNT = "Service account created"

    # Stack recipes
    RUN_STACK_RECIPE = "Stack recipe ran"
    DEPLOY_STACK = "Stack deployed"
    DESTROY_STACK = "Stack destroyed"

    # Stack component deploy
    DEPLOY_STACK_COMPONENT = "Stack component deployed"
    DESTROY_STACK_COMPONENT = "Stack component destroyed"

    # Full stack infrastructure deployment
    DEPLOY_FULL_STACK = "Full stack deployed"

    # Tag created
    CREATED_TAG = "Tag created"

    # ZenML server events
    ZENML_SERVER_DEPLOYED = "ZenML server deployed"
    ZENML_SERVER_DESTROYED = "ZenML server destroyed"

    # Server Settings
    SERVER_SETTINGS_UPDATED = "Server Settings Updated"
__format__(self, format_spec) special

Default object formatter.

Source code in zenml/analytics/enums.py
def __format__(self, format_spec: str) -> str:
    """Format the object through its `str()` representation.

    Args:
        format_spec: The format specification, applied with string
            formatting rules.

    Returns:
        The formatted string.
    """
    rendered = str(self)
    return str.__format__(rendered, format_spec)

models

Helper models for ZenML analytics.

AnalyticsTrackedModelMixin (BaseModel)

Mixin for models that are tracked through analytics events.

Classes that have information tracked in analytics events can inherit from this mixin and implement the abstract methods. The @track_method decorator will detect function arguments and return values that inherit from this class and will include the ANALYTICS_FIELDS attributes as tracking metadata.

Source code in zenml/analytics/models.py
class AnalyticsTrackedModelMixin(BaseModel):
    """Base class for models whose attributes are reported in analytics.

    Subclasses list the attribute names that should be reported in
    `ANALYTICS_FIELDS`. `get_analytics_metadata` then collects the current
    values of those attributes into a dictionary that can be attached to an
    analytics event.
    """

    # Names of the attributes that are included in the analytics metadata.
    ANALYTICS_FIELDS: ClassVar[List[str]] = []

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Collect the values of all attributes listed in `ANALYTICS_FIELDS`.

        Returns:
            Dict mapping each field name to its current value. Attributes
            that do not exist on the instance are reported as `None`.
        """
        return {
            name: getattr(self, name, None) for name in self.ANALYTICS_FIELDS
        }
get_analytics_metadata(self)

Get the analytics metadata for the model.

Returns:

Type Description
Dict[str, Any]

Dict of analytics metadata.

Source code in zenml/analytics/models.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Collect the values of all attributes listed in `ANALYTICS_FIELDS`.

    Returns:
        Dict mapping each field name to its current value. Attributes that
        do not exist on the instance are reported as `None`.
    """
    return {name: getattr(self, name, None) for name in self.ANALYTICS_FIELDS}

request

The 'analytics' module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

post(batch, timeout=15)

Post a batch of messages to the ZenML analytics server.

Parameters:

Name Type Description Default
batch List[str]

The messages to send.

required
timeout int

Timeout in seconds.

15

Returns:

Type Description
Response

The response.

Exceptions:

Type Description
AnalyticsAPIError

If the post request has failed.

Source code in zenml/analytics/request.py
def post(batch: List[str], timeout: int = 15) -> requests.Response:
    """Send a batch of messages to the ZenML analytics server.

    The messages are joined with commas and wrapped in square brackets to
    form the request body, which is posted to the `/batch` endpoint.

    Args:
        batch: The messages to send.
        timeout: Timeout in seconds.

    Returns:
        The response, if the server answered with status code 200.

    Raises:
        AnalyticsAPIError: If the server answered with any other status code.
    """
    from zenml.analytics import source_context

    endpoint = ANALYTICS_SERVER_URL + "/batch"
    request_headers = {
        "accept": "application/json",
        "content-type": "application/json",
        # The current source context is passed along as an extra header.
        source_context.name: source_context.get().value,
    }
    payload = "[" + ",".join(batch) + "]"

    response = requests.post(
        url=endpoint,
        headers=request_headers,
        data=payload,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise AnalyticsAPIError(response.status_code, response.text)

    logger.debug("data uploaded successfully")
    return response

utils

Utility functions and classes for ZenML analytics.

AnalyticsAPIError (Exception)

Custom exception class for API-related errors.

Source code in zenml/analytics/utils.py
class AnalyticsAPIError(Exception):
    """Custom exception class for API-related errors.

    Carries the status code and the text of the failed response.
    """

    def __init__(self, status: int, message: str) -> None:
        """Initialization.

        Args:
            status: The status code of the response.
            message: The text of the response.
        """
        # Forward the values to `Exception` so that `args` is populated even
        # when the error is built with keyword arguments. This keeps the
        # instance picklable and its `repr()` informative.
        super().__init__(status, message)
        self.message = message
        self.status = status

    def __str__(self) -> str:
        """Method to represent the instance as a string.

        Returns:
            A representation of the message and the status code.
        """
        msg = "[ZenML Analytics] {1}: {0}"
        return msg.format(self.message, self.status)
__init__(self, status, message) special

Initialization.

Parameters:

Name Type Description Default
status int

The status code of the response.

required
message str

The text of the response.

required
Source code in zenml/analytics/utils.py
def __init__(self, status: int, message: str) -> None:
    """Initialization.

    Args:
        status: The status code of the response.
        message: The text of the response.
    """
    # Forward the values to `Exception` so that `args` is populated even
    # when the error is built with keyword arguments. This keeps the
    # instance picklable and its `repr()` informative.
    super().__init__(status, message)
    self.message = message
    self.status = status
__str__(self) special

Method to represent the instance as a string.

Returns:

Type Description
str

A representation of the message and the status code.

Source code in zenml/analytics/utils.py
def __str__(self) -> str:
    """Render the error as a single line of text.

    Returns:
        The status code followed by the message, prefixed with the
        analytics tag.
    """
    template = "[ZenML Analytics] {1}: {0}"
    fields = (self.message, self.status)
    return template.format(*fields)

AnalyticsEncoder (JSONEncoder)

Helper encoder class for JSON serialization.

Source code in zenml/analytics/utils.py
class AnalyticsEncoder(json.JSONEncoder):
    """JSON encoder that can serialize UUIDs and analytics events."""

    def default(self, obj: Any) -> Any:
        """Encode objects that the standard JSON encoder cannot handle.

        Args:
            obj: The object to encode.

        Returns:
            The string form of a UUID, the stringified value of an
            `AnalyticsEvent`, or whatever the base encoder returns for any
            other object.
        """
        from zenml.analytics.enums import AnalyticsEvent

        # UUIDs are serialized as their canonical string form.
        if isinstance(obj, UUID):
            return str(obj)

        # Analytics events are serialized as their underlying value.
        if isinstance(obj, AnalyticsEvent):
            return str(obj.value)

        # Everything else is delegated to the base encoder.
        return json.JSONEncoder.default(self, obj)
default(self, obj)

The default method to handle UUID and 'AnalyticsEvent' objects.

Parameters:

Name Type Description Default
obj Any

The object to encode.

required

Returns:

Type Description
Any

The encoded object.

Source code in zenml/analytics/utils.py
def default(self, obj: Any) -> Any:
    """Encode objects that the standard JSON encoder cannot handle.

    Args:
        obj: The object to encode.

    Returns:
        The string form of a UUID, the stringified value of an
        `AnalyticsEvent`, or whatever the base encoder returns for any other
        object.
    """
    from zenml.analytics.enums import AnalyticsEvent

    # UUIDs are serialized as their canonical string form.
    if isinstance(obj, UUID):
        return str(obj)

    # Analytics events are serialized as their underlying value.
    if isinstance(obj, AnalyticsEvent):
        return str(obj.value)

    # Everything else is delegated to the base encoder.
    return json.JSONEncoder.default(self, obj)

analytics_disabler

Context manager which disables ZenML analytics.

Source code in zenml/analytics/utils.py
class analytics_disabler:
    """Context manager which disables ZenML analytics."""

    def __init__(self) -> None:
        """Initialization of the context manager.

        Remembers the current value of the opt-in environment variable (or
        `None` if it is unset) so that it can be restored on exit.
        """
        self.original_value = os.environ.get(ENV_ZENML_ANALYTICS_OPT_IN)

    def __enter__(self) -> None:
        """Disable the analytics."""
        os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = str(False)

    def __exit__(
        self,
        exc_type: Optional[Any],
        exc_value: Optional[Any],
        traceback: Optional[Any],
    ) -> None:
        """Set it back to the original state.

        Args:
            exc_type: The class of the exception
            exc_value: The instance of the exception
            traceback: The traceback of the exception
        """
        if self.original_value is not None:
            os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = self.original_value
        else:
            # Use `pop` with a default rather than `del`: the variable may
            # already have been removed while the context was active, and
            # restoring the "unset" state must not raise a `KeyError`.
            os.environ.pop(ENV_ZENML_ANALYTICS_OPT_IN, None)
__enter__(self) special

Disable the analytics.

Source code in zenml/analytics/utils.py
def __enter__(self) -> None:
    """Disable the analytics.

    Sets the opt-in environment variable to the string form of `False`.
    """
    disabled = str(False)
    os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = disabled
__exit__(self, exc_type, exc_value, traceback) special

Set it back to the original state.

Parameters:

Name Type Description Default
exc_type Optional[Any]

The class of the exception

required
exc_value Optional[Any]

The instance of the exception

required
traceback Optional[Any]

The traceback of the exception

required
Source code in zenml/analytics/utils.py
def __exit__(
    self,
    exc_type: Optional[Any],
    exc_value: Optional[Any],
    traceback: Optional[Any],
) -> None:
    """Set it back to the original state.

    Args:
        exc_type: The class of the exception
        exc_value: The instance of the exception
        traceback: The traceback of the exception
    """
    if self.original_value is not None:
        os.environ[ENV_ZENML_ANALYTICS_OPT_IN] = self.original_value
    else:
        # Use `pop` with a default rather than `del`: the variable may
        # already have been removed while the context was active, and
        # restoring the "unset" state must not raise a `KeyError`.
        os.environ.pop(ENV_ZENML_ANALYTICS_OPT_IN, None)
__init__(self) special

Initialization of the context manager.

Source code in zenml/analytics/utils.py
def __init__(self) -> None:
    """Initialization of the context manager.

    Stores the current value of the opt-in environment variable, or `None`
    if the variable is not set.
    """
    self.original_value = os.getenv(ENV_ZENML_ANALYTICS_OPT_IN)

track_handler

Context handler to enable tracking the success status of an event.

Source code in zenml/analytics/utils.py
class track_handler(object):
    """Context manager that tracks an event together with its outcome.

    On exit, the handler records in its metadata whether the wrapped block
    raised, and then sends the event along with that metadata.
    """

    def __init__(
        self,
        event: AnalyticsEvent,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Store the event and its initial metadata.

        Args:
            event: The type of the analytics event
            metadata: The metadata of the event. A fresh dict is used when
                none (or an empty one) is given.
        """
        self.event: AnalyticsEvent = event
        self.metadata: Dict[str, Any] = metadata if metadata else {}

    def __enter__(self) -> "track_handler":
        """Enter the context.

        Returns:
            the handler instance.
        """
        return self

    def __exit__(
        self,
        type_: Optional[Any],
        value: Optional[Any],
        traceback: Optional[Any],
    ) -> Any:
        """Record the outcome of the wrapped block and track the event.

        The event counts as successful when no traceback was passed in. If
        an exception type was passed in, its name is recorded as well.

        Args:
            type_: The class of the exception
            value: The instance of the exception
            traceback: The traceback of the exception

        """
        self.metadata["event_success"] = traceback is None

        if type_ is not None:
            self.metadata["event_error_type"] = type_.__name__

        track(self.event, self.metadata)
__enter__(self) special

Enter function of the event handler.

Returns:

Type Description
track_handler

the handler instance.

Source code in zenml/analytics/utils.py
def __enter__(self) -> "track_handler":
    """Enter the context.

    Returns:
        the handler instance, so that it can be bound with `as`.
    """
    return self
__exit__(self, type_, value, traceback) special

Exit function of the event handler.

Checks whether there was a traceback and updates the metadata accordingly. Following the check, it calls the function to track the event.

Parameters:

Name Type Description Default
type_ Optional[Any]

The class of the exception

required
value Optional[Any]

The instance of the exception

required
traceback Optional[Any]

The traceback of the exception

required
Source code in zenml/analytics/utils.py
def __exit__(
    self,
    type_: Optional[Any],
    value: Optional[Any],
    traceback: Optional[Any],
) -> Any:
    """Record the outcome of the wrapped block and track the event.

    The event counts as successful when no traceback was passed in. If an
    exception type was passed in, its name is recorded as well.

    Args:
        type_: The class of the exception
        value: The instance of the exception
        traceback: The traceback of the exception

    """
    self.metadata["event_success"] = traceback is None

    if type_ is not None:
        self.metadata["event_error_type"] = type_.__name__

    track(self.event, self.metadata)
__init__(self, event, metadata=None) special

Initialization of the context manager.

Parameters:

Name Type Description Default
event AnalyticsEvent

The type of the analytics event

required
metadata Optional[Dict[str, Any]]

The metadata of the event.

None
Source code in zenml/analytics/utils.py
def __init__(
    self,
    event: AnalyticsEvent,
    metadata: Optional[Dict[str, Any]] = None,
):
    """Store the event and its initial metadata.

    Args:
        event: The type of the analytics event
        metadata: The metadata of the event. A fresh dict is used when none
            (or an empty one) is given.
    """
    self.event: AnalyticsEvent = event
    self.metadata: Dict[str, Any] = metadata if metadata else {}

email_opt_int(opted_in, email, source)

Track the event of the user's response to the email prompt and identify them.

Parameters:

Name Type Description Default
opted_in bool

Did the user decide to opt-in

required
email Optional[str]

The email the user optionally provided

required
source str

Location where the user replied ["zenml go", "zenml server"]

required
Source code in zenml/analytics/utils.py
def email_opt_int(opted_in: bool, email: Optional[str], source: str) -> None:
    """Track the user's response to the email prompt and identify them.

    Args:
        opted_in: Did the user decide to opt-in
        email: The email the user optionally provided
        source: Location where the user replied ["zenml go", "zenml server"]
    """
    email_provided = email is not None and email != ""

    # Only an opted-in user who actually gave an email gets identified.
    if opted_in and email_provided:
        identify(metadata={"email": email, "source": source})

    # The answer to the prompt is tracked in every case.
    response_metadata = {"opted_in": opted_in, "source": source}
    track(AnalyticsEvent.OPT_IN_OUT_EMAIL, response_metadata)

track_decorator(event)

Decorator to track event.

If the decorated function takes in a AnalyticsTrackedModelMixin object as an argument or returns one, it will be called to track the event. The return value takes precedence over the argument when determining which object is called to track the event.

If the decorated function is a method of a class that inherits from AnalyticsTrackerMixin, the parent object will be used to intermediate tracking analytics.

Parameters:

Name Type Description Default
event AnalyticsEvent

Event string to stamp with.

required

Returns:

Type Description
Callable[[~F], ~F]

A decorator that applies the analytics tracking to a function.

Source code in zenml/analytics/utils.py
def track_decorator(event: AnalyticsEvent) -> Callable[[F], F]:
    """Build a decorator that tracks an event around each function call.

    The decorated function runs inside a `track_handler` context. If one of
    its positional or keyword arguments is an `AnalyticsTrackedModelMixin`,
    the analytics metadata of the first such argument is attached to the
    event. If the return value is an `AnalyticsTrackedModelMixin`, its
    analytics metadata replaces the metadata taken from the arguments.

    Failures while collecting metadata are logged at debug level and never
    prevent the decorated function from running or returning.

    Args:
        event: Event string to stamp with.

    Returns:
        A decorator that applies the analytics tracking to a function.
    """

    def inner_decorator(func: F) -> F:
        """Wrap `func` so that every call is tracked.

        Args:
            func: Function to decorate.

        Returns:
            Decorated function.
        """

        @wraps(func)
        def inner_func(*args: Any, **kwargs: Any) -> Any:
            """Call the wrapped function inside a tracking context.

            Args:
                *args: Arguments to be passed to the function.
                **kwargs: Keyword arguments to be passed to the function.

            Returns:
                Result of the function.
            """
            with track_handler(event=event) as handler:
                # Metadata from the first tracked model among the inputs.
                try:
                    tracked_input = next(
                        (
                            candidate
                            for candidate in (*args, *kwargs.values())
                            if isinstance(
                                candidate, AnalyticsTrackedModelMixin
                            )
                        ),
                        None,
                    )
                    if tracked_input is not None:
                        handler.metadata = (
                            tracked_input.get_analytics_metadata()
                        )
                except Exception as e:
                    logger.debug(f"Analytics tracking failure for {func}: {e}")

                result = func(*args, **kwargs)

                # A tracked return value overrides the input metadata.
                try:
                    if isinstance(result, AnalyticsTrackedModelMixin):
                        handler.metadata = result.get_analytics_metadata()
                except Exception as e:
                    logger.debug(f"Analytics tracking failure for {func}: {e}")

                return result

        return cast(F, inner_func)

    return inner_decorator