Skip to content

Commit 24a8199

Browse files
committed
Added logic for retry item count
1 parent 70db1ec commit 24a8199

File tree

4 files changed

+239
-3
lines changed

4 files changed

+239
-3
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669))
1111
- Customer Facing Statsbeat: Added logic for dropped item count
1212
([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950))
13+
- Customer Facing Statsbeat: Added logic for retry item count
14+
1315

1416
### Breaking Changes
1517

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,12 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
148148
DropCodeType = Union[DropCode, int]
149149

150150
class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta):
    """Symbolic codes identifying why telemetry items were retried.

    Members are string-valued; each member's value equals its name.
    """

    # Retry caused by a client-side exception.
    CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
    # Retry caused by a client-side timeout.
    CLIENT_TIMEOUT = "CLIENT_TIMEOUT"
    # Retry whose cause could not be classified.
    UNKNOWN = "UNKNOWN"

# A retry code is either a RetryCode member or a plain int.
# NOTE(review): the int form is presumably an HTTP status code - confirm against callers.
RetryCodeType = Union[RetryCode, int]
156+
154157
class CustomerStatsbeatMetricName(str, Enum, metaclass=CaseInsensitiveEnumMeta):
155158
ITEM_SUCCESS_COUNT = "preview.item.success.count"
156159
ITEM_DROP_COUNT = "preview.item.dropped.count"

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
CustomerStatsbeatProperties,
2020
DropCode,
2121
DropCodeType,
22-
#RetryCode,
22+
RetryCode,
23+
RetryCodeType,
2324
CustomerStatsbeatMetricName,
2425
_CUSTOMER_STATSBEAT_LANGUAGE,
2526
)
@@ -39,8 +40,9 @@ class _CustomerStatsbeatTelemetryCounters:
3940
def __init__(self):
    # Create the empty containers that hold the running item totals.

    # Success totals keyed by a string; the value shape is not fixed by the annotation.
    # NOTE(review): presumably keyed by telemetry type - confirm against count_successful_items.
    self.total_item_success_count: Dict[str, Any] = {}
    # Nested map per the annotation: str key -> drop code -> str key -> int count.
    # NOTE(review): presumably telemetry type -> drop code -> reason -> count - confirm.
    self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {}
    # Nested map: telemetry type -> retry code -> reason -> retried item count.
    self.total_item_retry_count: Dict[str, Dict[RetryCodeType, Dict[str, int]]] = {}
4244

43-
class CustomerStatsbeatMetrics(metaclass=Singleton):
45+
class CustomerStatsbeatMetrics(metaclass=Singleton): # pylint: disable=too-many-instance-attributes
4446
def __init__(self, options):
4547
self._counters = _CustomerStatsbeatTelemetryCounters()
4648
self._language = _CUSTOMER_STATSBEAT_LANGUAGE
@@ -61,11 +63,13 @@ def __init__(self, options):
6163
metric_readers=[self._customer_statsbeat_metric_reader]
6264
)
6365
self._customer_statsbeat_meter = self._customer_statsbeat_meter_provider.get_meter(__name__)
66+
6467
self._customer_properties = CustomerStatsbeatProperties(
6568
language=self._language,
6669
version=VERSION,
6770
compute_type=get_compute_type(),
6871
)
72+
6973
self._success_gauge = self._customer_statsbeat_meter.create_observable_gauge(
7074
name=CustomerStatsbeatMetricName.ITEM_SUCCESS_COUNT.value,
7175
description="Tracks successful telemetry items sent to Azure Monitor",
@@ -76,6 +80,11 @@ def __init__(self, options):
7680
description="Tracks dropped telemetry items sent to Azure Monitor",
7781
callbacks=[self._item_drop_callback]
7882
)
83+
self._retry_gauge = self._customer_statsbeat_meter.create_observable_gauge(
84+
name=CustomerStatsbeatMetricName.ITEM_RETRY_COUNT.value,
85+
description="Tracks retry attempts for telemetry items sent to Azure Monitor",
86+
callbacks=[self._item_retry_callback]
87+
)
7988

8089
def count_successful_items(self, count: int, telemetry_type: str) -> None:
8190
if not self._is_enabled or count <= 0:
@@ -110,6 +119,26 @@ def count_dropped_items(
110119
current_count = reason_map.get(reason, 0)
111120
reason_map[reason] = current_count + count
112121

122+
def count_retry_items(
    self, count: int, telemetry_type: str, retry_code: RetryCodeType,
    exception_message: Optional[str] = None
) -> None:
    """Add ``count`` retried items to the running retry totals.

    Totals live in ``self._counters.total_item_retry_count`` as a nested
    mapping of telemetry type -> retry code -> reason -> item count. The
    reason label is obtained from ``self._get_retry_reason``.

    :param count: Number of retried items; values ``<= 0`` are ignored.
    :param telemetry_type: Telemetry type the retried items belong to.
    :param retry_code: A ``RetryCode`` member or an integer code.
    :param exception_message: Optional message forwarded to
        ``_get_retry_reason`` when deriving the reason label.
    """
    # Nothing is recorded while the feature is disabled or for empty batches.
    if not self._is_enabled:
        return
    if count <= 0:
        return

    # Create the intermediate levels of the nested map on first use.
    per_type = self._counters.total_item_retry_count.setdefault(telemetry_type, {})
    per_code = per_type.setdefault(retry_code, {})

    reason = self._get_retry_reason(retry_code, exception_message)
    per_code[reason] = per_code.get(reason, 0) + count
141+
113142
def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
114143
if not getattr(self, "_is_enabled", False):
115144
return []
@@ -146,6 +175,25 @@ def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation]
146175

