diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index eb6f38b5fb6..59aaf88d529 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -10,6 +10,9 @@ ([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669)) - Customer Facing Statsbeat: Added logic for dropped item count ([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950)) +- Customer Facing Statsbeat: Added logic for retry item count + ([#41971](https://github.com/Azure/azure-sdk-for-python/pull/41971)) + ### Breaking Changes diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py index 95f115a29cc..20b22e91e71 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py @@ -148,9 +148,12 @@ class DropCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): DropCodeType = Union[DropCode, int] class RetryCode(str, Enum, metaclass=CaseInsensitiveEnumMeta): + CLIENT_EXCEPTION = "CLIENT_EXCEPTION" CLIENT_TIMEOUT = "CLIENT_TIMEOUT" UNKNOWN = "UNKNOWN" +RetryCodeType = Union[RetryCode, int] + class CustomerStatsbeatMetricName(str, Enum, metaclass=CaseInsensitiveEnumMeta): ITEM_SUCCESS_COUNT = "preview.item.success.count" ITEM_DROP_COUNT = "preview.item.dropped.count" diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py index d0203b42c87..d10abe82b64 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py @@ -19,7 +19,8 @@ CustomerStatsbeatProperties, DropCode, DropCodeType, - #RetryCode, + RetryCode, + RetryCodeType, CustomerStatsbeatMetricName, _CUSTOMER_STATSBEAT_LANGUAGE, ) @@ -39,8 +40,9 @@ class _CustomerStatsbeatTelemetryCounters: def __init__(self): self.total_item_success_count: Dict[str, Any] = {} self.total_item_drop_count: Dict[str, Dict[DropCodeType, Dict[str, int]]] = {} + self.total_item_retry_count: Dict[str, Dict[RetryCodeType, Dict[str, int]]] = {} -class CustomerStatsbeatMetrics(metaclass=Singleton): +class CustomerStatsbeatMetrics(metaclass=Singleton): # pylint: disable=too-many-instance-attributes def __init__(self, options): self._counters = _CustomerStatsbeatTelemetryCounters() self._language = _CUSTOMER_STATSBEAT_LANGUAGE @@ -61,11 +63,13 @@ def __init__(self, options): metric_readers=[self._customer_statsbeat_metric_reader] ) self._customer_statsbeat_meter = self._customer_statsbeat_meter_provider.get_meter(__name__) + self._customer_properties = CustomerStatsbeatProperties( language=self._language, version=VERSION, compute_type=get_compute_type(), ) + self._success_gauge = self._customer_statsbeat_meter.create_observable_gauge( name=CustomerStatsbeatMetricName.ITEM_SUCCESS_COUNT.value, description="Tracks successful telemetry items sent to Azure Monitor", @@ -76,6 +80,11 @@ def __init__(self, options): description="Tracks dropped telemetry items sent to Azure Monitor", callbacks=[self._item_drop_callback] ) + self._retry_gauge = self._customer_statsbeat_meter.create_observable_gauge( + name=CustomerStatsbeatMetricName.ITEM_RETRY_COUNT.value, + description="Tracks retry attempts for telemetry items sent to Azure Monitor", + callbacks=[self._item_retry_callback] + ) def count_successful_items(self, count: int, telemetry_type: str) -> None: if not self._is_enabled or count <= 0: @@ -110,6 +119,26 @@ def count_dropped_items( current_count = reason_map.get(reason, 0) reason_map[reason] = current_count + count + def count_retry_items( + self, count: int, telemetry_type: str, retry_code: RetryCodeType, + exception_message: Optional[str] = None + ) -> None: + if not self._is_enabled or count <= 0: + return + + if telemetry_type not in self._counters.total_item_retry_count: + self._counters.total_item_retry_count[telemetry_type] = {} + retry_code_map = self._counters.total_item_retry_count[telemetry_type] + + if retry_code not in retry_code_map: + retry_code_map[retry_code] = {} + reason_map = retry_code_map[retry_code] + + reason = self._get_retry_reason(retry_code, exception_message) + + current_count = reason_map.get(reason, 0) + reason_map[reason] = current_count + count + def _item_success_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument if not getattr(self, "_is_enabled", False): return [] @@ -146,6 +175,25 @@ def _item_drop_callback(self, options: CallbackOptions) -> Iterable[Observation] return observations + def _item_retry_callback(self, options: CallbackOptions) -> Iterable[Observation]: # pylint: disable=unused-argument + if not getattr(self, "_is_enabled", False): + return [] + observations: List[Observation] = [] + for telemetry_type, retry_code_map in self._counters.total_item_retry_count.items(): + for retry_code, reason_map in retry_code_map.items(): + for reason, count in reason_map.items(): + attributes = { + "language": self._customer_properties.language, + "version": self._customer_properties.version, + "compute_type": self._customer_properties.compute_type, + "retry.code": retry_code, + "retry.reason": reason, + "telemetry_type": telemetry_type + } + observations.append(Observation(count, dict(attributes))) + + return observations + def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[str] = None) -> str: if isinstance(drop_code, int): return categorize_status_code(drop_code) @@ -160,3 +208,16 @@ def _get_drop_reason(self, drop_code: DropCodeType, exception_message: Optional[ } return drop_code_reasons.get(drop_code, "unknown_reason") + + def _get_retry_reason(self, retry_code: RetryCodeType, exception_message: Optional[str] = None) -> str: + if isinstance(retry_code, int): + return categorize_status_code(retry_code) + + if retry_code == RetryCode.CLIENT_EXCEPTION: + return exception_message if exception_message else "unknown_exception" + + retry_code_reasons = { + RetryCode.CLIENT_TIMEOUT: "client_timeout", + RetryCode.UNKNOWN: "unknown_reason", + } + return retry_code_reasons.get(retry_code, "unknown_reason") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py index 67885bdb455..239566d048e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_customer_statsbeat.py @@ -23,6 +23,8 @@ _TYPE_MAP, DropCode, DropCodeType, + RetryCode, + RetryCodeType, _TRACE, ) @@ -354,4 +356,172 @@ def patched_transmit(self_exporter, envelopes): drop_code_types.add(type(drop_code).__name__) # Additional assertion to verify aggregation works - multi_count_categories = [cat for cat, count in category_totals.items() if count > 1] \ No newline at end of file + multi_count_categories = [cat for cat, count in category_totals.items() if count > 1] + + def test_retry_items_count(self): + """Test retry item counting with both RetryCode enums and integer status codes.""" + retried_items = 0 + + metrics = self.mock_options.metrics + metrics._counters.total_item_retry_count.clear() + + exporter = AzureMonitorTraceExporter(connection_string=self.mock_options.connection_string) + + def patched_transmit(self_exporter, envelopes): + self.mock_options.transmit_called[0] = True + + for envelope in envelopes: + if not hasattr(envelope, "data") or not envelope.data: + continue + + envelope_name = "Microsoft.ApplicationInsights." + envelope.data.base_type + telemetry_type = _BASE_TYPE_MAP.get(envelope_name, _UNKNOWN) + + should_retry = random.choice([True, False]) + if should_retry: + nonlocal retried_items + + retry_type = random.choice(["http_status", "client_timeout", "unknown"]) + + if retry_type == "http_status": + # HTTP status codes that would trigger retries + status_codes = [429, 503, 500, 502, 504] + status_code = random.choice(status_codes) + + failure_count = random.randint(1, 3) + retried_items += failure_count + + metrics.count_retry_items(failure_count, telemetry_type, status_code, None) + elif retry_type == "client_timeout": + timeout_messages = [ + "Connection timed out after 30 seconds", + "Request timed out after 60 seconds", + "Operation timed out", + "Socket timeout occurred" + ] + + exception_message = random.choice(timeout_messages) + + # Simulate multiple retries for the same timeout type + failure_count = random.randint(1, 4) + retried_items += failure_count + + metrics.count_retry_items(failure_count, telemetry_type, RetryCode.CLIENT_TIMEOUT, exception_message) + else: + # Unknown retry reasons + unknown_messages = [ + "Unknown network error", + "Unexpected retry condition", + "Network instability detected", + "Connection reset by peer" + ] + + exception_message = random.choice(unknown_messages) + + failure_count = random.randint(1, 3) + retried_items += failure_count + + metrics.count_retry_items(failure_count, telemetry_type, RetryCode.CLIENT_EXCEPTION, exception_message) + + continue + + return ExportResult.SUCCESS + + exporter._transmit = types.MethodType(patched_transmit, exporter) + + resource = Resource.create({"service.name": "retry-test", "service.instance.id": "test-instance"}) + trace_provider = TracerProvider(resource=resource) + + processor = SimpleSpanProcessor(exporter) + trace_provider.add_span_processor(processor) + + tracer = trace_provider.get_tracer(__name__) + + total_items = random.randint(15, 25) # Increased to get more aggregation + + for i in range(total_items): + span_type = random.choice(["client", "server"]) + + if span_type == "client": + # Client spans generate RemoteDependencyData + with tracer.start_as_current_span( + name=f"dependency-{i}", + kind=trace.SpanKind.CLIENT, + attributes={ + "db.system": "mysql", + "db.name": "test_db", + "db.operation": "query", + "net.peer.name": "test-db-server", + "net.peer.port": 3306, + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + else: + # Server spans generate RequestData + with tracer.start_as_current_span( + name=f"GET /api/endpoint-{i}", + kind=trace.SpanKind.SERVER, + attributes={ + "http.method": "GET", + "http.url": f"https://example.com/api/endpoint-{i}", + "http.route": f"/api/endpoint-{i}", + "http.status_code": 200, + "http.scheme": "https", + "http.host": "example.com", + } + ) as span: + span.set_status(trace.Status(trace.StatusCode.OK)) + time.sleep(0.01) + + trace_provider.force_flush() + + self.metrics_instance = metrics + + self.assertTrue(self.mock_options.transmit_called[0], "Exporter _transmit method was not called") + + # Enhanced counting and verification logic + actual_retried_count = 0 + category_totals = {} + http_status_totals = {} + client_timeout_totals = {} + unknown_retry_totals = {} + + for telemetry_type, retry_code_data in metrics._counters.total_item_retry_count.items(): + for retry_code, reason_map in retry_code_data.items(): + if isinstance(reason_map, dict): + for reason, count in reason_map.items(): + actual_retried_count += count + category_totals[reason] = category_totals.get(reason, 0) + count + + # Separate HTTP status codes from client exceptions + if isinstance(retry_code, int): + http_status_totals[reason] = http_status_totals.get(reason, 0) + count + elif retry_code == RetryCode.CLIENT_TIMEOUT: + client_timeout_totals[reason] = client_timeout_totals.get(reason, 0) + count + elif retry_code == RetryCode.CLIENT_EXCEPTION: + unknown_retry_totals[reason] = unknown_retry_totals.get(reason, 0) + count + else: + actual_retried_count += reason_map + + # Main assertion + self.assertEqual( + actual_retried_count, + retried_items, + f"Expected {retried_items} retried items, got {actual_retried_count}. " + f"HTTP Status retries: {len(http_status_totals)}, Client Timeout retries: {len(client_timeout_totals)}, " + f"Unknown retries: {len(unknown_retry_totals)}" + ) + + # Verify aggregation occurred + self.assertGreater(len(http_status_totals) + len(client_timeout_totals) + len(unknown_retry_totals), 0, + "At least one type of retry should have occurred") + + # Verify that both integer and enum retry codes are being stored properly + retry_code_types = set() + for telemetry_type, retry_code_data in metrics._counters.total_item_retry_count.items(): + for retry_code in retry_code_data.keys(): + retry_code_types.add(type(retry_code).__name__) + + # Additional assertion to verify aggregation works + multi_count_categories = [cat for cat, count in category_totals.items() if count > 1]