Skip to content

Customer Facing Statsbeat: Added logic for retry item count #41971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669))
- Customer Facing Statsbeat: Added logic for dropped item count
([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950))
- Customer Facing Statsbeat: Added logic for retry item count
([#41971](https://github.com/Azure/azure-sdk-for-python/pull/41971))


### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,12 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
DropCodeType = Union[DropCode, int]

class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
CLIENT_TIMEOUT = "CLIENT_TIMEOUT"
UNKNOWN = "UNKNOWN"

RetryCodeType = Union[RetryCode, int]

class CustomerStatsbeatMetricName(str, Enum, metaclass=CaseInsensitiveEnumMeta):
ITEM_SUCCESS_COUNT = "preview.item.success.count"
ITEM_DROP_COUNT = "preview.item.dropped.count"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
CustomerStatsbeatProperties,
DropCode,
DropCodeType,
#RetryCode,
RetryCode,
RetryCodeType,
CustomerStatsbeatMetricName,
_CUSTOMER_STATSBEAT_LANGUAGE,
)
Expand All @@ -39,8 +40,9 @@ class _CustomerStatsbeatTelemetryCounters:
def __init__(self):
self.total_item_success_count: Dict[str, Any] = {}
self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {}
self.total_item_retry_count: Dict[str, Dict[RetryCodeType, Dict[str, int]]] = {}

class CustomerStatsbeatMetrics(metaclass=Singleton):
class CustomerStatsbeatMetrics(metaclass=Singleton): # pylint: disable=too-many-instance-attributes
def __init__(self, options):
self._counters = _CustomerStatsbeatTelemetryCounters()
self._language = _CUSTOMER_STATSBEAT_LANGUAGE
Expand All @@ -61,11 +63,13 @@ def __init__(self, options):
metric_readers=[self._customer_statsbeat_metric_reader]
)
self._customer_statsbeat_meter = self._customer_statsbeat_meter_provider.get_meter(__name__)

self._customer_properties = CustomerStatsbeatProperties(
language=self._language,
version=VERSION,
compute_type=get_compute_type(),
)

self._success_gauge = self._customer_statsbeat_meter.create_observable_gauge(
name=CustomerStatsbeatMetricName.ITEM_SUCCESS_COUNT.value,
description="Tracks successful telemetry items sent to Azure Monitor",
Expand All @@ -76,6 +80,11 @@ def __init__(self, options):
description="Tracks dropped telemetry items sent to Azure Monitor",
callbacks=[self._item_drop_callback]
)
self._retry_gauge = self._customer_statsbeat_meter.create_observable_gauge(
name=CustomerStatsbeatMetricName.ITEM_RETRY_COUNT.value,
description="Tracks retry attempts for telemetry items sent to Azure Monitor",
callbacks=[self._item_retry_callback]
)

def count_successful_items(self, count: int, telemetry_type: str) -> None:
if not self._is_enabled or count <= 0:
Expand Down Expand Up @@ -110,6 +119,26 @@ def count_dropped_items(
current_count = reason_map.get(reason, 0)
reason_map[reason] = current_count + count

def count_retry_items(
self, count: int, telemetry_type: str, retry_code: RetryCodeType,
exception_message: Optional[str] = None
) -> None:
if not self._is_enabled or count <= 0:
return

if telemetry_type not in self._counters.total_item_retry_count:
self._counters.total_item_retry_count[telemetry_type] = {}
retry_code_map = self._counters.total_item_retry_count[telemetry_type]

if retry_code not in retry_code_map:
retry_code_map[retry_code] = {}
reason_map = retry_code_map[retry_code]

reason = self._get_retry_reason(retry_code, exception_message)

current_count = reason_map.get(reason, 0)
reason_map[reason] = current_count + count

def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
if not getattr(self, "_is_enabled", False):
return []
Expand Down Expand Up @@ -146,6 +175,25 @@ def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation]

return observations

def _item_retry_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
if not getattr(self, "_is_enabled", False):
return []
observations: List[Observation] = []
for telemetry_type, retry_code_map in self._counters.total_item_retry_count.items():
for retry_code, reason_map in retry_code_map.items():
for reason, count in reason_map.items():
attributes = {
"language": self._customer_properties.language,
"version": self._customer_properties.version,
"compute_type": self._customer_properties.compute_type,
"retry.code": retry_code,
"retry.reason": reason,
"telemetry_type": telemetry_type
}
observations.append(Observation(count, dict(attributes)))

return observations

def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str:
if isinstance(drop_code, int):
return categorize_status_code(drop_code)
Expand All @@ -160,3 +208,16 @@ def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[
}

return drop_code_reasons.get(drop_code, "unknown_reason")

def _get_retry_reason(self, retry_code: RetryCodeType, exception_message: Optional[str] = None) -> str:
if isinstance(retry_code, int):
return categorize_status_code(retry_code)

if retry_code == RetryCode.CLIENT_EXCEPTION:
return exception_message if exception_message else "unknown_exception"

retry_code_reasons = {
RetryCode.CLIENT_TIMEOUT: "client_timeout",
RetryCode.UNKNOWN: "unknown_reason",
}
return retry_code_reasons.get(retry_code, "unknown_reason")
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
_TYPE_MAP,
DropCode,
DropCodeType,
RetryCode,
RetryCodeType,
_TRACE,
)