147176
return observations
148177

178+
def _item_retry_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument
    """Observable-gauge callback that reports the accumulated retry totals.

    Produces one ``Observation`` for every (telemetry type, retry code,
    reason) combination stored in ``total_item_retry_count``. Each one
    carries that combination plus the language, version and compute type
    from ``self._customer_properties`` as attributes.

    :param options: Callback options supplied by the metrics SDK (unused).
    :return: The list of observations; empty when ``_is_enabled`` is
        missing or falsy.
    """
    # getattr keeps the callback safe if it runs before _is_enabled is assigned.
    if not getattr(self, "_is_enabled", False):
        return []

    retry_totals = self._counters.total_item_retry_count
    return [
        Observation(
            item_count,
            {
                "language": self._customer_properties.language,
                "version": self._customer_properties.version,
                "compute_type": self._customer_properties.compute_type,
                "retry.code": code,
                "retry.reason": reason_label,
                "telemetry_type": telemetry_type,
            },
        )
        for telemetry_type, totals_by_code in retry_totals.items()
        for code, totals_by_reason in totals_by_code.items()
        for reason_label, item_count in totals_by_reason.items()
    ]
196+
149197
def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str:
150198
if isinstance(drop_code, int):
151199
return categorize_status_code(drop_code)
@@ -160,3 +208,16 @@ def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[
160208
}
161209

162210
return drop_code_reasons.get(drop_code, "unknown_reason")
211+
212+
def _get_retry_reason(self, retry_code: RetryCodeType, exception_message: Optional[str] = None) -> str:
    """Translate a retry code into the reason label used as a metric attribute.

    :param retry_code: A ``RetryCode`` member or an integer code.
    :param exception_message: Message used as the reason for
        ``RetryCode.CLIENT_EXCEPTION``; ignored for every other code.
    :return: For integers, the result of ``categorize_status_code``; for
        ``CLIENT_EXCEPTION``, the message or ``"unknown_exception"`` when
        it is empty or missing; otherwise a fixed label, falling back to
        ``"unknown_reason"``.
    """
    # Integer codes are delegated to the shared status-code categorizer.
    if isinstance(retry_code, int):
        return categorize_status_code(retry_code)

    # Client exceptions report their own message when one is available.
    if retry_code == RetryCode.CLIENT_EXCEPTION:
        return exception_message or "unknown_exception"

    fixed_reasons = {
        RetryCode.CLIENT_TIMEOUT: "client_timeout",
        RetryCode.UNKNOWN: "unknown_reason",
    }
    try:
        return fixed_reasons[retry_code]
    except KeyError:
        return "unknown_reason"

sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
_TYPE_MAP,
2424
DropCode,
2525
DropCodeType,
26+
RetryCode,
27+
RetryCodeType,
2628
_TRACE,
2729
)
2830

@@ -354,4 +356,172 @@ def patched_transmit(self_exporter, envelopes):
354356
drop_code_types.add(type(drop_code).__name__)
355357

