Skip to content

Customer Facing Statsbeat: Added logic for dropped item count #41950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
([#41733](https://github.com/Azure/azure-sdk-for-python/pull/41733))
- Added customer-facing statsbeat preview.
([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669))
- Customer Facing Statsbeat: Added logic for dropped item count
([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# cSpell:disable

from enum import Enum
from typing import Union
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.metrics.http_metrics import (
HTTP_CLIENT_REQUEST_DURATION,
Expand Down Expand Up @@ -64,7 +65,6 @@
_MESSAGE_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Message"
_REQUEST_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Request"
_REMOTE_DEPENDENCY_ENVELOPE_NAME = "Microsoft.ApplicationInsights.RemoteDependency"
_REMOTE_DEPENDENCY_ENVELOPE_DATA = "Microsoft.ApplicationInsights.RemoteDependencyData"
_EVENT_ENVELOPE_NAME = "Microsoft.ApplicationInsights.Event"
_PAGE_VIEW_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PageView"
_PERFORMANCE_COUNTER_ENVELOPE_NAME = "Microsoft.ApplicationInsights.PerformanceCounter"
Expand Down Expand Up @@ -145,6 +145,8 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
CLIENT_PERSISTENCE_CAPACITY = "CLIENT_PERSISTENCE_CAPACITY"
UNKNOWN = "UNKNOWN"

DropCodeType = Union[DropCode, int]

class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
CLIENT_TIMEOUT = "CLIENT_TIMEOUT"
UNKNOWN = "UNKNOWN"
Expand All @@ -167,7 +169,7 @@ def __init__(self, language: str, version: str, compute_type: str):
_TYPE_MAP = {
_EVENT_ENVELOPE_NAME: _CUSTOM_EVENT,
_METRIC_ENVELOPE_NAME: _CUSTOM_METRIC,
_REMOTE_DEPENDENCY_ENVELOPE_DATA: _DEPENDENCY,
_REMOTE_DEPENDENCY_ENVELOPE_NAME: _DEPENDENCY,
_EXCEPTION_ENVELOPE_NAME: _EXCEPTION,
_PAGE_VIEW_ENVELOPE_NAME: _PAGE_VIEW,
_MESSAGE_ENVELOPE_NAME: _TRACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
metrics that track the usage and performance of the Azure Monitor OpenTelemetry Exporter.
"""

from typing import List, Dict, Any, Iterable
from typing import List, Dict, Any, Iterable, Optional
import os

from opentelemetry.metrics import CallbackOptions, Observation
Expand All @@ -17,7 +17,8 @@
_APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW,
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL,
CustomerStatsbeatProperties,
#DropCode,
DropCode,
DropCodeType,
#RetryCode,
CustomerStatsbeatMetricName,
_CUSTOMER_STATSBEAT_LANGUAGE,
Expand All @@ -29,11 +30,15 @@
get_compute_type,
)

from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
categorize_status_code,
)
from azure.monitor.opentelemetry.exporter import VERSION

class _CustomerStatsbeatTelemetryCounters:
def __init__(self):
self.total_item_success_count: Dict[str, Any] = {}
self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {}

class CustomerStatsbeatMetrics(metaclass=Singleton):
def __init__(self, options):
Expand Down Expand Up @@ -66,20 +71,51 @@ def __init__(self, options):
description="Tracks successful telemetry items sent to Azure Monitor",
callbacks=[self._item_success_callback]
)
self._dropped_gauge = self._customer_statsbeat_meter.create_observable_gauge(
name=CustomerStatsbeatMetricName.ITEM_DROP_COUNT.value,
description="Tracks dropped telemetry items sent to Azure Monitor",
callbacks=[self._item_drop_callback]
)

def count_successful_items(self, count: int, telemetry_type: str) -> None:
if not self._is_enabled or count <= 0:
return

if telemetry_type in self._counters.total_item_success_count:
self._counters.total_item_success_count[telemetry_type] += count
else:
self._counters.total_item_success_count[telemetry_type] = count

def count_dropped_items(
self, count: int, telemetry_type: str, drop_code: DropCodeType,
exception_message: Optional[str] = None
) -> None:
if not self._is_enabled or count <= 0:
return

# Get or create the drop_code map for this telemetry_type
if telemetry_type not in self._counters.total_item_drop_count:
self._counters.total_item_drop_count[telemetry_type] = {}
drop_code_map = self._counters.total_item_drop_count[telemetry_type]

# Get or create the reason map for this drop_code
if drop_code not in drop_code_map:
drop_code_map[drop_code] = {}
reason_map = drop_code_map[drop_code]

# Generate a low-cardinality, informative reason description
reason = self._get_drop_reason(drop_code, exception_message)

# Update the count for this reason
current_count = reason_map.get(reason, 0)
reason_map[reason] = current_count + count

def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
if not getattr(self, "_is_enabled", False):
return []

observations: List[Observation] = []

for telemetry_type, count in self._counters.total_item_success_count.items():
attributes = {
"language": self._customer_properties.language,
Expand All @@ -90,3 +126,37 @@ def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observati
observations.append(Observation(count, dict(attributes)))

return observations

def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
if not getattr(self, "_is_enabled", False):
return []
observations: List[Observation] = []
for telemetry_type, drop_code_map in self._counters.total_item_drop_count.items():
for drop_code, reason_map in drop_code_map.items():
for reason, count in reason_map.items():
attributes = {
"language": self._customer_properties.language,
"version": self._customer_properties.version,
"compute_type": self._customer_properties.compute_type,
"drop.code": drop_code,
"drop.reason": reason,
"telemetry_type": telemetry_type
}
observations.append(Observation(count, dict(attributes)))

return observations

def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str:
if isinstance(drop_code, int):
return categorize_status_code(drop_code)

if drop_code == DropCode.CLIENT_EXCEPTION:
return exception_message if exception_message else "unknown_exception"

drop_code_reasons = {
DropCode.CLIENT_READONLY: "readonly_mode",
DropCode.CLIENT_STALE_DATA: "stale_data",
DropCode.CLIENT_PERSISTENCE_CAPACITY: "persistence_full",
}

return drop_code_reasons.get(drop_code, "unknown_reason")
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,26 @@ def _update_requests_map(type_name, value):
else:
_REQUESTS_MAP[type_name] = {}
_REQUESTS_MAP[type_name][value] = prev + 1

def categorize_status_code(status_code: int) -> str:
status_map = {
400: "bad_request",
401: "unauthorized",
402: "daily quota exceeded",
403: "forbidden",
404: "not_found",
408: "request_timeout",
413: "payload_too_large",
429: "too_many_requests",
500: "internal_server_error",
502: "bad_gateway",
503: "service_unavailable",
504: "gateway_timeout",
}
if status_code in status_map:
return status_map[status_code]
if 400 <= status_code < 500:
return "client_error_4xx"
if 500 <= status_code < 600:
return "server_error_5xx"
return f"status_{status_code}"
Loading