Expand Down Expand Up @@ -354,4 +356,172 @@ def patched_transmit(self_exporter, envelopes):
drop_code_types.add(type(drop_code).__name__)

# Additional assertion to verify aggregation works
multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]
multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]

def test_retry_items_count(self):
"""Test retry item counting with both RetryCode enums and integer status codes."""
retried_items = 0

metrics = self.mock_options.metrics
metrics._counters.total_item_retry_count.clear()

exporter = AzureMonitorTraceExporter(connection_string=self.mock_options.connection_string)

def patched_transmit(self_exporter, envelopes):
self.mock_options.transmit_called[0] = True

for envelope in envelopes:
if not hasattr(envelope, "data") or not envelope.data:
continue

envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type
telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN)

should_retry = random.choice([True, False])
if should_retry:
nonlocal retried_items

retry_type = random.choice(["http_status", "client_timeout", "unknown"])

if retry_type == "http_status":
# HTTP status codes that would trigger retries
status_codes = [429, 503, 500, 502, 504]
status_code = random.choice(status_codes)

failure_count = random.randint(1, 3)
retried_items += failure_count

metrics.count_retry_items(failure_count, telemetry_type, status_code, None)
elif retry_type == "client_timeout":
timeout_messages = [
"Connection timed out after 30 seconds",
"Request timed out after 60 seconds",
"Operation timed out",
"Socket timeout occurred"
]

exception_message = random.choice(timeout_messages)

# Simulate multiple retries for the same timeout type
failure_count = random.randint(1, 4)
retried_items += failure_count

metrics.count_retry_items(failure_count, telemetry_type, RetryCode.CLIENT_TIMEOUT, exception_message)
else:
# Unknown retry reasons
unknown_messages = [
"Unknown network error",
"Unexpected retry condition",
"Network instability detected",
"Connection reset by peer"
]

exception_message = random.choice(unknown_messages)

failure_count = random.randint(1, 3)
retried_items += failure_count

metrics.count_retry_items(failure_count, telemetry_type, RetryCode.CLIENT_EXCEPTION, exception_message)

continue

return ExportResult.SUCCESS

exporter._transmit = types.MethodType(patched_transmit, exporter)

resource = Resource.create({"service.name": "retry-test", "service.instance.id": "test-instance"})
trace_provider = TracerProvider(resource=resource)

processor = SimpleSpanProcessor(exporter)
trace_provider.add_span_processor(processor)

tracer = trace_provider.get_tracer(__name__)

total_items = random.randint(15, 25) # Increased to get more aggregation

for i in range(total_items):
span_type = random.choice(["client", "server"])

if span_type == "client":
# Client spans generate RemoteDependencyData
with tracer.start_as_current_span(
name=f"dependency-{i}",
kind=trace.SpanKind.CLIENT,
attributes={
"db.system": "mysql",
"db.name": "test_db",
"db.operation": "query",
"net.peer.name": "test-db-server",
"net.peer.port": 3306,
}
) as span:
span.set_status(trace.Status(trace.StatusCode.OK))
time.sleep(0.01)
else:
# Server spans generate RequestData
with tracer.start_as_current_span(
name=f"GET /api/endpoint-{i}",
kind=trace.SpanKind.SERVER,
attributes={
"http.method": "GET",
"http.url": f"https://example.com/api/endpoint-{i}",
"http.route": f"/api/endpoint-{i}",
"http.status_code": 200,
"http.scheme": "https",
"http.host": "example.com",
}
) as span:
span.set_status(trace.Status(trace.StatusCode.OK))
time.sleep(0.01)

trace_provider.force_flush()

self.metrics_instance = metrics

self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called")

# Enhanced counting and verification logic
actual_retried_count = 0
category_totals = {}
http_status_totals = {}
client_timeout_totals = {}
unknown_retry_totals = {}

for telemetry_type, retry_code_data in metrics._counters.total_item_retry_count.items():
for retry_code, reason_map in retry_code_data.items():
if isinstance(reason_map, dict):
for reason, count in reason_map.items():
actual_retried_count += count
category_totals[reason] = category_totals.get(reason, 0) + count

# Separate HTTP status codes from client exceptions
if isinstance(retry_code, int):
http_status_totals[reason] = http_status_totals.get(reason, 0) + count
elif retry_code == RetryCode.CLIENT_TIMEOUT:
client_timeout_totals[reason] = client_timeout_totals.get(reason, 0) + count
elif retry_code == RetryCode.CLIENT_EXCEPTION:
unknown_retry_totals[reason] = unknown_retry_totals.get(reason, 0) + count
else:
actual_retried_count += reason_map

# Main assertion
self.assertEqual(
actual_retried_count,
retried_items,
f"Expected {retried_items} retried items, got {actual_retried_count}. "
f"HTTP Status retries: {len(http_status_totals)}, Client Timeout retries: {len(client_timeout_totals)}, "
f"Unknown retries: {len(unknown_retry_totals)}"
)

# Verify aggregation occurred
self.assertGreater(len(http_status_totals) + len(client_timeout_totals) + len(unknown_retry_totals), 0,
"At least one type of retry should have occurred")

# Verify that both integer and enum retry codes are being stored properly
retry_code_types = set()
for telemetry_type, retry_code_data in metrics._counters.total_item_retry_count.items():
for retry_code in retry_code_data.keys():
retry_code_types.add(type(retry_code).__name__)

# Additional assertion to verify aggregation works
multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]