356358
# Additional assertion to verify aggregation works
357-
multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]
359+
multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]
360+
361+
def test_retry_items_count(self):
    """Test retry item counting with both RetryCode enums and integer status codes.

    Retry scenarios are applied to exported envelopes in a fixed rotation
    instead of by coin flip, so every kind of retry code is guaranteed to
    be exercised and none of the assertions can fail by chance. Status
    codes, messages and per-call counts are still picked randomly because
    they do not influence whether an assertion holds.
    """
    retried_items = 0
    envelopes_seen = 0

    metrics = self.mock_options.metrics
    metrics._counters.total_item_retry_count.clear()

    exporter = AzureMonitorTraceExporter(connection_string=self.mock_options.connection_string)

    # None means "this envelope is transmitted without a retry".
    scenarios = ("http_status", None, "client_timeout", None, "client_exception", None)
    # HTTP status codes that would trigger retries
    status_codes = [429, 503, 500, 502, 504]
    timeout_messages = [
        "Connection timed out after 30 seconds",
        "Request timed out after 60 seconds",
        "Operation timed out",
        "Socket timeout occurred",
    ]
    exception_messages = [
        "Unknown network error",
        "Unexpected retry condition",
        "Network instability detected",
        "Connection reset by peer",
    ]

    def patched_transmit(self_exporter, envelopes):  # pylint: disable=unused-argument
        nonlocal retried_items, envelopes_seen
        self.mock_options.transmit_called[0] = True

        for envelope in envelopes:
            if not hasattr(envelope, "data") or not envelope.data:
                continue

            envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type
            telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN)

            scenario = scenarios[envelopes_seen % len(scenarios)]
            envelopes_seen += 1
            if scenario is None:
                continue

            if scenario == "http_status":
                retry_code = random.choice(status_codes)
                exception_message = None
                failure_count = random.randint(1, 3)
            elif scenario == "client_timeout":
                retry_code = RetryCode.CLIENT_TIMEOUT
                exception_message = random.choice(timeout_messages)
                # Simulate multiple retries for the same timeout type
                failure_count = random.randint(1, 4)
            else:
                retry_code = RetryCode.CLIENT_EXCEPTION
                exception_message = random.choice(exception_messages)
                failure_count = random.randint(1, 3)

            retried_items += failure_count
            metrics.count_retry_items(failure_count, telemetry_type, retry_code, exception_message)

        return ExportResult.SUCCESS

    exporter._transmit = types.MethodType(patched_transmit, exporter)

    resource = Resource.create({"service.name": "retry-test", "service.instance.id": "test-instance"})
    trace_provider = TracerProvider(resource=resource)

    processor = SimpleSpanProcessor(exporter)
    trace_provider.add_span_processor(processor)

    tracer = trace_provider.get_tracer(__name__)

    # At least 15 spans, so the six-step rotation above repeats every scenario.
    total_items = random.randint(15, 25)

    for i in range(total_items):
        span_type = random.choice(["client", "server"])

        if span_type == "client":
            # Client spans generate RemoteDependencyData
            with tracer.start_as_current_span(
                name=f"dependency-{i}",
                kind=trace.SpanKind.CLIENT,
                attributes={
                    "db.system": "mysql",
                    "db.name": "test_db",
                    "db.operation": "query",
                    "net.peer.name": "test-db-server",
                    "net.peer.port": 3306,
                }
            ) as span:
                span.set_status(trace.Status(trace.StatusCode.OK))
        else:
            # Server spans generate RequestData
            with tracer.start_as_current_span(
                name=f"GET /api/endpoint-{i}",
                kind=trace.SpanKind.SERVER,
                attributes={
                    "http.method": "GET",
                    "http.url": f"https://example.com/api/endpoint-{i}",
                    "http.route": f"/api/endpoint-{i}",
                    "http.status_code": 200,
                    "http.scheme": "https",
                    "http.host": "example.com",
                }
            ) as span:
                span.set_status(trace.Status(trace.StatusCode.OK))

    trace_provider.force_flush()

    self.metrics_instance = metrics

    self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called")

    # Fold the nested counters into totals per reason and per retry-code kind.
    actual_retried_count = 0
    category_totals = {}
    http_status_totals = {}
    client_timeout_totals = {}
    client_exception_totals = {}
    stored_retry_codes = set()

    for retry_code_data in metrics._counters.total_item_retry_count.values():
        for retry_code, reason_map in retry_code_data.items():
            stored_retry_codes.add(retry_code)

            # Separate HTTP status codes from the enum-based retry codes
            if isinstance(retry_code, int):
                kind_totals = http_status_totals
            elif retry_code == RetryCode.CLIENT_TIMEOUT:
                kind_totals = client_timeout_totals
            elif retry_code == RetryCode.CLIENT_EXCEPTION:
                kind_totals = client_exception_totals
            else:
                kind_totals = None

            for reason, count in reason_map.items():
                actual_retried_count += count
                category_totals[reason] = category_totals.get(reason, 0) + count
                if kind_totals is not None:
                    kind_totals[reason] = kind_totals.get(reason, 0) + count

    # Main assertion
    self.assertEqual(
        actual_retried_count,
        retried_items,
        f"Expected {retried_items} retried items, got {actual_retried_count}. "
        f"HTTP Status retries: {len(http_status_totals)}, Client Timeout retries: {len(client_timeout_totals)}, "
        f"Client Exception retries: {len(client_exception_totals)}"
    )

    # Every retry kind in the rotation must have been recorded.
    self.assertGreater(len(http_status_totals), 0, "HTTP status retries should have been recorded")
    self.assertGreater(len(client_timeout_totals), 0, "Client timeout retries should have been recorded")
    self.assertGreater(len(client_exception_totals), 0, "Client exception retries should have been recorded")

    # Verify that both integer and enum retry codes are being stored properly
    self.assertTrue(
        any(isinstance(code, int) for code in stored_retry_codes),
        "Integer status codes should be stored as retry codes"
    )
    self.assertTrue(
        any(isinstance(code, RetryCode) for code in stored_retry_codes),
        "RetryCode enum members should be stored as retry codes"
    )

    # Verify aggregation: repeated retries for the same reason must be summed
    multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]
    self.assertTrue(multi_count_categories, "Repeated retries for the same reason should aggregate")

0 commit comments

Comments
 (0)